第 6 章 Hadoop Streaming

6.1 Hadoop上运行MapReduce

6.1.1 通过Hadoop Streaming运行MapReduce

Hadoop Streaming是一个基础的API,它可以通过任何一种语言按照MapReduce的方式 去执行脚本文件,例如Mapper和Reducer脚本文件。它的原理类似于UNIX操作系统中的pipe管道操 作,使用Unix标准流作为Hadoop和用户程序之间的接口。Mapper和Reducer以键值对通过标准输入流(stdin)和标准输出流(stdout)进行输入和输 出,Reducer最后会将运行结果保存到HDFS中。Hadoop Streaming最大的一个优势是在于它允许以 非Java语言编写MapReduce任务,这些任务能够与Java语言编写的任务一样在Hadoop集群中运 行。除此之外,Hadoop Streaming适合进行文本处理,比如从大型CSV文件中读取每一行的处理,Hadoop Streaming还可以处理二进制流,比如可以读取图像形式的输入。Hadoop Streaming支持Perl、Python、R等多种语言。

通过Hadoop Streaming,任何可执行文件都可以被指定为Mapper/Reducer。这些可执行文件不需要事先 存放在集群上;如果不在worknodes里面,则需要用-file选项让framework把可执行文件作为 作业的一部分,一起打包提交。但是要处理的文件必须要放到Hadoop的HDFS上。

如果一个可执行文件被用于mapper,则在mapper初始化时, 每一个mapper任务会把这个可执行文件作为一个单独的进程启动。 mapper任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。 同时,mapper收集可执行文件进程标准输出的内容,并把收到的每一行内容转化成key/value对,作为mapper的输出。 如果一个可执行文件被用于reducer,每个reducer任务会把这个可执行文件作为一个单独的进程启动。 Reducer任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。 同时,reducer收集可执行文件进程标准输出的内容,并把每一行内容转化成key/value对,作为reducer的输出。 这是Map/Reduce框架和streaming mapper/reducer之间的基本通信协议。

由于Hadoop是用Java开发的,在使用hadoop streming带来便利的同时,也存在一定的局限性,具体如下 1. 只能通过stdin和stdout来进行输入输出,不像 Java 的程序那样可以在代码里使用 API,控制力比较弱。 2. Streaming默认只能处理文本数据Textfile,对于二进制数据,比较好的方法是将二进制的key, value进行重编码,转化为文本。 3. 由于涉及转换过程,会带来更多的开销

以下为一个简单的Python版本的MapReduce执行过程,在本书中,我们将多次使用Python和R作为运行 MapReduce的程序。

  • 上传数据到HDFS

    $ hadoop fs -put /home/dmc/TREASURE1.txt /
  • 执行MapReduce

    $ hadoop jar /home/dmc/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.5.2.jar \
    -file /home/dmc/mapper.py /home/dmc/reducer.py \
    -input /TREASURE1.txt \
    -output WordsCount \
    -mapper "/home/dmc/mapper.py" \
    -reducer "/home/dmc/reducer.py" \

6.1.2 Hadoop Streaming的常用参数设置

    -mapred.job.name: 设置作业名
    -input:指定作业输入,可以是文件或者目录,可以使用*通配符,也可以使用多次指定多个文件或者目录作为输入
    -output:指定作业输出目录,并且必须不存在,并且必须有创建该目录的权限,-output只能使用一次
    -mapper:指定mapper可执行程序或Java类,必须指定且唯一
    -reducer:指定reducer可执行程序或Java类
    -file:将指定的本地/hdfs文件分发到各个Task的工作目录下
    -jobconf mapred.reduce.tasks: 指定reducer的个数,如果设置为0或者-reducer NONE 则没有reducer程序,mapper的输出直接作为整个作业的输出
    -jobconf mapred.map.tasks 设置map任务个

6.1.3 Hadoop其他MapReduce API

Hadoop是用Java编写的,有丰富的Java MapReduce模块类可供使用。 编译jar文件需要使用javac(在JDK中)和hadoop-mapreduce-client-core-xxx.jar,具体调用命令如下。

  javac -classpath $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.7.2.jar -d FirstJar\

  jar -cvf FirstJar.jar -C FirstJar/

Java版本的Hadoop语法为

  hadoop jar FirstJar.jar [mainClass] input output

虽然Hadoop是用Java语言写成的,但是MapReduce过程并不必须是用Java来写。除了以上介绍的Hadoop Streaming以外,还有支持C++语言的Hadoop Pipe的接口。 Hadoop Pipes是Hadoop MapReduce的C++接口的名称,Pipes框架通过Sockets实现用户编写的C++ Mapper/Reducer程序和Hadoop框架内的TaskTracker的数据通信。 Hadoop Pipe语法为

   hadoop pipes \
     -D hadoop.pipes.java.recordreader=true \
     -D hadoop.pipes.java.recordwriter=true \
     -input sample.txt \
     -output output \
     -program myCPProgram

这里再不做过多介绍, 感兴趣的读者可以参考 Hadoop官方文档或者(???)(???)