{"id":1313,"date":"2015-08-18T17:53:58","date_gmt":"2015-08-18T09:53:58","guid":{"rendered":"http:\/\/www.strongd.net\/?p=1313"},"modified":"2015-08-18T17:56:08","modified_gmt":"2015-08-18T09:56:08","slug":"hadoopdatatransport-hadoop%e6%95%b0%e6%8d%ae%e7%a7%bb%e5%8a%a8%e6%96%b9%e6%b3%95","status":"publish","type":"post","link":"https:\/\/www.strongd.net\/?p=1313","title":{"rendered":"HadoopDataTransport Hadoop\u6570\u636e\u79fb\u52a8\u65b9\u6cd5"},"content":{"rendered":"<p>\u524d\u6bb5\u65f6\u95f4\u65e7HADOOP\u5347\u7ea7\u5185\u5b58\uff0c\u9700\u8981\u628a\u67d0\u4e9b\u5171\u7528\u6570\u636e\u6587\u4ef6\u8fc1\u79fb\u5230\u4e34\u65f6\u7684HADOOP\u4e0a\uff0c\u5982\u679c\u7528hadoop \u547d\u4ee4\u6548\u7387N\u4f4e\uff0c<br \/>\n\u4e8e\u662f\u5c31\u5199\u4e00\u4e2a\u5c0f\u7a0b\u5e8f\uff0c\u6309\u7167\u6587\u4ef6\u5217\u8868\uff0c\u628a\u6570\u636e\u8fc1\u79fb\u5230\u65b0HADOOP\u4e0a\uff0c\u547d\u4ee4\uff1a<\/p>\n<p>java -classpath .\/lib\/hadoop-core-1.0.2.jar:.\/lib\/commons-logging-1.1.1.jar:.\/lib\/commons-configuration-1.6.jar:.\/lib\/commons-lang-2.4.jar:.\/ test.HadoopDataTransport filelist.txt 100<\/p>\n<p>[root@datanode4 ~]#\u00a0cat filelist.txt<\/p>\n<p>\/201401\/21\/3bb30e5f-cf3e-4182-a7c0-ed486f80a87a.mp3<br \/>\n\/201401\/21\/1d62fff3-744e-41c9-8152-5243ee0ce7b4.mp3<br \/>\n\/201401\/21\/784a53f4-2125-4bc6-bf6a-0adf40981a64.mp3<\/p>\n<p>\u4ee3\u7801\u6e05\u5355\uff1a\u4e0d\u559c\u6b22\u5199\u6ce8\u91ca\u3002<\/p>\n<blockquote><p>package test;<\/p>\n<p>import java.io.BufferedReader;<br \/>\nimport java.io.FileReader;<br \/>\nimport java.io.FileWriter;<br \/>\nimport java.io.IOException;<br \/>\nimport java.io.InputStream;<br \/>\nimport java.io.PrintWriter;<br \/>\nimport java.util.ArrayList;<br \/>\nimport java.util.List;<\/p>\n<p>import org.apache.hadoop.conf.Configuration;<br \/>\nimport org.apache.hadoop.fs.FSDataInputStream;<br \/>\nimport org.apache.hadoop.fs.FSDataOutputStream;<br \/>\nimport org.apache.hadoop.fs.FileSystem;<br \/>\nimport org.apache.hadoop.fs.Path;<br \/>\nimport org.apache.hadoop.io.IOUtils;<\/p>\n<p>public class HadoopDataTransport implements Runnable {<\/p>\n<p>private static FileSystem src_fs;<br \/>\nprivate static FileSystem des_fs;<br \/>\nprivate static List&lt;String&gt; FailureList = new ArrayList&lt;String&gt;();<\/p>\n<p>private static List&lt;Boolean&gt; jobs = new ArrayList&lt;Boolean&gt;();<\/p>\n<p>static final String basedir = &#8220;\/data\/album&#8221;; \/\/ \u4fdd\u5b58\u5728HADOOP\u4e2d\u7684\u6839\u76ee\u5f55<\/p>\n<p>public static void main(String[] args) throws Exception {<br \/>\nList&lt;String&gt; al = new ArrayList&lt;String&gt;();<br \/>\nSystem.out.println(&#8220;read list from file : &#8221; + args[0]);<br \/>\nint pm = Integer.parseInt(args[1]);<br \/>\nFileReader fr = new FileReader(args[0]);<br \/>\nBufferedReader br = new BufferedReader(fr);<br \/>\nString line;<br \/>\nwhile ((line = br.readLine()) != null) {<br \/>\nif (line!=null &amp;&amp; line.length()&gt;10) {<br \/>\nal.add(line);<br \/>\n}<br \/>\n}<br \/>\nint pmax = pm;<br \/>\nif(al.size()&gt;0){<br \/>\nfor(int i=0;i&lt;al.size();i=i+pmax){<br \/>\nint max = ((i+pmax)&gt;al.size()?al.size():(i+pmax));<br \/>\nnew Thread(new HadoopDataTransport(al.subList(i, max),i,0)).start();<br \/>\njobs.add(true);<br \/>\n}<br \/>\n}<br \/>\nbr.close();<br \/>\nfr.close();<br \/>\nwhile(jobs.size()&gt;0){<br \/>\nThread.sleep(500);<br \/>\n}<br \/>\nif(FailureList.size()&gt;0){\/\/ save failure list<br \/>\nsaveFailurelist(args[0]);<br \/>\n}<br \/>\n}<\/p>\n<p>private List&lt;String&gt; filelist;<br \/>\nprivate int threadid;<br \/>\npublic HadoopDataTransport(List&lt;String&gt; list,int id,int opts){<br \/>\nthis.filelist = list;<br \/>\nthis.threadid = id;<br \/>\n}<\/p>\n<p>@Override<br \/>\npublic void run() {<br \/>\nint success = 0;<br \/>\nint failure =0;<\/p>\n<p>for(String file : filelist){<br \/>\nSystem.out.println(threadid + &#8221; ==&gt; processing &#8230;. &#8220;+file);<br \/>\ntry {<br \/>\nInputStream in = readFromHadoopInputStream(file);<br \/>\nif(in !=null){<br \/>\nString filename = createHdfsFile(file, in);<br \/>\nSystem.out.println(threadid + &#8221; ==&gt; &#8220;+filename + &#8221; &#8230;&#8230; done.&#8221;);<br \/>\nsuccess++;<br \/>\n}<br \/>\n} catch (Exception e) {<br \/>\nAddFailure(file);<br \/>\nSystem.out.println(threadid + &#8221; ==&gt; &#8220;+file + &#8221; &#8230;&#8230; failure.&#8221; + e.getMessage());<br \/>\nfailure++;<br \/>\n}<br \/>\n}<br \/>\nSystem.out.println(&#8220;===============ThreadId: &#8220;+threadid+&#8221; Finished Total:&#8221; + filelist.size() +&#8221; Success : &#8220;+ success+&#8221; Failure :&#8221;+ failure+&#8221;==========&#8221;);<br \/>\njobs.remove(true);<br \/>\n}<\/p>\n<p>private static void AddFailure(String filename){<br \/>\nFailureList.add(filename);<br \/>\n}<br \/>\nprivate static void saveFailurelist(String failurefile) throws Exception{<br \/>\nSystem.out.println(&#8220;Save &#8220;+&#8221;failure_&#8221;+failurefile);<br \/>\nFileWriter w = new FileWriter(&#8220;failure_&#8221;+failurefile);<br \/>\nPrintWriter out = new PrintWriter(w);<br \/>\nfor(String s : FailureList){<br \/>\nout.println(s);<br \/>\n}<br \/>\nout.close();<br \/>\n}<\/p>\n<p>private static String createHdfsFile(String dst, InputStream in)<br \/>\nthrows Exception {<br \/>\nFSDataOutputStream out =des_fs.create(<br \/>\ngetPath(dst));<br \/>\nIOUtils.copyBytes(in, out, 256 * 1024);<br \/>\nout.close();<br \/>\nreturn dst;<br \/>\n}<\/p>\n<p>private static InputStream readFromHadoopInputStream(String dst) throws Exception {<br \/>\nPath path = getPath(dst);<br \/>\nif (src_fs.exists(path)) {<br \/>\nFSDataInputStream is = src_fs.open(path);<br \/>\nreturn is;<br \/>\n} else {<br \/>\nthrow new Exception(&#8220;the file is not found .&#8221;);<br \/>\n}<br \/>\n}<\/p>\n<p>private static Path getPath(String dir) {<br \/>\nreturn new Path(basedir + dir);<br \/>\n}<\/p>\n<p>static {<br \/>\nConfiguration src_conf = new Configuration();<br \/>\nsrc_conf.set(&#8220;fs.default.name&#8221;, &#8220;hdfs:\/\/192.168.2.50:8020&#8221;); \/\/ conf.set(&#8220;fs.default.name&#8221;,<br \/>\n\/\/ &#8220;hdfs:\/\/namenode-backup-vip:8020&#8221;);<br \/>\nsrc_conf.set(&#8220;dfs.block.size&#8221;, &#8220;524288&#8221;);<br \/>\nsrc_conf.set(&#8220;dfs.replication&#8221;, &#8220;2&#8221;);<br \/>\nsrc_conf.set(&#8220;dfs.permissions&#8221;, &#8220;false&#8221;);<br \/>\nsrc_conf.set(&#8220;dfs.permissions.supergroup&#8221;, &#8220;resin&#8221;);<br \/>\nsrc_conf.set(&#8220;dfs.web.ugi&#8221;, &#8220;resin&#8221;);<br \/>\ntry {<br \/>\nsrc_fs = FileSystem.get(src_conf);<br \/>\n} catch (IOException e) {<br \/>\ne.printStackTrace();<br \/>\n}<br \/>\nSystem.out<br \/>\n.println(&#8220;Initialize Hadoop Server src fs hdfs:\/\/192.168.2.50:8020&#8221;);<\/p>\n<p>Configuration des_conf = new Configuration();<br \/>\ndes_conf.set(&#8220;fs.default.name&#8221;, &#8220;hdfs:\/\/192.168.2.85:8020&#8221;); \/\/ conf.set(&#8220;fs.default.name&#8221;,<br \/>\n\/\/ &#8220;hdfs:\/\/namenode-backup-vip:8020&#8221;);<br \/>\ndes_conf.set(&#8220;dfs.block.size&#8221;, &#8220;524288&#8221;);<br \/>\ndes_conf.set(&#8220;dfs.replication&#8221;, &#8220;2&#8221;);<br \/>\ndes_conf.set(&#8220;dfs.permissions&#8221;, &#8220;false&#8221;);<br \/>\ndes_conf.set(&#8220;dfs.permissions.supergroup&#8221;, &#8220;resin&#8221;);<br \/>\ndes_conf.set(&#8220;dfs.web.ugi&#8221;, &#8220;resin&#8221;);<br \/>\ntry {<br \/>\ndes_fs = FileSystem.get(des_conf);<br \/>\n} catch (IOException e) {<br \/>\ne.printStackTrace();<br \/>\n}<br \/>\nSystem.out<br \/>\n.println(&#8220;Initialize Hadoop Server des fs hdfs:\/\/192.168.2.85:8020&#8221;);<br \/>\n}<\/p>\n<p>}<\/p><\/blockquote>\n","protected":false},"excerpt":{"rendered":"<p>\u524d\u6bb5\u65f6\u95f4\u65e7HADOOP\u5347\u7ea7\u5185\u5b58\uff0c\u9700\u8981\u628a\u67d0\u4e9b\u5171\u7528\u6570\u636e\u6587\u4ef6\u8fc1\u79fb\u5230\u4e34\u65f6\u7684HADOOP\u4e0a\uff0c\u5982\u679c\u7528hadoop \u547d\u4ee4\u6548\u7387N\u4f4e\uff0c \u4e8e\u662f\u5c31\u5199\u4e00\u4e2a\u5c0f\u7a0b\u5e8f\uff0c\u6309\u7167\u6587\u4ef6\u5217\u8868\uff0c\u628a\u6570\u636e\u8fc1\u79fb\u5230\u65b0HADOOP\u4e0a\uff0c\u547d\u4ee4\uff1a java -classpath .\/lib\/hadoop-core-1.0.2.jar:.\/lib\/commons-logging-1.1.1.jar:.\/lib\/commons-configuration-1.6.jar:.\/lib\/commons-lang-2.4.jar:.\/ test.HadoopDataTransport filelist.txt 100 [root@datanode4 ~]#\u00a0cat filelist.txt \/201401\/21\/3bb30e5f-cf3e-4182-a7c0-ed486f80a87a.mp3 \/201401\/21\/1d62fff3-744e-41c9-8152-5243ee0ce7b4.mp3 \/201401\/21\/784a53f4-2125-4bc6-bf6a-0adf40981a64.mp3 \u4ee3\u7801\u6e05\u5355\uff1a\u4e0d\u559c\u6b22\u5199\u6ce8\u91ca\u3002 package test; import java.io.BufferedReader; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; import java.io.PrintWriter; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; public class HadoopDataTransport implements Runnable { private static FileSystem &hellip; <a href=\"https:\/\/www.strongd.net\/?p=1313\" class=\"more-link\">Continue reading <span class=\"screen-reader-text\">HadoopDataTransport Hadoop\u6570\u636e\u79fb\u52a8\u65b9\u6cd5<\/span><\/a><\/p>\n","protected":false},"author":2,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[1],"tags":[],"class_list":["post-1313","post","type-post","status-publish","format-standard","hentry","category-java"],"_links":{"self":[{"href":"https:\/\/www.strongd.net\/index.php?rest_route=\/wp\/v2\/posts\/1313","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.strongd.net\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.strongd.net\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.strongd.net\/index.php?rest_route=\/wp\/v2\/users\/2"}],"replies":[{"embeddable":true,"href":"https:\/\/www.strongd.net\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=1313"}],"version-history":[{"count":2,"href":"https:\/\/www.strongd.net\/index.php?rest_route=\/wp\/v2\/posts\/1313\/revisions"}],"predecessor-version":[{"id":1315,"href":"https:\/\/www.strongd.net\/index.php?rest_route=\/wp\/v2\/posts\/1313\/revisions\/1315"}],"wp:attachment":[{"href":"https:\/\/www.strongd.net\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=1313"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.strongd.net\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=1313"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.strongd.net\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=1313"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}