第 12 章 使用Spark进行机器学习

12.1 机器学习库

MLlib是Spark的机器学习(ML)库。其目标是使实用的机器学习可扩展且容易。 在较高级别,它提供了以下工具:

ML算法(ML Algorithms):常见的学习算法,例如分类,回归,聚类和协作过滤 特征化(Featurization):特征提取,变换,降维和选择 管道(Pipelines):用于构建,评估和调整ML管道的工具 持久性(Persistence):保存和加载算法,模型和管道 实用程序(Utilities):线性代数,统计信息,数据处理等。

在最新发布的Spark 3.0中,MLlib也进行了一系列的更新,包括添加了基于树的特征转换、添加了两个新的评估器MultilabelClassificationEvaluator和RankEvaluator、添加了用于跟踪ML pipeline状态的Spark ML侦听器、增加了分解机分类器和回归器、添加了高斯朴素贝叶斯分类器和互补朴素贝叶斯分类器等等。

12.2 MLlib API

MLlib同时包含基于RDD的API和基于DataFrame的API。 从Spark 2.0开始,spark.mllib软件包中基于RDD的API已进入维护模式。Spark的主要机器学习API现在是spark.ml软件包中基于DataFrame的API。 MLlib仍会通过错误修复在spark.mllib中支持基于RDD的API,但不会向基于RDD的API添加新功能。在Spark 2.x发行版中,MLlib向基于DataFrames的API添加功能,以与基于RDD的API达到功能一致。

MLlib由基于RDD的API切换到基于DataFrame的API有如下原因:

  • 与RDD相比,DataFrames提供了更加用户友好的API。DataFrames具有许多好处,包括Spark数据源,SQL/DataFrame查询,Tungsten和Catalyst优化以及跨语言的统一API。

  • 用于MLlib的基于DataFrame的API为ML算法和多种语言提供了统一的API。

  • DataFrame促进了实用ML Pipelines的形成,特别是特征转换功能。有关详细信息,请参见管道指南。

12.3 Spark基本统计操作

12.3.1 计算相关性Correlation

计算两列数据之间的相关性是统计中的常见操作。 在spark.ml中,提供了可以灵活的计算许多列数据之间的成对相关性的方法。目前支持的计算相关性的方法是Pearson和Spearman法。

使用Python运行Spark计算相关性可使用pyspark.ml.stat.Correlation内的corr函数。该函数使用指定的方法为向量的输入数据集计算相关矩阵,输出一个DataFrame,其中包含向量列的相关矩阵。 该函数的一般使用形式为:

 corr(dataset, column, method='pearson')

其中,参量分别为:

  • dataset,数据集或数据框。

  • column,需要为其计算相关系数的向量列的名称。这必须是数据集的一列,并且必须是Vector对象。

  • method,指定用于计算相关性的方法,目前支持:pearson法和spearman法,默认方法是pearson法。

使用示例如下:

 from pyspark.ml.linalg import Vectors
 from pyspark.ml.stat import Correlation

 data = [(Vectors.sparse(4, [(0, 1.0), (3, -2.0)]),),
         (Vectors.dense([4.0, 5.0, 0.0, 3.0]),),
         (Vectors.dense([6.0, 7.0, 0.0, 8.0]),),
         (Vectors.sparse(4, [(0, 9.0), (3, 1.0)]),)]
 df = spark.createDataFrame(data, ["features"])

 r1 = Correlation.corr(df, "features").head()
 print("Pearson correlation matrix:\n" + str(r1[0]))

 r2 = Correlation.corr(df, "features", "spearman").head()
 print("Spearman correlation matrix:\n" + str(r2[0]))

pyspark.sql.DataFrame内也有corr函数,但两函数存在不同。pyspark.sql.DataFrame内的corr函数仅计算DataFrame中某两列数据之间的相关系数,但pyspark.ml.stat.Correlation内的corr函数可计算多列数据两两之间的相关系数并形成相关系数矩阵。

12.3.2 描述性统计(Summarizer)

通过pyspark.ml.stat.Summarizer程序包为Dataframe提供矢量列摘要统计信息。该程序包中可用于计算的函数有最大值max,最小值min,平均值mean,方差variance,非零值数numNonZeros,以及计数total等。

pyspark.ml.stat.Summarizer程序包内大多数函数的使用方法一致,以mean函数为例,其一般使用形式为:

  mean(col, weightCol=None)

其中,参量col为待计算的列,weightCol为计算该列平均值时使用的权重,若不输入默认权重为1。运行该函数后返回均值摘要列。

pyspark.ml.stat.Summarizer程序包内还有一个特殊函数metrics,给定计算指标列表,该函数可以提供一个构建器,使其可以计算给定的计算指标。该函数一般使用形式为:

 metrics(*metrics)

参量*metrics为可以提供的指标,可以提供的计算指标有:mean、sum、variance、std、count、numNonzeros、max、min、normL2(欧几里得范数)、normL1(L1范数)。 metrics函数运行后返回pyspark.ml.stat.SummaryBuilder对象,目前,该接口的性能比使用RDD接口的性能慢2到3倍。

pyspark.ml.stat.SummaryBuilder类是提供有关给定列的摘要统计信息的构建器对象。用户无法直接创建此类构建器,而应使用pyspark.ml.stat.Summarizer中的metrics方法创建。 该类构建器对象使用summary函数运行,运行该函数后返回包含统计信息的汇总列。该结构的确切内容是在创建构建器期间确定的。summary函数的一般使用形式为:

 summary(featuresCol, weightCol=None)

其中,参量featuresCol为包含特征向量对象的列。weightCol为包含权重值的列,默认权重为1.0

相关函数使用示例如下:

 sc = spark.sparkContext # make a spakr context for RDD

 from pyspark.ml.stat import Summarizer
 from pyspark.sql import Row
 from pyspark.ml.linalg import Vectors

 df = sc.parallelize([Row(weight=1.0, features=Vectors.dense(1.0, 1.0, 1.0)),
                      Row(weight=0.0, features=Vectors.dense(1.0, 2.0, 3.0))]).toDF()

 # create summarizer for multiple metrics "mean" and "count"
 summarizer = Summarizer.metrics("mean", "count")

 # compute statistics for multiple metrics with weight
 df.select(summarizer.summary(df.features, df.weight)).show(truncate=False)

 # compute statistics for multiple metrics without weight
 df.select(summarizer.summary(df.features)).show(truncate=False)

 # compute statistics for single metric "mean" with weight
 df.select(Summarizer.mean(df.features, df.weight)).show(truncate=False)

 # compute statistics for single metric "mean" without weight
 df.select(Summarizer.mean(df.features)).show(truncate=False)

12.4 机器学习管道

在本节中,我们介绍机器学习管道(ML Pipelines)的概念。ML Pipelines提供基于DataFrame的统一的高级API集,可帮助用户创建和调整实用的机器学习管道。

12.4.1 主要概念

MLlib对用于机器学习算法的API进行了标准化,从而使将多种算法组合到单个管道或工作流中变得更加容易。本节介绍了Pipelines API引入的关键概念,其中,Pipelines概念主要受scikit-learn项目的启发。

  • DataFrame:此ML API使用Spark SQL中的DataFrame作为ML数据集,该数据集可以支持多种数据类型,如向量,文字,图像和结构化数据。例如,DataFrame可以具有不同的列,用于存储文本,特征向量,真实标签和预测。

  • 转换器(Transformer):转换器是一种算法,也是一种抽象,包括特征转换器和学习模型。 从技术上讲,转换器应用了transform()方法,该方法通常通过附加一个或多个列将一个DataFrame转换为另一个。例如,特征转换器可以获取一个DataFrame,读取一列(例如文本),将其映射到一个新列(例如特征向量),然后输出一个新的DataFrame并附加映射的列。

  • 估计器(Estimator):估计器是一种算法,可以适合于DataFrame生成转换器。估计器抽象了学习算法的概念和其他训练数据的任何算法的概念。从技术上讲,一个估计器应用fit()方法,该方法接受一个DataFrame并生成一个模型,这个模型就是一个转换器。例如,诸如LogisticRegression之类的学习算法是估计器,调用fit()可以训练LogisticRegressionModel,后者是一个模型,也是转换器。

  • 管道(Pipeline):管道由按特定顺序运行的一系列PipelineStages(转换器和估计器)组成,以指定ML工作流。在机器学习中,通常需要运行一系列算法来处理数据并从中学习。例如,简单的文本文档处理工作流程可能包括几个阶段: 将每个文档的文本拆分为单词。 将每个文档的单词转换成数字特征向量。 使用特征向量和标签学习预测模型。 MLlib将这样的工作流表示为管道。

  • 参数(Parameter):MLlib估计器和转换器使用统一的API来指定参数。参数是具有独立文件的命名参数。ParamMap是一组(参数,值)对。将参数传递给算法的主要方法有两种:设置实例的参数。例如,如果lr是LogisticRegression的实例,则可以调用lr.setMaxIter(10)以使lr.fit()最多使用10次迭代;将ParamMap传递给fit()transform()。ParamMap中的任何参数都将覆盖以前通过setter方法指定的参数。

12.4.2 Pipelines运作过程

ML Pipelines的运作过程为:一条管道被指定为一个阶段序列,每个阶段可以是一个Transformer或Estimator。这些阶段按顺序运行,并且输入的DataFrame在通过每个阶段时都会进行转换。对于Transformer阶段,在DataFrame上调用transform()方法。对于Estimator阶段,调用fit()方法以生成一个Transformer(它将成为PipelineModel或已拟合Pipeline的一部分),并且在DataFrame上调用该Transformer的transform()方法。

ML Pipelines和模型可以被保存到磁盘以供以后使用,其存储的内容可跨Scala,Java和Python使用。但是,由于R当前使用修改后的格式,因此保存在R中的模型只能重新加载到R中。

12.5 分类与回归

12.5.1 逻辑回归

逻辑回归是一种预测分类响应的流行方法。这是广义线性模型的一种特殊情况,可以预测结果的可能性。在spark.ml中,逻辑回归可以通过使用二项式逻辑回归来预测二进制结果,或者可以通过使用多项逻辑回归来预测多类结果。使用family参数在这两种算法之间进行选择,或者不设置它,Spark会推断出正确的变体。 通过将family参数设置为 “multinomial”,可以将多项式逻辑回归用于二进制分类。它将产生两组系数和两个截距。 当对具有恒定非零列的数据集进行LogisticRegressionModel拟合而没有截距时,Spark MLlib为恒定非零列输出零系数。此行为与R glmnet相同,但与LIBSVM不同。 逻辑回归的spark.ml实现还支持提取训练集中的模型摘要。要注意的是,,因此仅在驱动程序上可用。

Python中用于实现ML的逻辑回归的程序包为pyspark.ml.classification中的LogisticRegression,模型实现示例如下:

首先,导入相关程序包并准备所需数据。机器学习过程中应设置训练数据和测试数据,测试数据与训练数据的具体数据内容并不相同,但是其与训练数据应来自同一分布,具有相同的数据结构。测试数据能够用来测试机器学习算法的准确性。

 from pyspark.ml.linalg import Vectors
 from pyspark.ml.classification import LogisticRegression

 # Prepare training data from a list of (label, features) tuples.
 training = spark.createDataFrame([
     (1.0, Vectors.dense([0.0, 1.1, 0.1])),
     (0.0, Vectors.dense([2.0, 1.0, -1.0])),
     (0.0, Vectors.dense([2.0, 1.3, 1.0])),
     (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])

 # Prepare test data
 test = spark.createDataFrame([
     (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
     (0.0, Vectors.dense([3.0, 2.0, -0.1])),
     (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])

pyspark.ml.classification中调用LogisticRegressionLogisticRegression的默认形式为:

 pyspark.ml.classification.LogisticRegression(featuresCol='features', labelCol='label', predictionCol='prediction', maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-06, fitIntercept=True, threshold=0.5, thresholds=None, probabilityCol='probability', rawPredictionCol='rawPrediction', standardization=True, weightCol=None, aggregationDepth=2, family='auto', lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None)

上述参量内容为LogisticRegression内可以设置参量的默认值,也可通过规定某参量的取值对其进行修改。 通过LogisticRegression设定回归模型,若想了解某模型的具体参量设置,可使用explainParams函数。其中,explainParam(param)函数解释单个参数,并以字符串形式返回其名称,文档,可选的默认值和用户提供的值;explainParams()函数返回所有参数的文档及其可选的默认值和用户提供的值。

 # Create a LogisticRegression instance. This instance is an Estimator.
 lr = LogisticRegression(maxIter=10, regParam=0.01)
 # Print out the parameters, documentation, and any default values.
 print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

使用fit函数可利用可选参数将模型拟合到输入数据集,使用函数后返回拟合后的模型。fit函数的一般使用形式为:

 fit(dataset, params=None)

其中,参量dataset为输入数据集,它是pyspark.sql.DataFrame的实例;参量params为可选的参数映射,它覆盖嵌入式参数。如果给出了参数映射的列表/元组,则此调用适合每个参数映射并返回模型列表。

 # Learn a LogisticRegression model. This uses the parameters stored in lr.
 model1 = lr.fit(training)

想查看模型拟合过程中的参数选择过程,可使用extractParamMap函数。该函数提取嵌入的默认参数值和用户提供的值,然后将它们与输入中的额外参数值合并到平面参数图中,如果存在冲突,则使用后者,即按顺序使用:默认参数值<用户提供值<额外参数值。

 # Since model1 is a Model (i.e., a transformer produced by an Estimator),
 # we can view the parameters it used during fit().
 # This prints the parameter (name: value) pairs, where names are unique IDs for this
 # LogisticRegression instance.
 print("Model 1 was fit using parameters: ")
 print(model1.extractParamMap())

若想修改原模型中的某些参量设置,可直接对Python dictionaries形式的原模型参量默认值进行修改,修改方式可有如下几种:

 # We may alternatively specify parameters using a Python dictionary as a paramMap
 paramMap = {lr.maxIter: 20}
 paramMap[lr.maxIter] = 30  # Specify 1 Param, overwriting the original maxIter.
 paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55})  # Specify multiple Params.

 # You can combine paramMaps, which are python dictionaries.
 paramMap2 = {lr.probabilityCol: "myProbability"}  # Change output column name
 paramMapCombined = paramMap.copy()
 paramMapCombined.update(paramMap2)

修改好模型参量值后,可重新使用数据对模型进行拟合并查看模型拟合过程中的参数值变化:

 # Now learn a new model using the paramMapCombined parameters.
 # paramMapCombined overrides all parameters set earlier via lr.set* methods.
 model2 = lr.fit(training, paramMapCombined)
 print("Model 2 was fit using parameters: ")
 print(model2.extractParamMap())

拟合好模型后可使用测试数据集利用拟合好的模型进行预测,预测可使用transform函数,该函数将输入的数据集使用拟合好的模型进行转换,由此实现对测试数据集的预测。

 # Make predictions on test data using the Transformer.transform() method.
 # LogisticRegression.transform will only use the 'features' column.
 # Note that model2.transform() outputs a "myProbability" column instead of the usual
 # 'probability' column since we renamed the lr.probabilityCol parameter previously.
 prediction = model2.transform(test)
 result = prediction.select("features", "label", "myProbability", "prediction") \
     .collect()

 for row in result:
     print("features=%s, label=%s -> prob=%s, prediction=%s"
           % (row.features, row.label, row.myProbability, row.prediction))

12.5.2 决策树

决策树及其集成是用于机器学习任务的分类和回归的流行方法。决策树被广泛使用,因为它们易于解释,处理分类特征,扩展到多类分类设置,不需要特征缩放以及能够捕获非线性和特征交互。决策树分类算法(例如随机森林和boosting)在分类和回归任务中表现最佳。 spark.ml实现支持使用连续和分类功能的二进制和多类分类以及用于回归的决策树。该实现按行对数据进行分区,从而可以对数百万甚至数十亿个实例进行分布式训练。 用户可以在MLlib决策树指南中找到有关决策树算法的更多信息。此API与原始MLlib决策树API之间的主要区别是: 支持ML管道; 分类与回归的决策树分离; 使用DataFrame元数据来区分连续和分类特征。 决策树的Piplines API比原始API提供了更多功能,特别是,对于分类,用户可以获得每个类别的预测概率(又称类别条件概率);对于回归,用户可以获得预测的有偏样本方差。

使用pyspark.ml.classification内的DecisionTreeClassifier进行决策树的计算,使用pyspark.ml.evaluation内的MulticlassClassificationEvaluator评价决策树的预测效果。

 from pyspark.ml import Pipeline
 from pyspark.ml.classification import DecisionTreeClassifier
 from pyspark.ml.feature import StringIndexer, VectorIndexer
 from pyspark.ml.evaluation import MulticlassClassificationEvaluator

准备数据,设置将“label”转换为“indexedLabel”、“features”转换为“indexedFeatures”的模型方法,并将数据划分为训练集和测试集:

 # Load the data stored in LIBSVM format as a DataFrame.
 data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

 # Index labels, adding metadata to the label column.
 # Fit on whole dataset to include all labels in index.
 labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
 # Automatically identify categorical features, and index them.
 # We specify maxCategories so features with > 4 distinct values are treated as continuous.
 featureIndexer =\
     VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

 # Split the data into training and test sets (30% held out for testing)
 (trainingData, testData) = data.randomSplit([0.7, 0.3])

pyspark.ml.classification中调用DecisionTreeClassifierDecisionTreeClassifier的默认形式为:

 pyspark.ml.classification.DecisionTreeClassifier(featuresCol='features', labelCol='label', predictionCol='prediction', probabilityCol='probability', rawPredictionCol='rawPrediction', maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity='gini', seed=None, weightCol=None, leafCol='', minWeightFractionPerNode=0.0)

上述参量内容为DecisionTreeClassifier内可以设置参量的默认值,也可通过规定某参量的取值对其进行修改。

 # Train a DecisionTree model.
 dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

调用Pipeline生成一个简单的管道,该流水线由一系列阶段组成,每个阶段都是Estimator或Transformer。 调用Pipeline.fit()时,将按顺序执行阶段。 如果阶段是Estimator,则将在输入数据集上调用其Estimator.fit()方法以拟合模型,然后,将使用作为Transformer的模型来转换数据集,作为下一阶段的输入。 如果某个阶段是Transformer,则将调用其Transformer.transform()方法以生成下一个阶段的数据集。 Pipeline的拟合模型是PipelineModel,该模型由与Pipeline阶段相对应的拟合模型和Transformer组成。 如果stages是一个空列表,则Pipeline充当标识转换器。

 # Chain indexers and tree in a Pipeline
 pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

 # Train model.  This also runs the indexers.
 model = pipeline.fit(trainingData)

得到训练数据的拟合模型后可用将其用于对测试数据的预测,并对部分预测结果进行展示:

 # Make predictions.
 predictions = model.transform(testData)

 # Select example rows to display.
 predictions.select("prediction", "indexedLabel", "features").show(5)

pyspark.ml.evaluation中调用MulticlassClassificationEvaluator计算拟合模型预测的准确率,MulticlassClassificationEvaluator的默认形式为:

 pyspark.ml.evaluation.MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='f1', weightCol=None, metricLabel=0.0, beta=1.0, probabilityCol='probability', eps=1e-15)

上述参量内容为MulticlassClassificationEvaluator内可以设置参量的默认值,也可通过规定某参量的取值对其进行修改。

 # Select (prediction, true label) and compute test error
 evaluator = MulticlassClassificationEvaluator(
     labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
 accuracy = evaluator.evaluate(predictions)
 print("Test Error = %g " % (1.0 - accuracy))

 treeModel = model.stages[2]
 # summary only
 print(treeModel)

12.5.3 聚类

MLlib中可以实现的聚类算法包括:K均值聚类、潜在狄利克雷分配(LDA)、均分k均值聚类、高斯混合模型(GMM)、功率迭代群集(PIC)。 以K均值聚类作为操作实例进行展示。进行K均值聚类需要使用pyspark.ml.clustering中的KMeans,并使用pyspark.ml.evaluation内的MulticlassClassificationEvaluator评价K均值聚类的聚类效果。

 from pyspark.ml.clustering import KMeans
 from pyspark.ml.evaluation import ClusteringEvaluator

 # Loads data.
 dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

直接调用KMeans进行K均值聚类,KMeans的默认形式为:

 pyspark.ml.clustering.KMeans(featuresCol='features', predictionCol='prediction', k=2, initMode='k-means||', initSteps=2, tol=0.0001, maxIter=20, seed=None, distanceMeasure='euclidean', weightCol=None)

上述参量内容为KMeans内可以设置参量的默认值,也可通过规定某参量的取值对其进行修改。 K均值聚类过程中可使用函数setK指定聚类类别数K,也可使用函数setSeed设置随机数种子seed。

 # Trains a k-means model.
 kmeans = KMeans().setK(2).setSeed(1)
 model = kmeans.fit(dataset)

 # Make predictions
 predictions = model.transform(dataset)

调用ClusteringEvaluator查看K均值聚类模型聚类效果:

 # Evaluate clustering by computing Silhouette score
 evaluator = ClusteringEvaluator()

 silhouette = evaluator.evaluate(predictions)
 print("Silhouette with squared euclidean distance = " + str(silhouette))

对K均值聚类,可以打印并看类中心:

 # Shows the result.
 centers = model.clusterCenters()
 print("Cluster Centers: ")
 for center in centers:
     print(center)

12.6 使用交叉验证进行模型选择

MLlib中内置的交叉验证和其他工具使用户可以优化算法和管道中的超参数。

12.6.1 模型选择

ML中的一项重要任务是模型选择,或使用数据为给定任务找到最佳模型或参数,这也称为调整。可以针对单个估算器(例如LogisticRegression)进行调整,也可以针对包括多个算法,功能化和其他步骤的整个管道进行调整。用户可以一次调整整个管道,而不必分别调整管道中的每个元素。 MLlib支持使用CrossValidator和TrainValidationSplit等工具进行模型选择。这些工具需要以下各项:

  • Estimator:要调整的算法或管道
  • 一组ParamMaps:可供选择的参数,有时也称为“参数网格”以进行搜索
  • Evaluator:衡量拟合模型对保留的测试数据的良好程度的度量

在较高级别,这些模型选择工具的工作方式如下: 他们将输入数据分为单独的训练和测试数据集。 对于每个(训练,测试)对,它们都会遍历一组ParamMap:对于每个ParamMap,他们使用这些参数拟合Estimator,获得拟合的Model,然后使用Evaluator评估Model的性能。最终选择由性能最佳的参数集生成的模型。 Evaluator可以是用于回归问题的RegressionEvaluator,用于二进制数据的BinaryClassificationEvaluator,用于多类问题的MulticlassClassificationEvaluator,用于多标签分类的MultilabelClassificationEvaluator或用于对问题进行排名的RankEvaluator。每个评估器中的setMetricName方法都可以覆盖用于选择最佳ParamMap的默认度量。 为了帮助构造参数网格,用户可以使用ParamGridBuilder实用程序。默认情况下,来自参数网格的参数集是串行评估的。在使用CrossValidator或TrainValidationSplit运行模型选择之前,可以通过将并行度设置为2或更大(值1为串行)来并行执行参数评估。应当仔细选择并行度的值,以在不超出群集资源的情况下最大程度地提高并行度,并且较大的值可能并不总是可以提高性能。一般来说,对于大多数群集,最多10个值就足够了。

12.6.2 交叉验证

进行交叉验证需要用到pyspark.ml.tuning中的CrossValidatorParamGridBuilderCrossValidator首先将数据集分成k折,这些分成k折的数据被用作单独的训练和测试数据集。例如,将数据集分成k=3折后,CrossValidator将生成3个(训练,测试)数据集对,每个对都使用2/3的数据进行训练,并使用1/3的数据进行测试。为了评估特定的ParamMap,CrossValidator通过将Estimator拟合到3个不同的(训练,测试)数据集对上,计算出3个模型的平均评估指标。在确定最佳的ParamMap之后,CrossValidator最终使用最佳的ParamMap和整个数据集重新拟合Estimator。

下面将通过具体示例演示如何使用CrossValidator从参数网格中进行选择。 要注意的是,在参数网格上进行交叉验证的成本很高。例如,在下面的示例中,参数网格中hashingTF.numFeatures有3个取值,lr.regParam有2个取值,CrossValidator使用2折。这意味着一共需要训练\((3×2)×2=12\)个不同的模型。在实际设置中,往往会尝试更多参数并使用更多折(k=3和k=10是常见的)。换句话说,使用CrossValidator可能非常昂贵。但是,这也是一种公认的用于选择参数的方法,该方法在统计上比手动调整更合理。

导入所需程序包并准备所需数据:

 from pyspark.ml.classification import LogisticRegression
 from pyspark.ml.evaluation import BinaryClassificationEvaluator
 from pyspark.ml.feature import HashingTF, Tokenizer
 from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

 # Prepare training documents, which are labeled.
 training = spark.createDataFrame([
     (0, "a b c d e spark", 1.0),
     (1, "b d", 0.0),
     (2, "spark f g h", 1.0),
     (3, "hadoop mapreduce", 0.0),
     (4, "b spark who", 1.0),
     (5, "g d a y", 0.0),
     (6, "spark fly", 1.0),
     (7, "was mapreduce", 0.0),
     (8, "e spark program", 1.0),
     (9, "a e c l", 0.0),
     (10, "spark compile", 1.0),
     (11, "hadoop software", 0.0)
 ], ["id", "text", "label"])

将文本数据转化为可用的数值向量,并进行逻辑回归:

 # Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
 tokenizer = Tokenizer(inputCol="text", outputCol="words")
 hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
 lr = LogisticRegression(maxIter=10)
 pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

设置参数网格的参数取值:

 # We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
 # This will allow us to jointly choose parameters for all Pipeline stages.
 # A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
 # We use a ParamGridBuilder to construct a grid of parameters to search over.
 # With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
 # this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
 paramGrid = ParamGridBuilder() \
     .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
     .addGrid(lr.regParam, [0.1, 0.01]) \
     .build()

设置验证数据折数并进行交叉验证选取最优参数:

 crossval = CrossValidator(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=BinaryClassificationEvaluator(),
                           numFolds=2)  # use 3+ folds in practice

 # Run cross-validation, and choose the best set of parameters.
 cvModel = crossval.fit(training)

使用测试数据测试最终选取模型的优劣:

 # Prepare test documents, which are unlabeled.
 test = spark.createDataFrame([
     (4, "spark i j k"),
     (5, "l m n"),
     (6, "mapreduce spark"),
     (7, "apache hadoop")
 ], ["id", "text"])

 # Make predictions on test documents. cvModel uses the best model found (lrModel).
 prediction = cvModel.transform(test)
 selected = prediction.select("id", "text", "probability", "prediction")
 for row in selected.collect():
     print(row)