第 7 章 基于MapReduce的统计建模

7.0.1 什么任务适合使用MapReduce?

基于Hadoop的MapReduce主要用于进行数据处理(processing data),如对数据进行的初始加工,得到原始数据后进行必要变量的导出,对数据进行必要清理,对文本数据进行情感的标注,对图像数据进行简单的图片识别等。适合使用MapReduce实现的功能有以下几种: 排序(Sorting)。MapReduce非常适合进行排序和归类汇总,因为MapReduce的过程中存在排序机制,经过map过程处理后的数据在进入到reduce过程之前会根据key/value对中的key值进行排序,用户也可以指定进行排序的特征。 搜索(Searching)。MapReduce适合对全部数据进行扫描性工作,寻找数据的某些特征,如寻找某些特殊的字符串,结构,表达等。searching过程往往在mapper中进行,可以通过结合Python语言里的正则表达等进行实现。searching过程与统计密切相关,是传统统计方法内的初级的操作,但随着数据量的大量增加,使用传统方法实现大数据内的searching过程缺乏时效性,在处理大规模数据时很难发现数据应有的价值,而MapReduce能够解决这个问题。 索引(indexing)。indexing指找到某一模块的具体所在的位置,最常见的使用是搜索引擎公司根据用户提供的关键字快速找到所需的网页。 自然语言处理(NLP)。现代数据中很大一部分是文本数据,因此在处理数据过程中会涉及到自然语言处理(NLP),如对文本数据进行情感标注,将文本信息运用在建模中,将文本信息转化为数值向量等,常用的文本数据处理方法有TFIDF, Word2Vec, doc2vec等。

除上述提到的具体功能外,MapReduce还适合对所有的独立数据的处理,如果进行大规模的独立性数据的操作,Hadoop的MapReduce是不二的选择,因为其具有廉价且处理速度快的优势。

7.0.2 MapReduce-容错

在处理大型数据时,数据中某些记录出现错误是不可避免的。虽然我们在编写程序时会尽可能的考虑到可能出错的情况并对其进行处理,保证程序的健壮性和鲁棒性,但还是应该设置一种恢复机制来处理无法计划的错误情况,因为我们并不希望仅仅因为一条程序无法处理的不良记录造成整个任务的失败。

MapReduce的容错机制指在任务运行过程中,如果Mapper任务执行过程中出现错误,Mapper首先会重新尝试执行也就是重复执行,一般默认重复执行4次后,如果仍然执行失败就会放弃执行。除此之外,MapReduce还具有一种推测式执行机制,在这个机制下如果某个节点的任务执行时间超出预期(这个预期根据其他节点任务执行时间而定),MapReduce会启动其他节点执行相同任务,在某个节点任务最先执行完成之后杀死其他尚未完成的任务。推测式执行机制可以保证整个程序的执行效率,但也有可能会引发某些问题,推测式执行机制是可以关闭的。

Hadoop MapReduce提供了一项功能,允许跳过它认为会导致任务崩溃的记录。MapReduce任务执行过程中如果出现错误,出错节点内的TaskTracker会跟踪并确定导致失败的记录范围,然后TaskTracker将重新启动任务,但跳过错误的记录范围。若跳过错误记录范围后程序执行成功,执行结果将返回错误记录信息。具体操作过程中可以使用JobConf选项中的mapreduce.map.skip.maxrecordsmapreduce.reduce.skip.maxrecords两个命令分别设置map和reduce过程中允许跳过错误记录的最多次数。除此之外,MapReduce还设置了其他选项,比如使用mapreduce.map.maxattemptsmapreduce.reduce.maxattempts设置重复执行的最多次数等。 实际使用过程中,不能过度依赖MapReduce的容错功能,因为这些功能是在用户所写程序充分考虑各种错误情况下对无法预知的错误进行的处理机制,用户在编写程序时应尽可能的对数据的不确定性进行操作,比如在map和reduce函数的内部添加容错机制。

程序中应该考虑容错的情况可能有以下几种:首先,在使用Python或R中特殊的数据结构(比如R中的dataframe或Python中的pandas)来处理数据时,处理后的结果中会出现对应数据结构的特殊标记,如果没有对相应数据标记进行处理,再次处理数据时就会出现数据不符合要求的问题。其次,Hadoop储存数据时不建议数据添加表头,因为表头在数据处理时会被当做一行数据进行处理,容易导致任务运行出现异常。在存储数据时可以将表头单独储存为一个文件,供用户使用时进行参考。另外,在程序编写中要注意不同语言的编码,避免因为语言编码的不适配导致程序运行中出现错误。

7.0.3 使用MapReduce进行线性回归

假设我们有一个大型数据集,该数据集体量巨大,占用计算机的大量内存,且用传统单机方式进行线性回归时需要大量时间,现在我们将如何对该大型数据集进行回归分析?

通过使用Mapper和Reducer,Hadoop MapReduce可以用于实现大型数据集的线性回归。MapReduce将大型数据集拆分成多个数据块分发到可用节点之上,使用节点并行处理分布式数据。当我们使用Hadoop进行大数据集的线性回归运算时不会引发内存问题,因为该大型数据集将在Hadoop计算节点之间进行分发和处理,且能够大大缩短处理数据所用时间。使用这种方法实现的线性回归不能提供比lm()模型更高的预测精度,其精度与传统方法相同。

7.0.3.1 线性回归模型回顾

假设待处理的数据集中包含因变量\(y_{n\times 1}\)与自变量\(X_{n\times p}\),对数据构建线性回归模型如下:\[y=X\beta +\epsilon\] 待估系数\(\widehat \beta\)的解析解为:\[\widehat\beta = (X'X)^{-1}X'y\] 进行线性回归最大的问题就是求出待估系数\(\widehat \beta\)。 在大型数据集的线性回归中,由于\(n>>p\),在计算\(X'X\)\(X'y\) 时需要非常高的计算需求,对普通计算机来说难以在较短的时间内实现。而上述计算的结果\((X'X)_{p\times p}\)\((X'y)_{p\times 1}\)之间进行计算的计算量小,在普通计算机上也可以轻易完成。因此,计算\(\widehat \beta\)的最主要问题是\(X'X\)\(X'y\)的计算,而上述计算可以使用MapReduce实现。

使用MapReduce进行线性回归是相对容易的,因为待估系数\(\widehat \beta\)具有解析解,只需使用MapReduce将解析解求出即可。对于待估系数没有解析解的其他模型(如下面将介绍的logistic回归模型),使用MapReduce求解较为复杂。

7.0.3.2 线性回归模型的MapReduce实现原理

首先来看一个简单的例子,假设待处理的数据集中包含因变量\(y_{n\times 1}\)与自变量\(X_{n\times 1}\)\(X'y\)的计算可以写作: \[X'y = \begin{bmatrix} x_{1.}\\ x_{2.}\\ ...\\ x_{k.}\\ \end{bmatrix}' \begin{bmatrix} y_{1.}\\ y_{2.}\\ ...\\ y_ {k.}\\ \end{bmatrix} = \sum_{i=1}^k x_{i.}'y_{i.}\] 其中,k是指n行数据被分成了k份,在MapReduce的计算过程中,每份数据将被分配到不同的节点上进行计算。我们可以发现,在使用MapReduce进行\(X'y\)计算时,最终结果为每一个节点上的\(x_{i.}'y_{i.}\)的计算结果的和。因此,设计程序使map过程进行不同数据块的\(x_{i.}'y_{i.}\)计算,reduce过程进行求和计算,即可计算出\(X'y\)的值。当自变量为\(X_{n\times p}\)时,计算的思路相同。

我们也可以按照同样的方法进行\(X'X\)的计算。将\(X\)的每一列看做一个向量,\(X'X\)的计算过程即可转化为多个\(X'\)\(x_{.i}\)的乘积计算,如下式所示: \[ X'X = X' \begin{bmatrix} x_{.1}&x_{.2}&...& x_ {.p}\\ \end{bmatrix} = \begin{bmatrix} X'x_{.1}&X'x_{.2}&...& X'x_ {.p}\\ \end{bmatrix}\]\(X'x_{.i}\)的计算方法与\(X'y\)的计算方法完全相同。由此,我们可以使用MapReduce实现\(X'X\)\(X'y\)的计算.

7.0.4 使用MapReduce进行logistic回归

7.0.4.1 logistic回归模型回顾

在统计中,logistic回归(或logit回归)是一种概率分类模型,Logistic回归已在包括医学和社会科学领域在内的许多学科中得到广泛应用。 logistic回归分为二分类logistic回归和多分类logistic回归,二分类logistic回归的因变量的结果具有两种可能的类型,多分类logistic回归因变量的结果具有三种或多种可能的类型。logistic回归可以通过构建logistic函数来实现。

以二分类logistic回归为例,logit模型一般写为如下形式:\[P_i=\frac{1}{1+\exp(-(\beta_1+\beta_2X_i))}\] 也可以将其写为:\[\log \frac{P_i}{1-P_i} = \beta_1+\beta_2X_i\] 其中\(P_i/(1-P_i)\)也被称为优势比(odds ratio)。 logistic回归与普通线性回归的显著区别是不能通过解析解对其模型中的待估系数作出估计,只能采用牛顿迭代等迭代方法求出待估系数。在传统统计方法中,可以使用R中的glm()函数或Python中的sklearn.linear_model.LogisticRegression()函数轻松地对logistic回归模型的待估系数进行估算。但是对于大量数据的logistic回归处理而言存在一定困难。首先,由于logistic回归模型的待估系数不具有解析解,线性回归的MapReduce实现方法不能用于解决logistic回归问题,而需要大量数据交换的牛顿迭代等迭代过程对MapReduce而言是不容易实现的。 Logistic回归在标准行业内非常重要,是许多生产欺诈检测,广告质量和针对性产品检验的基础,目前企业处理Logistic回归最常见的实现方式是对所有要使用的大型训练集使用随机梯度下降(SGD),这种方法处理速度非常快,适合应用于大型数据的处理。但是,目前企业几乎不会使用Hadoop进行Logistic回归处理,大多数企业会使用spark等更高级的平台,在这些更高级的平台上处理Logistic回归获得的结果与传统单机方式能够达到一致。

7.0.4.2 logistic回归模型的MapReduce实现原理

由于使用MapReduce对logistic回归进行处理存在一定困难,在使用MapReduce进行相关计算时就要进行一定的取舍,通过舍弃一部分精确度求得待估系数近似的估计值。

使用MapReduce进行logistic回归模型处理时,首先仍会将𝑛个样本划分为𝑘个块,每个块由𝑚个观测值组成,每一个数据块会被分配到不同的节点上进行计算处理。 在每个节点上对节点中的数据块进行logistic回归,使用R或Python的相关函数很容易实现这一过程。待估系数\(\widehat \beta_l\)的估计表达式可写为:\[\widehat \beta_l = \arg~\max \sum _{i =1}^m \{ y_{li}x_{li}'\beta-\log(1+\exp\{x_i'\beta\})\}.\] 获得每个节点logistic回归处理结果后,将得到的各个\(\widehat \beta_l\)结果进行平均可获得整个logistic回归的待估系数\(\widehat \beta\)的估计值:\[\widehat \beta = \frac{1}{k}\sum_{l=1}^k \widehat \beta_l.\] 可以证明,当数据量很大的时候,使用\(\widehat \beta_l\)的平均值获得的\(\widehat \beta\)的估计值是非常接近单机牛顿迭代等方法计算出的估计值的。

使用上述方法进行logistic回归是存在一定的劣势的。首先,上述方法得到的估计值的方差表达式为:\[Var(\widehat \beta)=\frac{1}{k^2}\sum_{l=1}^k{Var(\widehat \beta_l)}\] 只要单个节点的方差的增加速度比\(k^2\)快,那么整体的方差会随着\(k\)的增加而变大,估计结果的准确性会受到影响。其次,\(\widehat \beta\)的估计值会受到\(k\)取值的影响,\(k\)取不同值时所估计的结果会不同。