第 14 章 使用Spark进行高级统计建模

利用Spark进行统计计算时,Spark会提供一些基础方法,但有时待处理问题的应用场景非常复杂,Spark现有工具不足够高效的进行处理以满足应用要求。因此可能需要使用Spark构建额外的统计计算工具,充分借鉴Spark现有优势,达到大规模数据处理的要求。

14.1 分布式系统的统计模型

在统计的发展过程中,统计算法的传统计算机实现经过几十年的研究已经非常成熟。但将传统的单机计算机模型完全迁移到分布式系统使其与分布式系统完全适配是非常困难的,甚至也需要耗费几十年的时间。

目前用于分布式系统中统计模型实现的方法主要有以下几种:

14.1.1 One-shot

每个worker节点并行计算估计量,worker节点将计算结果传达给 master节点,由master节点整合计算,获得平均的全局估计量。这种实现方法中每个worker节点只需与master节点进行一次通信,最大程度上节约了由于数据传输造成的时间和资源的浪费。

许多文献为One-shot方法的应用提供支持,如:

  • Zhang et al. (2013, JMLR)给出了OS估计器的MSE速率,并进一步使用了引导程序进行去偏
  • Liu 与 Ihler (2014, NIPS)研究了基于KL散度的平均
  • Lee at al. (2017, JMLR) 与 Battey et al. (2015)研究了使用L1或非凸罚分的稀疏回归估计,方法是平均局部去偏估计量
  • Fan et al. (2017)研究了分布式PCA估计

14.1.2 Iterative algorithms

master节点与worker节点之间进行多轮通信,但是也需要尽量压缩通信数量,且在传递过程中尽量不传递海塞矩阵等占空间较大的大型矩阵,这样也能在一定程度上提高传输效率。此方法适用于迭代算法的计算,如应用于牛顿迭代等其他计算中。

许多文献为Iterative algorithms方法的应用提供支持,如:

  • Shamir et al. (2014, ICML)提出了一种牛顿型迭代方法进行分布式优化
  • Wang et al. (2017, ICML)研究了分布式L1正则化损失最小化问题
  • Jordan et al. (2018, JASA)提出了一种通信有效的代理似然(CSL)框架来解决分布式统计推断问题

14.1.3 Lasso estimation

进行回归时可以对待估参数设置惩罚,即设置变量选择工具。对传统统计来说,变量选择工具可以通过lasso来实现,对β系数进行约束,以实现对模型控制的要求。 Battey et al. (2015), Wang et al. (2017), Lee at al. (2017, JMLR) 与 Jordan et al. (2018)等人的相关文献也为Lasso estimation的应用提供支持。

上述操作方法都不能直接把模型快速的部署在分布式系统上,而且这些方法存在一定缺陷,如:

  • 仅使用一轮通讯就无法进行效率估算
  • 主要侧重于L1估计,没有模型选择标准的保证
  • 需要随机分配的数据

这些缺陷也造成统计量的性质存在缺陷,所以在计算过程中使用这样的算法并不能保证其计算结果是最优的,这就使得分布式计算结果与单机全局变量存在一定差距,适用性比较差。 而且部分算法依赖于与模型的不断数据交换,比如迭代算法。由于在分布式系统内每份数据都要在多个节点内进行备份,当数据需要进行不断交换时,数据存储所需要的内存较大,对计算机的性能要求和对电力等资源的要求会提高,计算成本会变高。

14.2 DLSA

传统的统计模型仅仅适合单机计算。在分布式系统内实现的统计模型,应使模型移动到每一个数据节点上,使模型处理每一个节点上的数据,最后把子模型结合起来构建出一个新模型,而且这个模型可以证明与单机方式算出来的结果是一模一样的,这样才能得到全局最优解。 如何去构造这样一个快速组合方式是目前分布式系统的重要研究领域和应用瓶颈。

14.2.1 DLSA的主要思想

1.首先,通过使用分布式系统内每个worker节点上的本地数据分别估计每个worker节点的参数θ。可以通过使用标准的统计估计方法(例如,最大似然估计)有效地完成。假设每个worker节点上的样本量足够大,与全局估计量相比,所得参数估计量及其渐近协方差估计量应一致,但在统计上不有效。 2.每个worker节点将θ的局部估计量及其渐近协方差估计量传递给master节点。因为不考虑高维模型设置,所以在这方面的通信成本基本可以忽略不计。 3.构造加权最小二乘型目标函数。这可以看作是全局对数似然函数的局部二次逼近。可以预计,在适当的规则性条件下,所得的估计量与全局MLE方法(即全局估计量)具有相同的渐近协方差。

14.2.2 DLSA实现公式

定义\(L(\theta;Z)\)为可能的二次微分损失函数,全局损失函数定义为\(L(\theta)=N^{-1}\sum^{N}_{i=1}{L(\theta;Z_{i})}\),其全局最小值为\(\hat{\theta}=\arg\min{L(\theta)}\),真实值为\(\theta_{0}\) 首先使用泰勒展开对全局损失函数进行分解和逼近,如下所示: \[L(\theta)=N^{-1}\sum^{K}_{k=1}\sum_{i\in{S_k}}{L(\theta;Z_{i})}=N^{-1}\sum^{K}_{k=1}\sum_{i\in{S_k}}\{{L(\theta;Z_{i})-L(\hat{\theta_k};Z_{i})}\}+C_1 \\ \approx N^{-1}\sum^{K}_{k=1}\sum_{i\in{S_k}}{(\theta-\hat{\theta_k})^{T}}{\ddot{L}(\hat{\theta_k};Z_{i})(\theta-\hat{\theta_k})}+C_2\] 直观地讲,上式中的二次形式应该是全局损失函数的良好局部逼近(Wang and Leng, 2007)。这使得我们考虑以下加权最小二乘目标函数: \[\widetilde{L}(\theta)=N^{-1}\sum_{k}(\theta-\hat{\theta_k})^{T}\{\sum_{i\in{S_k}}\ddot{L}(\hat{\theta_k};Z_{i})\}(\theta-\hat{\theta_k})\\ \overset{def}=\sum_{k}(\theta-\hat{\theta_k})^{T}{\alpha_k}\hat{\sum^{-1}_{k}}(\theta-\hat{\theta_k})\]

最终得到加权最小二乘估计器(WLSE),其形式如下: \[\widetilde{\theta}=\arg\min_\theta\widetilde{L}(\theta)=(\sum_k\alpha_k\hat\sum^{-1}_k)^{-1}(\sum_k\alpha_k\hat\sum^{-1}_k\hat\theta_k)\] 对于同时进行的变量选择和参数估计,遵循 Wang and Leng (2007)的想法,并考虑了在master节点上的的自适应Lasso目标函数(Zou, 2006; Zhang and Lu, 2007), \[Q_\lambda(\theta)=\widetilde{L}(\theta)+\sum_j\lambda_j|\theta_j|\]

特别的,考虑基于分布式贝叶斯信息标准(DBIC)的标准,如下所示: \[DBIC_\lambda=(\widetilde\theta_\lambda-\widetilde\theta)^T\hat\sum^{-1}(\widetilde\theta_\lambda-\widetilde\theta)+\log{N}\times{df_\lambda/N}\]

14.2.3 DLSA算法流程

DLSA算法输入的是用于对每个分区数据集进行建模的模型函数,输出的是加权最小二乘估计量\(\widetilde\theta\),协方差矩阵\(\hat\sum\),DBIC结果\(DBIC_\lambda\)

算法步骤如下:

  • step1:将整个分布式群集可用内存预先确定为\(M_{ram}\),将CPU核心总数确定为\(C_{core}\),并将要处理的总数据大小确定为\(D_{total}\)
  • step2:定义批处理块的数量\(N_{chunks}\),以允许进行内存不足时的数据处理。建议Spark系统中的\(N_{chunks}\)至少大于\(3 \times D_{total}/M_{ram}\)
  • step3:定义分区数\(P_{partition} = D_{total} /(N_{chunks} \times C_{core})\)
  • step4:定义一个模型函数,其中输入为包含响应变量,协变量和分区ID的大小为\(n \times (p+2)\)的Python Pandas DataFrame,输出为一个大小为\(p \times (p+1)\)的Pandas DataFrame,其第一列存储\(\widetilde \theta_k\),其余列存储\(\widetilde \sum^{-1}_k\)
  • step5:对用i表示的从1到N的chunks进行如下操作
    (a).如果数据以其他格式存储,则将数据块传输到Spark的分布式DataFrame
    (b).将\(\{1,...,P_{partition}\}\)中的整数分区标签随机分配给Spark DataFrame的每一行
    (c).如果未通过分区标签对数据进行分区,则在分布式系统中重新对DataFrame进行分区
    (d).通过分配的分区标签对Spark DataFrame进行分组
    (e).使用Spark的Grouped map Pandas UDFs API 将模型函数应用于每个分组数据集,并获得一个\((P_{partition}) \times (p + 1)\)维的分布式Spark DataFrame \(R_i\)
  • step6:整合各个分区和块上的\(R_i\),返回\(p \times (p+1)\)维的矩阵\(R_{final}\)
  • step7:返回\(\widetilde\theta\)\(\hat\sum\)\(DBIC_\lambda\)

由于DLSA算法中的最后一步是在主节点上执行的,并且由于需要从工作节点到主节点的数据转换,因此需要一种称为“ Apache Arrow”(https://arrow.apache.org/)的特殊工具插入到系统中,这样可以在Spark的分布式DataFrame和Python的Pandas DataFrame之间进行有效的数据转换。

14.2.4 DLSA算法的贡献

  • 灵活:能够处理各种回归问题(例如LM,GLM,Cox模型)。
  • 高效:匹配全局效率,在数据异构分布时特别有用。
  • 轻松选择变量:可以获得Oracle属性,BIC类型标准可以在主机上使用,并且不需要进一步的通信。
  • 智能计算:可以获得解析解决方案,并且LARS算法可以应用在主机上。
  • Spark的新分布式统计API:改进了Spark当前的机器学习API,可以在Spark中进行有效的内存不足时的建模。

14.3 DLSA实际应用介绍

将分布式系统中的DLSA计算方法应用于飞机延误数据,应用数据的具体细节见下表:

Variable Description Variable used in the model
Delayed Whether the flight is delayed, 1 for Yes; 0 for No Used as the response variable
Year Year between 1987 and 2008 Used as numerical variable
Month Which month of the year Converted to 11 dummies
DayofMonth Which day of the month Used as numerical variable
DayofWeek Which day of the week Converted to 6 dummies
DepTime Actual departure time Used as numerical variable
CRSDepTime Scheduled departure time Used as numerical variable
CRSArrTime Scheduled arrival time Used as numerical variable
ElapsedTime Actual elapsed time Used as numerical variable
Distance Distance between the origin and destination in miles Used as numerical variable
Carrier Flight carrier code for 29 carriers Top 7 carries converted to 7 dummies
Destination Destination of the flight (total 348 categories) Top 75 destination cities converted to 75 dummies
Origin Departing origin (total 343 categories) Top 75 origin cities converted to 75 dummies

其中,Delayed为因变量,其他变量为自变量。 按照第二部分给出的算法过程编写相应DLSA算法对上述数据进行计算,可验证计算结果得到其他机构相关研究佐证。相关算法可参考https://github.com/feng-li/dlsa

14.3.1 计算细节

  • 最终,模型中总共使用了181个变量。样本总数为1.139亿个观测值。
  • 硬盘上的原始数据集12 GB。虚拟转换后,即使所有虚拟对象都以稀疏矩阵格式存储,内存中的整体大小也超过52GB。因此,很难由单个计算机来处理该数据集。
  • 要在如此大的矩阵上进行操作,需要更多的内存(通常使用双精度浮点格式> 128 GB)。
  • 由于Spark中的内存不足问题,内置的分布式SGD算法(在spark.ml.classification.LogisticRegression模块中实现)在处理本问题时失败。

14.4 传统统计计算与分布式统计计算模型

由传统统计计算向分布式统计计算模型的转变是分布式数据的广泛应用驱动下的转型,传统统计计算与分布式统计计算模型主要有以下区别:

  • 数据状态:传统统计计算适用于静态线下数据,分布式统计计算模型可适用于动态实时大样本全量数据
  • 计算模式:传统统计计算为单机存储单机计算模式,而分布式统计计算模型采用分布式存储分布式计算
  • 数据存储:传统统计计算的统计模型与数据存储一体化,而分布式统计计算模型的模型与数据分别存储,计算时将统计模型部署到数据
  • 计算逻辑:传统统计计算为单个模型对应单个算法,模型之间各异,算法不互通,分布式统计计算模型采取所有模型一体化计算框架,每种模型均可以通过此框架在分布式系统内实现,模型的适用性更高
  • 需求实现:传统统计模型采取线下模型评估线上应用模式,模型的时效性不能得到保证,分布式统计计算模型采取实时模型评估与预测需求的模式,模型的时效性更能得到保障,满足了目前大多数的应用需求