Monthly Archives: August 2015

HadoopDataTransport Hadoop数据移动方法

前段时间旧HADOOP升级内存,需要把某些共用数据文件迁移到临时的HADOOP上,如果用hadoop 命令效率N低,
于是就写一个小程序,按照文件列表,把数据迁移到新HADOOP上,命令:

java -classpath ./lib/hadoop-core-1.0.2.jar:./lib/commons-logging-1.1.1.jar:./lib/commons-configuration-1.6.jar:./lib/commons-lang-2.4.jar:./ test.HadoopDataTransport filelist.txt 100

[root@datanode4 ~]# cat filelist.txt

/201401/21/3bb30e5f-cf3e-4182-a7c0-ed486f80a87a.mp3
/201401/21/1d62fff3-744e-41c9-8152-5243ee0ce7b4.mp3
/201401/21/784a53f4-2125-4bc6-bf6a-0adf40981a64.mp3

代码清单:不喜欢写注释。

package test;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HadoopDataTransport implements Runnable {

private static FileSystem src_fs;
private static FileSystem des_fs;
private static List<String> FailureList = new ArrayList<String>();

private static List<Boolean> jobs = new ArrayList<Boolean>();

static final String basedir = “/data/album”; // 保存在HADOOP中的根目录

public static void main(String[] args) throws Exception {
List<String> al = new ArrayList<String>();
System.out.println(“read list from file : ” + args[0]);
int pm = Integer.parseInt(args[1]);
FileReader fr = new FileReader(args[0]);
BufferedReader br = new BufferedReader(fr);
String line;
while ((line = br.readLine()) != null) {
if (line!=null && line.length()>10) {
al.add(line);
}
}
int pmax = pm;
if(al.size()>0){
for(int i=0;i<al.size();i=i+pmax){
int max = ((i+pmax)>al.size()?al.size():(i+pmax));
new Thread(new HadoopDataTransport(al.subList(i, max),i,0)).start();
jobs.add(true);
}
}
br.close();
fr.close();
while(jobs.size()>0){
Thread.sleep(500);
}
if(FailureList.size()>0){// save failure list
saveFailurelist(args[0]);
}
}

private List<String> filelist;
private int threadid;
public HadoopDataTransport(List<String> list,int id,int opts){
this.filelist = list;
this.threadid = id;
}

@Override
public void run() {
int success = 0;
int failure =0;

for(String file : filelist){
System.out.println(threadid + ” ==> processing …. “+file);
try {
InputStream in = readFromHadoopInputStream(file);
if(in !=null){
String filename = createHdfsFile(file, in);
System.out.println(threadid + ” ==> “+filename + ” …… done.”);
success++;
}
} catch (Exception e) {
AddFailure(file);
System.out.println(threadid + ” ==> “+file + ” …… failure.” + e.getMessage());
failure++;
}
}
System.out.println(“===============ThreadId: “+threadid+” Finished Total:” + filelist.size() +” Success : “+ success+” Failure :”+ failure+”==========”);
jobs.remove(true);
}

private static void AddFailure(String filename){
FailureList.add(filename);
}
private static void saveFailurelist(String failurefile) throws Exception{
System.out.println(“Save “+”failure_”+failurefile);
FileWriter w = new FileWriter(“failure_”+failurefile);
PrintWriter out = new PrintWriter(w);
for(String s : FailureList){
out.println(s);
}
out.close();
}

private static String createHdfsFile(String dst, InputStream in)
throws Exception {
FSDataOutputStream out =des_fs.create(
getPath(dst));
IOUtils.copyBytes(in, out, 256 * 1024);
out.close();
return dst;
}

private static InputStream readFromHadoopInputStream(String dst) throws Exception {
Path path = getPath(dst);
if (src_fs.exists(path)) {
FSDataInputStream is = src_fs.open(path);
return is;
} else {
throw new Exception(“the file is not found .”);
}
}

private static Path getPath(String dir) {
return new Path(basedir + dir);
}

static {
Configuration src_conf = new Configuration();
src_conf.set(“fs.default.name”, “hdfs://192.168.2.50:8020”); // conf.set(“fs.default.name”,
// “hdfs://namenode-backup-vip:8020”);
src_conf.set(“dfs.block.size”, “524288”);
src_conf.set(“dfs.replication”, “2”);
src_conf.set(“dfs.permissions”, “false”);
src_conf.set(“dfs.permissions.supergroup”, “resin”);
src_conf.set(“dfs.web.ugi”, “resin”);
try {
src_fs = FileSystem.get(src_conf);
} catch (IOException e) {
e.printStackTrace();
}
System.out
.println(“Initialize Hadoop Server src fs hdfs://192.168.2.50:8020”);

Configuration des_conf = new Configuration();
des_conf.set(“fs.default.name”, “hdfs://192.168.2.85:8020”); // conf.set(“fs.default.name”,
// “hdfs://namenode-backup-vip:8020”);
des_conf.set(“dfs.block.size”, “524288”);
des_conf.set(“dfs.replication”, “2”);
des_conf.set(“dfs.permissions”, “false”);
des_conf.set(“dfs.permissions.supergroup”, “resin”);
des_conf.set(“dfs.web.ugi”, “resin”);
try {
des_fs = FileSystem.get(des_conf);
} catch (IOException e) {
e.printStackTrace();
}
System.out
.println(“Initialize Hadoop Server des fs hdfs://192.168.2.85:8020”);
}

}

upgrade tomcat6xx to tomcat7xx with 3 problem3

今天把tomcat从6.0.18升级到7.0.25,发现了两个问题

问题1

java.lang.ClassNotFoundException: org.apache.catalina.mbeans.ServerLifecycleListener

发现居然找不到这个类,然后把catatina.jar下载下来反编译一看mbenas这个文件夹居然是空的

解决办法

6.0.18以前,conf/server.xml里面的配置有这项

注释掉就可以了

问题2

严重: Begin event threw exception
java.lang.IllegalArgumentException: taglib definition not consistent with specification version

tomcat 6.0.18里面的web.xml里面的tab配置如下
http://java.sun.com/jstl/core
/WEB-INF/c.tld

tomcat 7.0.25里面web.xml的tag配置应该如下所示

http://java.sun.com/jstl/core
/WEB-INF/c.tld

 

问题2

Aug 11, 2015 10:41:11 AM org.apache.jasper.compiler.JDTCompiler$1 findType
SEVERE: Compilation error
org.eclipse.jdt.internal.compiler.classfmt.ClassFormatException

 

原来是JDK的版本问题,系统自是OpenJDK 1.8,

要改回OpenJDK 1.6

yum install java-1.6.0-openjdk