第 5 章 MapReduce

5.1 MapReduce 工作原理

想象当你是一个大厨,你带着你的厨子小弟们准备研发一种新型的辣椒酱。而这个辣椒酱的的配方是薄 荷叶一撮,洋葱半个,番茄两个,辣椒十根,大蒜四根,切碎后加入适量的3勺盐和1升水,再放入混合 研磨机里研磨。那么你准备怎么做才能最快呢?

很简单,让一个小弟切薄荷,一个小弟切洋葱,一个小弟切番茄,一个小弟切辣椒,一个小弟切大蒜, 或者为了让工作量尽可能的平均,可以让切薄荷的小弟一起把洋葱跟番茄切了,而让切洋葱跟切番茄的 小弟一起加入切辣椒行列,这样每人的工作量都大概均匀了。然后等大家把原料都切好了后,再一起放 进研磨机里。

没错,其实这样的一个“分而治之”然后再汇总的工作框架正是MapReduce。聪明的读者一定马上就能理解 了吧,其实大名鼎鼎的MapReduce并不是什么算法或者模型,它其实是一个分布式的计算框架,在这样的 框架下我们可以进行任意我们想要做的事。是不是感觉棒棒哒。MapReduce的介绍还可参 见(???)(???)

5.1.1 Map原理

我们可以再看看刚刚的例子,在那个例子里其实我们总体上可以分做两步:第一步就是将原材料分成基 本均匀的几份,然后分别让小弟们去切;而第二步则是将大家的工作量都汇总在一起,做下一步工作。 而以上第一步就是Map。

那么当我们在采用MapReduce进行分布式计算的时候,Map又是什么呢?

当我们提交了一个任务后,比如我们提交了一个WordCount的Python脚本。往往这个脚本会分做两部分,一部 分是用于Map,而另一部分则是用于Reduce。可以想象在上个例子中,你将制作辣椒酱的工序分做了两步: 第一步就是将切碎原材料的事情分发出去,第二步就是进行合并然后进行研磨。同样,当你提了一个任 务给MapReduce后,你得告诉它哪一步是Map,哪一步是Reduce。然后MapReduce就会将这个任务的分配分 发给各个节点。

在Map阶段,当每个结点都接收到具体的Mapper程序后。任务开始运行,每个节点会读取它那部分的数据。 这些节点会按行把数据进行切分,然后以行为标准输入提供给可执行文件进程。在输出的时 候,Mapper会把收到的每一行标准输出的内容转化成key/value对。为了方便后续的Reducer进行数据的 交接,Mapper在输出的时候会根据key值进行排序。

5.1.2 Reduce原理

当Map任务进行的同时,对应的Reducer也会从各个地方进行Mapper输出的“拉取”,其实这是一个复制的 行为。然后Reducer会对从各个地方拉来的数据进行不断的merge。而merge的存储主要还是从内存到磁盘, 也就是从Mapper那儿拉取的输出会先存储在内存中,当大于一定阈值后会将其写到磁盘,整个过程会直到Map端结束后才会结束。 最终形成了Reducer的输入。最后会根据我们在一开始提交的Reduce,即可执 行文件或脚本,对整个输入进行处理。

5.1.3 Partitioner和Combiner

在Mapper处理完数据后,如何确定对应数据给到哪一个Reduce进行处理呢?在Map和Reduce中间往往还存在 Partition和Combine的过程,Partitioner确定数据给到哪一个Reducer,Combiner则实现了在节点上先进行一次数据合并。 显然Partitioner是必须的,否则会引起Reduce的混乱,而Combiner则是可选的,它在某些情况下可以提高MapReduce处理 的效率

5.1.3.1 Partitioner原理

Partitioner的作用是对Map端输出的数据key作一个散列,使数据能够均匀分布在各个Reduce上进行后续操作,它可以确定将数据 分给哪一个Reducer进行处理,因此它直接影响了Reduce端的负载均衡。HashPartitoner是MapReduce默认 的Partitioner,用户也可以自定义自己的Partitioner。

5.1.3.2 Combiner原理

在MapReduce的实际计算中,每一个map都可能会产生大量的本地输出,Combiner的作用就是基点上的Map端的输出先做一次合并, 以减少在map和reduce节点之间的数据传输量,提高数据传输效率。Combiner的输出是Reducer的输入,Combiner在节点上先 基于key对value进行聚合,它没有默认的实现方式,需要用户在conf中指定。 由于MapReduce计算的逻辑不同,并不是所有的MapReduce都需要Combiner,例如在求和、求最大值时使用 则可以明显提高效率,但对求中位数等则不适用。

5.1.4 MapReduce 工作机制

一个MapReduce程序起始于用户通过JobClient提交一个作业(Job),然后该job的相关信息就会被发送到Job Tracker, Job Tracker是MapReduce框架的中心,他需要与集群中的机器进行定时通信,这是一个类似于心跳的机制, 它管理哪些程序应该跑在哪些机器上,同时管理所有job失败、重启等操作。 对应地在每台机器上也有监控该节点任务运行情况的Task Tracker,Task Tracker通过心跳机制和Job Tracker进行通信, JobTracker会搜集这些信息来对job进行监控和管理。 由于Job Tracker和Task Tracker模式的工作监控机制存在很多问题,在Hadoop0.23.0版本以后,采用了统一的 资源管理器Hadoop Yarn,取而代之的是ResourceManager、AppliactionMaster和NodeManager, 有兴趣的读者可以进一步学习。

5.1.4.1 任务提交

MapReduce的作业(Job)是由JobClient提交给Hadoop集群的。一个Job包括了输入数据,MapReduce程序和 配置信息。Hadoop将作业分为若干个任务(task)来执行,其中包括Map任务和Reduce任务。

我们在作业提交之前,需要对作业进行一些相应的配置。首先,我们必须提交相应的Map程序、Reduce程序以及他们所依赖的程序, 还需要设置作业的输入输出路径及其他配置(如map和reduce任务个数)。

而我们的作业配置好后,是通过JobClinet来提交。当我们提交了一个MapReduce作业后,MapReduce程 序会立马启动。而这个时候JobClient会向JobTracker请求一个新的JobId。同时检查作业输入和输出说 明,比如输出文件是否已经存在。如果作业的输入有不满足要求的情况,则会立马终止作业并报错。

而当检验作业的各项都符合规定后,JobClient会将运行的作业所需要的资源复制到一个以作业ID命名 的文件下。而这个文件是在JobTracker的文件系统中。JobClient提交完成后,JobTracker会将作业加 入队列,然后进行调度,默认的调度方法是先进先出的方式。为了创建任务运行列表,JobTracker从该 共享文件系统中获取相应的信息,以计算输入分片信息,并针对每一个分片创建Map任务以及Reduce任 务。

5.1.4.2 分配任务

而任务的分配是通过TaskTracker和JobTracker之间的心跳机制完成的。在任务执行的过程 中,TaskTracker会定期发送“心跳”给JobTracker,以用来告诉JobTracker它的状态,如是否还在运行 或是否准备好进行新的任务。

当TaskTracker没有执行任务的时候,JobTracker可以为之选择任务。而在为TaskTracker选择任 务(task)之前,JobTracker首先要选定任务所在的作业(Job)。而根据TaskTacker的固定数量的任务槽, 选择好作业之后JobTracker就可以为该作业选定一个任务,分别为Map任务和Reduce任务。

对于一个Map任务JobTracker会考虑TaskTracker的网络位置,会为之选取一个与其输入分片距离最近 的TaskTracker。最理想的情况是任务运行在和输入分片在同一个机器上(数据本地化),次之是机架 本地化。而在选择Reduce任务的时候,JobTracker简单的从待运行的Reduce任务列表中选取下一个来 执行。

5.1.4.3 执行任务

TaskTracker会通过共享文件系统把作业的相应文件,如代码、输入输出信息,复制到TaskTracker所在 的文件系统,从而实现作业文件的本地化。同时,TaskTracker将应用程序所需要的全部文件从分布式 缓存复制到本地磁盘。然后TaskTracker为任务新建一个本地工作目录,并把jar文件中的内容解压到这 个文件夹下,然后TaskTracker新建一个TaskRunner实例来运行该任务。TaskRunner启动一个新的JVM来 运行每个任务,以便用户自定义的Map和Reduce函数不会影响到TaskTracker。子进程通过接 口与父进程通信。任务的子进程每隔几秒便告知父进程它的进度,直到任务完成。

5.1.4.4 完成任务

当JobTracker收到作业的最后一个任务已完成的通知后,便把作业状态设置为“成功”。然后 在JobClient查询状态时知道任务已经完成。于是JobClient打印一条消息告知用户,然后从RunJob方法 返回。

最后JobTracker清空作业的工作状态,指示TaskTracker也清空作业的工作状态。

Map任务将结果写入本地硬盘,而非HDFS。因为Map任务的结果是中间结果,要给Reduce任务进行再次处 理,处理完之后Map任务的结果就没有价值了,通常是被删掉。HDFS上的同一份数据,通常情况下是要备 份的。如果存入HDFS,那么就有些小题大做了。

5.1.4.5 MapReduce常用参数设置

在了解了MapReduce的基本知识后,可以发现在执行MapReduce过程中,很多参数都不是唯一的,用户都可以根据实际 情况来设定合适的参数,例如手动设定reduce为1,则可以将所有Map的输出都集中到一个Reduce进行处理。下面列举了一些 常用的参数设置。

    mapred.reduce.tasks(mapreduce.job.reduces):
    //默认启动的reduce数。通过该参数可以手动修改reduce的个数。默认值为1。

    mapreduce.task.io.sort.factor:
    //Reduce Task中合并小文件时,一次合并的文件数据,每次合并的时候选择最小的前10进行合并。默认值为10。

    mapreduce.task.io.sort.mb:
    //Map Task缓冲区所占内存大小。默认值为100。

    mapred.min.split.size:
    // mapper在拉取数据的时候split的最小值,默认为1B。

    mapreduce.jobtracker.handler.count:
    //JobTracker可以启动的线程数,一般为tasktracker节点的4%。默认值为10。

    mapreduce.reduce.shuffle.parallelcopies:
    //reuduce shuffle阶段并行传输数据的数量。默认值为5。

    mapreduce.map.output.compress:
    //map输出是否进行压缩,如果压缩就会多耗cpu,但是减少传输时间,如果不压缩,就需要较多的传输带宽。默认为False。
    //配合 mapreduce.map.output.compress.codec使用,默认是 org.apache.hadoop.io.compress.DefaultCodec,可以根据需要设定数据压缩方式。

    mapreduce.tasktracker.tasks.reduce.maximum:
    //一个tasktracker并发执行的reduce数,建议为cpu核数,默认值为2。

5.1.5 Shuffle

5.1.5.1 Map端的Shuffle

Map端的Shuffle实际包含了输入(input)过程、切分(partition)过程、溢写(spill)过程(sort和combine过 程)、merge过程。

  • input过程。当我们在采用MapReduce进行分布式计算时,我们首先会将数据放在HDFS上, 而HDFS上的数据是以block为单位存储的。而map task在拉取数据的时候,是按split为单位拉取 的。这里的Split不是真正的对数据文件进行切分,只是确定每一个Mapper应该读取多少数据量,一个数据文件 可能存储在多个block上,每个block上可能存在多个split, 即一个block可以被分为多个split, 这和mapreduce的具体设置相关,默认为一对一。

  • partition过程。partition过程即前文提到的Partitioner组件实现的效果。Mapper的输出是 key/value 对, 如果选择默认的HashPartitioner,则会对key值进行hash并得到一个结果,该结果决定了当前的 Mapper的输出到底是交给哪个Reducer。 即对key值进行hash后再按reduce task数量取模,并得到将该Mapper的输出交由哪个Reducer进行 处理。key/value对以及partition 的结果都会被写入缓冲区,减少磁盘I/O的影响。

  • spill过程。当map task输出结果过多大于一定阈值时,就可能发生内存溢出,即从内 存往磁盘写数据的过称为spill。当将缓冲区的数据临时写入磁盘后,就可以释放这部分内存,然后重 新利用这块缓冲区。而整个spill过程都是由另外单独线程来完成,并不影响往缓冲区写Map结果的线 程。在执行spill之前,Map会根据key值对每个输出进行排序,这样做是为了方便后续的Reduce过程。 而在我们进行Reduce之前,有些时候会先对数据进行一些整合,可以理解为一个mini-Reduce过程,称 为Combiner。即将有相同key的输出做一些预处理,比如相加或者求最大等。

  • merge过程。每次溢写都会在磁盘上生成一个溢写文件,当map task 完成时,内存缓冲区中的全 部数据都溢写到磁盘中形成众多溢写文件。merge过程就是要将这些溢写文件归并到一起。

5.1.5.2 Reduce端的Shuffle

在 Map task 开始有输出后,Reduce task便会开始进行不断拉与自己对应的每个 Map task的结果输出 并不断的进行 merge ,也最终形成一个文件作为 Reduce task 的输入文件。

  • copy过程,简单拉取数据。Reduce task进行数据拉取的过程其实就是复制的过程。

  • merge过程,和Map端的merge类似。在Reduce task 将数据copy 过来后,会先放入内存缓冲区中, 与Map的内存方式相似,当大于一定阈值后,并从内存写入到磁盘。然后在磁盘中生成了众多的溢写文 件。这样的merge方式会一直在运行到Map端没有输出的数据时才结束,最终输入文件。

  • reducer的输入文件。通过merge最后会生了输入文件,大多数情况下存在于磁盘中,但是需要将 其放入内存中。当reducer 输入文件已定,整个 Shuffle 阶段才算结束。然后就是 Reducer 执行, 把结果放到 HDFS 上。

5.1.6 MapReduce运行实例

下面介绍使用MapReduce解决实际问题的简单案例

5.1.6.1 案例运行过程

此案例的最终目标为找到各个年份的温度的最大值,输入数据是各个观测站观测得到的带有温度记录的原始数据,原始数据示例如下

 0067011990999991950051507004...9999999N9+00001+99999999999...
 0043011990999991950051512004...9999999N9+00221+99999999999...
 0043011990999991950051518004...9999999N9-00111+99999999999...
 0043012650999991949032412004...0500001N9+01111+99999999999...
 0043012650999991949032418004...0500001N9+00781+99999999999...
 ...

输入的原始数据未设置key/value键,hadoop会根据每行的字节数自动为其设置key/value键,如下所示

 (0,   0067011990999991950051507004...9999999N9+00001+99999999999...)
 (106, 0043011990999991950051512004...9999999N9+00221+99999999999...)
 (212, 0043011990999991950051518004...9999999N9-00111+99999999999...)
 (318, 0043012650999991949032412004...0500001N9+01111+99999999999...)
 (424, 0043012650999991949032418004...0500001N9+00781+99999999999...)
 ...
 

设置好key/value键的数据传入map过程进行处理,map过程提取年份和气温信息,mapper处理后数据输出如下

(1950,0)
(1950,22)
(1950,-11)
(1949,111)
(1949,78)
 ...
 

此例中map函数对所有的行都进行了操作,map函数里输出的数据以年份作为key,与输入数据不同。 mapper输出的数据会被按照键值进行排序和分组,处理后数据如下

(1949,[111,78])
(1950,[0,22,-11])
 ...

排序和分组过程是中间过程,一般用户观测不到,但可以对其进行控制,如可以选择以什么键值作为排序依据,hadoop的计算过程中一个重要的计算时间损失在排序过程。

数据进行排序和分组后会被复制至reduce过程,此例中reduce函数会遍历列表并获取每个key值下的最大数据,reduce过程输出数据如下

(1949,111)
(1950,22)
 ...

最终得出各个年份的温度的最大值,输出结果返回至HDFS。

5.1.6.2 MapReduce运行前试验

在进行MapReduce之前,要验证所写的map函数和reduce函数是否存在问题,可拿出与原始数据相同结构的少量数据在Linux服务器中做简单的模拟,使用管道制作一个单一map和单一reduce的过程进行试验,试验命令如下:

  cat sample_input.txt | map.py | sort | reducer.py
  

在这个过程中,map.py和reducer.py是用户所写的map函数和reduce函数,sort是Linux的内置函数,这个过程与真正MapReduce过程的区别是没有将数据写入到硬盘中的过程,直接在内存中进行传输。

如果通过Linux管道实验时程序失败,那么这个程序一定不能在hadoop上成功运行,但如果程序可以在Linux内成功运行,在Hadoop上也不一定能运行成功。因为这个过程只是单一map和单一reduce过程的特例,使用此特例不能确定当有多个map和reduce过程时程序运行是否能够成功。