HadoopDataTransport: Moving Data Between Hadoop Clusters

A while ago our old Hadoop cluster had to go down for a memory upgrade, so some shared data files needed to be migrated to a temporary Hadoop cluster. Copying them with the hadoop command-line tools was painfully slow, so I wrote a small program that walks a file list and copies the data to the new cluster, one worker thread per chunk of files. It is invoked like this:

java -classpath ./lib/hadoop-core-1.0.2.jar:./lib/commons-logging-1.1.1.jar:./lib/commons-configuration-1.6.jar:./lib/commons-lang-2.4.jar:./ test.HadoopDataTransport filelist.txt 100

[root@datanode4 ~]# cat filelist.txt

/201401/21/3bb30e5f-cf3e-4182-a7c0-ed486f80a87a.mp3
/201401/21/1d62fff3-744e-41c9-8152-5243ee0ce7b4.mp3
/201401/21/784a53f4-2125-4bc6-bf6a-0adf40981a64.mp3
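
Each entry in filelist.txt is a path relative to the basedir (/data/album) hard-coded in the program below. If you want to build such a list straight from the source cluster rather than by hand, a small sketch along these lines could do it; the BuildFileList class, the output file name, and the /201401 sub-tree are illustrative assumptions, not part of the original tool:

package test;

import java.io.PrintWriter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper: walks a directory tree on the source cluster and writes
// every file path (relative to the base directory) into filelist.txt.
public class BuildFileList {

    static final String basedir = "/data/album";   // same root as in HadoopDataTransport

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.2.50:8020"); // source cluster, as in the listing below
        FileSystem fs = FileSystem.get(conf);

        PrintWriter out = new PrintWriter("filelist.txt");
        walk(fs, new Path(basedir + "/201401"), out);   // sub-tree to migrate; adjust as needed
        out.close();
    }

    private static void walk(FileSystem fs, Path dir, PrintWriter out) throws Exception {
        FileStatus[] entries = fs.listStatus(dir);
        if (entries == null) {
            return;                                     // directory does not exist or is empty
        }
        for (FileStatus st : entries) {
            if (st.isDir()) {
                walk(fs, st.getPath(), out);
            } else {
                // strip the base directory so the entry matches what HadoopDataTransport expects
                out.println(st.getPath().toUri().getPath().substring(basedir.length()));
            }
        }
    }
}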

Code listing (I'm not a big fan of writing comments):

package test;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HadoopDataTransport implements Runnable {

    private static FileSystem src_fs;   // source cluster
    private static FileSystem des_fs;   // destination cluster
    // files that failed to copy, shared by all worker threads
    private static List<String> FailureList = Collections.synchronizedList(new ArrayList<String>());
    // one entry per running worker thread; main() waits until it is empty
    private static List<Boolean> jobs = Collections.synchronizedList(new ArrayList<Boolean>());

    static final String basedir = "/data/album"; // root directory inside HDFS

    public static void main(String[] args) throws Exception {
        List<String> al = new ArrayList<String>();
        System.out.println("read list from file : " + args[0]);
        int pm = Integer.parseInt(args[1]);              // files per worker thread
        FileReader fr = new FileReader(args[0]);
        BufferedReader br = new BufferedReader(fr);
        String line;
        while ((line = br.readLine()) != null) {
            if (line.length() > 10) {                    // skip blank or garbage lines
                al.add(line);
            }
        }
        br.close();
        fr.close();
        int pmax = pm;
        if (al.size() > 0) {
            // split the list into chunks of pmax entries and start one worker thread per chunk
            for (int i = 0; i < al.size(); i = i + pmax) {
                int max = (i + pmax) > al.size() ? al.size() : (i + pmax);
                new Thread(new HadoopDataTransport(al.subList(i, max), i, 0)).start();
                jobs.add(true);
            }
        }
        while (jobs.size() > 0) {                        // wait until every worker has finished
            Thread.sleep(500);
        }
        if (FailureList.size() > 0) {                    // save failure list
            saveFailurelist(args[0]);
        }
    }

    private List<String> filelist;   // the chunk of files this thread copies
    private int threadid;

    public HadoopDataTransport(List<String> list, int id, int opts) {
        this.filelist = list;
        this.threadid = id;
        // opts is currently unused
    }

    @Override
    public void run() {
        int success = 0;
        int failure = 0;

        for (String file : filelist) {
            System.out.println(threadid + " ==> processing .... " + file);
            try {
                InputStream in = readFromHadoopInputStream(file);
                if (in != null) {
                    String filename = createHdfsFile(file, in);
                    System.out.println(threadid + " ==> " + filename + " ...... done.");
                    success++;
                }
            } catch (Exception e) {
                AddFailure(file);
                System.out.println(threadid + " ==> " + file + " ...... failure." + e.getMessage());
                failure++;
            }
        }
        System.out.println("===============ThreadId: " + threadid + " Finished Total:" + filelist.size()
                + " Success : " + success + " Failure :" + failure + "==========");
        jobs.remove(true);   // signal to main() that this worker is done
    }

    private static void AddFailure(String filename) {
        FailureList.add(filename);   // FailureList is a synchronized list, safe across worker threads
    }

    private static void saveFailurelist(String failurefile) throws Exception {
        System.out.println("Save " + "failure_" + failurefile);
        FileWriter w = new FileWriter("failure_" + failurefile);
        PrintWriter out = new PrintWriter(w);
        for (String s : FailureList) {
            out.println(s);
        }
        out.close();
    }

    private static String createHdfsFile(String dst, InputStream in) throws Exception {
        FSDataOutputStream out = des_fs.create(getPath(dst));
        IOUtils.copyBytes(in, out, 256 * 1024);   // copy with a 256 KB buffer; streams stay open
        out.close();
        in.close();
        return dst;
    }

    private static InputStream readFromHadoopInputStream(String dst) throws Exception {
        Path path = getPath(dst);
        if (src_fs.exists(path)) {
            FSDataInputStream is = src_fs.open(path);
            return is;
        } else {
            throw new Exception("the file is not found .");
        }
    }

    private static Path getPath(String dir) {
        return new Path(basedir + dir);   // entries in filelist.txt are relative to basedir
    }

    static {
        // source cluster
        Configuration src_conf = new Configuration();
        src_conf.set("fs.default.name", "hdfs://192.168.2.50:8020"); // e.g. "hdfs://namenode-backup-vip:8020"
        src_conf.set("dfs.block.size", "524288");
        src_conf.set("dfs.replication", "2");
        src_conf.set("dfs.permissions", "false");
        src_conf.set("dfs.permissions.supergroup", "resin");
        src_conf.set("dfs.web.ugi", "resin");
        try {
            src_fs = FileSystem.get(src_conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("Initialize Hadoop Server src fs hdfs://192.168.2.50:8020");

        // destination cluster
        Configuration des_conf = new Configuration();
        des_conf.set("fs.default.name", "hdfs://192.168.2.85:8020"); // e.g. "hdfs://namenode-backup-vip:8020"
        des_conf.set("dfs.block.size", "524288");
        des_conf.set("dfs.replication", "2");
        des_conf.set("dfs.permissions", "false");
        des_conf.set("dfs.permissions.supergroup", "resin");
        des_conf.set("dfs.web.ugi", "resin");
        try {
            des_fs = FileSystem.get(des_conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("Initialize Hadoop Server des fs hdfs://192.168.2.85:8020");
    }

}
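
The jobs list plus the Thread.sleep polling loop is a hand-rolled way for main() to wait for the workers. The same chunk-per-thread flow could also be driven by an ExecutorService from java.util.concurrent. The sketch below is my own variation, not part of the original program: the class name HadoopDataTransportPool and the pool size of 8 are assumptions, the failure-list saving from the original main() is omitted, and the HadoopDataTransport runnable is reused unchanged.

package test;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Alternative driver (sketch): a fixed-size thread pool replaces the jobs list
// and the sleep-polling loop of the original main().
public class HadoopDataTransportPool {

    public static void main(String[] args) throws Exception {
        List<String> al = new ArrayList<String>();
        BufferedReader br = new BufferedReader(new FileReader(args[0]));
        String line;
        while ((line = br.readLine()) != null) {
            if (line.length() > 10) {
                al.add(line);
            }
        }
        br.close();

        int pmax = Integer.parseInt(args[1]);                   // files per task, as before
        ExecutorService pool = Executors.newFixedThreadPool(8); // bounded number of parallel copies
        for (int i = 0; i < al.size(); i = i + pmax) {
            int max = Math.min(i + pmax, al.size());
            pool.submit(new HadoopDataTransport(al.subList(i, max), i, 0));
        }
        pool.shutdown();                                        // no further tasks
        pool.awaitTermination(24, TimeUnit.HOURS);              // wait for all copies to finish
    }
}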
