第 5 章 MapReduce
5.1 MapReduce 工作原理
想象当你是一个大厨,你带着你的厨子小弟们准备研发一种新型的辣椒酱。而这个辣椒酱的的配方是薄 荷叶一撮,洋葱半个,番茄两个,辣椒十根,大蒜四根,切碎后加入适量的3勺盐和1升水,再放入混合 研磨机里研磨。那么你准备怎么做才能最快呢?
很简单,让一个小弟切薄荷,一个小弟切洋葱,一个小弟切番茄,一个小弟切辣椒,一个小弟切大蒜, 或者为了让工作量尽可能的平均,可以让切薄荷的小弟一起把洋葱跟番茄切了,而让切洋葱跟切番茄的 小弟一起加入切辣椒行列,这样每人的工作量都大概均匀了。然后等大家把原料都切好了后,再一起放 进研磨机里。
没错,其实这样的一个“分而治之”然后再汇总的工作框架正是MapReduce。聪明的读者一定马上就能理解 了吧,其实大名鼎鼎的MapReduce并不是什么算法或者模型,它其实是一个分布式的计算框架,在这样的 框架下我们可以进行任意我们想要做的事。是不是感觉棒棒哒。MapReduce的介绍还可参 见(???)、(???)。
5.1.1 Map原理
我们可以再看看刚刚的例子,在那个例子里其实我们总体上可以分做两步:第一步就是将原材料分成基 本均匀的几份,然后分别让小弟们去切;而第二步则是将大家的工作量都汇总在一起,做下一步工作。 而以上第一步就是Map。
那么当我们在采用MapReduce进行分布式计算的时候,Map又是什么呢?
当我们提交了一个任务后,比如我们提交了一个WordCount的Python脚本。往往这个脚本会分做两部分,一部 分是用于Map,而另一部分则是用于Reduce。可以想象在上个例子中,你将制作辣椒酱的工序分做了两步: 第一步就是将切碎原材料的事情分发出去,第二步就是进行合并然后进行研磨。同样,当你提了一个任 务给MapReduce后,你得告诉它哪一步是Map,哪一步是Reduce。然后MapReduce就会将这个任务的分配分 发给各个节点。
在Map阶段,当每个结点都接收到具体的Mapper程序后。任务开始运行,每个节点会读取它那部分的数据。 这些节点会按行把数据进行切分,然后以行为标准输入提供给可执行文件进程。在输出的时 候,Mapper会把收到的每一行标准输出的内容转化成key/value对。为了方便后续的Reducer进行数据的 交接,Mapper在输出的时候会根据key值进行排序。
5.1.2 Reduce原理
当Map任务进行的同时,对应的Reducer也会从各个地方进行Mapper输出的“拉取”,其实这是一个复制的 行为。然后Reducer会对从各个地方拉来的数据进行不断的merge。而merge的存储主要还是从内存到磁盘, 也就是从Mapper那儿拉取的输出会先存储在内存中,当大于一定阈值后会将其写到磁盘,整个过程会直到Map端结束后才会结束。 最终形成了Reducer的输入。最后会根据我们在一开始提交的Reduce,即可执 行文件或脚本,对整个输入进行处理。
5.1.3 Partitioner和Combiner
在Mapper处理完数据后,如何确定对应数据给到哪一个Reduce进行处理呢?在Map和Reduce中间往往还存在 Partition和Combine的过程,Partitioner确定数据给到哪一个Reducer,Combiner则实现了在节点上先进行一次数据合并。 显然Partitioner是必须的,否则会引起Reduce的混乱,而Combiner则是可选的,它在某些情况下可以提高MapReduce处理 的效率
5.1.3.1 Partitioner原理
Partitioner的作用是对Map端输出的数据key作一个散列,使数据能够均匀分布在各个Reduce上进行后续操作,它可以确定将数据 分给哪一个Reducer进行处理,因此它直接影响了Reduce端的负载均衡。HashPartitoner是MapReduce默认 的Partitioner,用户也可以自定义自己的Partitioner。
5.1.3.2 Combiner原理
在MapReduce的实际计算中,每一个map都可能会产生大量的本地输出,Combiner的作用就是基点上的Map端的输出先做一次合并, 以减少在map和reduce节点之间的数据传输量,提高数据传输效率。Combiner的输出是Reducer的输入,Combiner在节点上先 基于key对value进行聚合,它没有默认的实现方式,需要用户在conf中指定。 由于MapReduce计算的逻辑不同,并不是所有的MapReduce都需要Combiner,例如在求和、求最大值时使用 则可以明显提高效率,但对求中位数等则不适用。
5.1.4 MapReduce 工作机制
一个MapReduce程序起始于用户通过JobClient提交一个作业(Job),然后该job的相关信息就会被发送到Job Tracker, Job Tracker是MapReduce框架的中心,他需要与集群中的机器进行定时通信,这是一个类似于心跳的机制, 它管理哪些程序应该跑在哪些机器上,同时管理所有job失败、重启等操作。 对应地在每台机器上也有监控该节点任务运行情况的Task Tracker,Task Tracker通过心跳机制和Job Tracker进行通信, JobTracker会搜集这些信息来对job进行监控和管理。 由于Job Tracker和Task Tracker模式的工作监控机制存在很多问题,在Hadoop0.23.0版本以后,采用了统一的 资源管理器Hadoop Yarn,取而代之的是ResourceManager、AppliactionMaster和NodeManager, 有兴趣的读者可以进一步学习。
5.1.4.1 任务提交
MapReduce的作业(Job)是由JobClient提交给Hadoop集群的。一个Job包括了输入数据,MapReduce程序和 配置信息。Hadoop将作业分为若干个任务(task)来执行,其中包括Map任务和Reduce任务。
我们在作业提交之前,需要对作业进行一些相应的配置。首先,我们必须提交相应的Map程序、Reduce程序以及他们所依赖的程序, 还需要设置作业的输入输出路径及其他配置(如map和reduce任务个数)。
而我们的作业配置好后,是通过JobClinet来提交。当我们提交了一个MapReduce作业后,MapReduce程 序会立马启动。而这个时候JobClient会向JobTracker请求一个新的JobId。同时检查作业输入和输出说 明,比如输出文件是否已经存在。如果作业的输入有不满足要求的情况,则会立马终止作业并报错。
而当检验作业的各项都符合规定后,JobClient会将运行的作业所需要的资源复制到一个以作业ID命名 的文件下。而这个文件是在JobTracker的文件系统中。JobClient提交完成后,JobTracker会将作业加 入队列,然后进行调度,默认的调度方法是先进先出的方式。为了创建任务运行列表,JobTracker从该 共享文件系统中获取相应的信息,以计算输入分片信息,并针对每一个分片创建Map任务以及Reduce任 务。
5.1.4.2 分配任务
而任务的分配是通过TaskTracker和JobTracker之间的心跳机制完成的。在任务执行的过程 中,TaskTracker会定期发送“心跳”给JobTracker,以用来告诉JobTracker它的状态,如是否还在运行 或是否准备好进行新的任务。
当TaskTracker没有执行任务的时候,JobTracker可以为之选择任务。而在为TaskTracker选择任 务(task)之前,JobTracker首先要选定任务所在的作业(Job)。而根据TaskTacker的固定数量的任务槽, 选择好作业之后JobTracker就可以为该作业选定一个任务,分别为Map任务和Reduce任务。
对于一个Map任务JobTracker会考虑TaskTracker的网络位置,会为之选取一个与其输入分片距离最近 的TaskTracker。最理想的情况是任务运行在和输入分片在同一个机器上(数据本地化),次之是机架 本地化。而在选择Reduce任务的时候,JobTracker简单的从待运行的Reduce任务列表中选取下一个来 执行。
5.1.4.3 执行任务
TaskTracker会通过共享文件系统把作业的相应文件,如代码、输入输出信息,复制到TaskTracker所在 的文件系统,从而实现作业文件的本地化。同时,TaskTracker将应用程序所需要的全部文件从分布式 缓存复制到本地磁盘。然后TaskTracker为任务新建一个本地工作目录,并把jar文件中的内容解压到这 个文件夹下,然后TaskTracker新建一个TaskRunner实例来运行该任务。TaskRunner启动一个新的JVM来 运行每个任务,以便用户自定义的Map和Reduce函数不会影响到TaskTracker。子进程通过接 口与父进程通信。任务的子进程每隔几秒便告知父进程它的进度,直到任务完成。
5.1.4.4 完成任务
当JobTracker收到作业的最后一个任务已完成的通知后,便把作业状态设置为“成功”。然后 在JobClient查询状态时知道任务已经完成。于是JobClient打印一条消息告知用户,然后从RunJob方法 返回。
最后JobTracker清空作业的工作状态,指示TaskTracker也清空作业的工作状态。
Map任务将结果写入本地硬盘,而非HDFS。因为Map任务的结果是中间结果,要给Reduce任务进行再次处 理,处理完之后Map任务的结果就没有价值了,通常是被删掉。HDFS上的同一份数据,通常情况下是要备 份的。如果存入HDFS,那么就有些小题大做了。
5.1.4.5 MapReduce常用参数设置
在了解了MapReduce的基本知识后,可以发现在执行MapReduce过程中,很多参数都不是唯一的,用户都可以根据实际 情况来设定合适的参数,例如手动设定reduce为1,则可以将所有Map的输出都集中到一个Reduce进行处理。下面列举了一些 常用的参数设置。
mapred.reduce.tasks(mapreduce.job.reduces):
//默认启动的reduce数。通过该参数可以手动修改reduce的个数。默认值为1。
mapreduce.task.io.sort.factor:
//Reduce Task中合并小文件时,一次合并的文件数据,每次合并的时候选择最小的前10进行合并。默认值为10。
mapreduce.task.io.sort.mb:
//Map Task缓冲区所占内存大小。默认值为100。
mapred.min.split.size:
// mapper在拉取数据的时候split的最小值,默认为1B。
mapreduce.jobtracker.handler.count:
//JobTracker可以启动的线程数,一般为tasktracker节点的4%。默认值为10。
mapreduce.reduce.shuffle.parallelcopies:
//reuduce shuffle阶段并行传输数据的数量。默认值为5。
mapreduce.map.output.compress:
//map输出是否进行压缩,如果压缩就会多耗cpu,但是减少传输时间,如果不压缩,就需要较多的传输带宽。默认为False。
//配合 mapreduce.map.output.compress.codec使用,默认是 org.apache.hadoop.io.compress.DefaultCodec,可以根据需要设定数据压缩方式。
mapreduce.tasktracker.tasks.reduce.maximum:
//一个tasktracker并发执行的reduce数,建议为cpu核数,默认值为2。
5.1.5 Shuffle
5.1.5.1 Map端的Shuffle
Map端的Shuffle实际包含了输入(input)过程、切分(partition)过程、溢写(spill)过程(sort和combine过 程)、merge过程。
input过程。当我们在采用MapReduce进行分布式计算时,我们首先会将数据放在HDFS上, 而HDFS上的数据是以block为单位存储的。而map task在拉取数据的时候,是按split为单位拉取 的。这里的Split不是真正的对数据文件进行切分,只是确定每一个Mapper应该读取多少数据量,一个数据文件 可能存储在多个block上,每个block上可能存在多个split, 即一个block可以被分为多个split, 这和mapreduce的具体设置相关,默认为一对一。
partition过程。partition过程即前文提到的Partitioner组件实现的效果。Mapper的输出是 key/value 对, 如果选择默认的HashPartitioner,则会对key值进行hash并得到一个结果,该结果决定了当前的 Mapper的输出到底是交给哪个Reducer。 即对key值进行hash后再按reduce task数量取模,并得到将该Mapper的输出交由哪个Reducer进行 处理。key/value对以及partition 的结果都会被写入缓冲区,减少磁盘I/O的影响。
spill过程。当map task输出结果过多大于一定阈值时,就可能发生内存溢出,即从内 存往磁盘写数据的过称为spill。当将缓冲区的数据临时写入磁盘后,就可以释放这部分内存,然后重 新利用这块缓冲区。而整个spill过程都是由另外单独线程来完成,并不影响往缓冲区写Map结果的线 程。在执行spill之前,Map会根据key值对每个输出进行排序,这样做是为了方便后续的Reduce过程。 而在我们进行Reduce之前,有些时候会先对数据进行一些整合,可以理解为一个mini-Reduce过程,称 为Combiner。即将有相同key的输出做一些预处理,比如相加或者求最大等。
merge过程。每次溢写都会在磁盘上生成一个溢写文件,当map task 完成时,内存缓冲区中的全 部数据都溢写到磁盘中形成众多溢写文件。merge过程就是要将这些溢写文件归并到一起。
5.1.5.2 Reduce端的Shuffle
在 Map task 开始有输出后,Reduce task便会开始进行不断拉与自己对应的每个 Map task的结果输出 并不断的进行 merge ,也最终形成一个文件作为 Reduce task 的输入文件。
copy过程,简单拉取数据。Reduce task进行数据拉取的过程其实就是复制的过程。
merge过程,和Map端的merge类似。在Reduce task 将数据copy 过来后,会先放入内存缓冲区中, 与Map的内存方式相似,当大于一定阈值后,并从内存写入到磁盘。然后在磁盘中生成了众多的溢写文 件。这样的merge方式会一直在运行到Map端没有输出的数据时才结束,最终输入文件。
reducer的输入文件。通过merge最后会生了输入文件,大多数情况下存在于磁盘中,但是需要将 其放入内存中。当reducer 输入文件已定,整个 Shuffle 阶段才算结束。然后就是 Reducer 执行, 把结果放到 HDFS 上。
5.1.6 MapReduce运行实例
下面介绍使用MapReduce解决实际问题的简单案例
5.1.6.1 案例运行过程
此案例的最终目标为找到各个年份的温度的最大值,输入数据是各个观测站观测得到的带有温度记录的原始数据,原始数据示例如下
0067011990999991950051507004...9999999N9+00001+99999999999...
0043011990999991950051512004...9999999N9+00221+99999999999...
0043011990999991950051518004...9999999N9-00111+99999999999...
0043012650999991949032412004...0500001N9+01111+99999999999...
0043012650999991949032418004...0500001N9+00781+99999999999...
...
输入的原始数据未设置key/value键,hadoop会根据每行的字节数自动为其设置key/value键,如下所示
(0, 0067011990999991950051507004...9999999N9+00001+99999999999...)
(106, 0043011990999991950051512004...9999999N9+00221+99999999999...)
(212, 0043011990999991950051518004...9999999N9-00111+99999999999...)
(318, 0043012650999991949032412004...0500001N9+01111+99999999999...)
(424, 0043012650999991949032418004...0500001N9+00781+99999999999...)
...
设置好key/value键的数据传入map过程进行处理,map过程提取年份和气温信息,mapper处理后数据输出如下
(1950,0)
(1950,22)
(1950,-11)
(1949,111)
(1949,78)
...
此例中map函数对所有的行都进行了操作,map函数里输出的数据以年份作为key,与输入数据不同。 mapper输出的数据会被按照键值进行排序和分组,处理后数据如下
(1949,[111,78])
(1950,[0,22,-11])
...
排序和分组过程是中间过程,一般用户观测不到,但可以对其进行控制,如可以选择以什么键值作为排序依据,hadoop的计算过程中一个重要的计算时间损失在排序过程。
数据进行排序和分组后会被复制至reduce过程,此例中reduce函数会遍历列表并获取每个key值下的最大数据,reduce过程输出数据如下
(1949,111)
(1950,22)
...
最终得出各个年份的温度的最大值,输出结果返回至HDFS。
5.1.6.2 MapReduce运行前试验
在进行MapReduce之前,要验证所写的map函数和reduce函数是否存在问题,可拿出与原始数据相同结构的少量数据在Linux服务器中做简单的模拟,使用管道制作一个单一map和单一reduce的过程进行试验,试验命令如下:
cat sample_input.txt | map.py | sort | reducer.py
在这个过程中,map.py和reducer.py是用户所写的map函数和reduce函数,sort是Linux的内置函数,这个过程与真正MapReduce过程的区别是没有将数据写入到硬盘中的过程,直接在内存中进行传输。
如果通过Linux管道实验时程序失败,那么这个程序一定不能在hadoop上成功运行,但如果程序可以在Linux内成功运行,在Hadoop上也不一定能运行成功。因为这个过程只是单一map和单一reduce过程的特例,使用此特例不能确定当有多个map和reduce过程时程序运行是否能够成功。