{ "cells": [ { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "# Machine Learning with Spark\n", "\n", "## Feng Li\n", "\n", "### Central University of Finance and Economics\n", "\n", "### [feng.li@cufe.edu.cn](feng.li@cufe.edu.cn)\n", "### Course home page: [https://feng.li/distcomp](https://feng.li/distcomp)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Machine Learning Library\n", "\n", "MLlib is Spark’s machine learning (ML) library. Its goal is to make practical machine learning scalable and easy. At a high level, it provides tools such as:\n", "\n", "- ML Algorithms: common learning algorithms such as classification, regression, clustering, and collaborative filtering\n", "- Featurization: feature extraction, transformation, dimensionality reduction, and selection\n", "- Pipelines: tools for constructing, evaluating, and tuning ML Pipelines\n", "- Persistence: saving and load algorithms, models, and Pipelines\n", "- Utilities: linear algebra, statistics, data handling, etc." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## MLlib APIs\n", "\n", "\n", "- The RDD-based APIs in the `spark.mllib` package have entered maintenance mode. \n", "\n", "- The primary Machine Learning API for Spark is now the DataFrame-based API in the `spark.ml` package.\n", "\n", "- Why the DataFrame-based API?\n", "\n", " - DataFrames provide a more user-friendly API than RDDs. The many benefits of DataFrames include Spark Datasources, SQL/DataFrame queries, Tungsten and Catalyst optimizations, and uniform APIs across languages.\n", " \n", " - The DataFrame-based API for MLlib provides a uniform API across ML algorithms and across multiple languages.\n", " \n", " - DataFrames facilitate practical ML Pipelines, particularly feature transformations. See the Pipelines guide for details." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Start a Spark Session " ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "data": { "text/html": [ "\n", "
\n", "

SparkSession - in-memory

\n", " \n", "
\n", "

SparkContext

\n", "\n", "

Spark UI

\n", "\n", "
\n", "
Version
\n", "
v2.4.4
\n", "
Master
\n", "
local[*]
\n", "
AppName
\n", "
Python Spark with ML
\n", "
\n", "
\n", " \n", "
\n", " " ], "text/plain": [ "" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import findspark\n", "findspark.init('/usr/lib/spark-current')\n", "from pyspark.sql import SparkSession\n", "spark = SparkSession.builder.appName(\"Python Spark with ML\").getOrCreate()\n", "spark" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Correlation\n", "\n", "- Calculating the correlation between two series of data is a common operation in Statistics. \n", "\n", "- The `spark.ml` provides the flexibility to calculate pairwise correlations among many series. The supported correlation methods are currently Pearson’s and Spearman’s correlation." ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Pearson correlation matrix:\n", "DenseMatrix([[1. , 0.05564149, nan, 0.40047142],\n", " [0.05564149, 1. , nan, 0.91359586],\n", " [ nan, nan, 1. , nan],\n", " [0.40047142, 0.91359586, nan, 1. ]])\n", "Spearman correlation matrix:\n", "DenseMatrix([[1. , 0.10540926, nan, 0.4 ],\n", " [0.10540926, 1. , nan, 0.9486833 ],\n", " [ nan, nan, 1. , nan],\n", " [0.4 , 0.9486833 , nan, 1. ]])\n" ] } ], "source": [ "from pyspark.ml.linalg import Vectors\n", "from pyspark.ml.stat import Correlation\n", "\n", "data = [(Vectors.sparse(4, [(0, 1.0), (3, -2.0)]),),\n", " (Vectors.dense([4.0, 5.0, 0.0, 3.0]),),\n", " (Vectors.dense([6.0, 7.0, 0.0, 8.0]),),\n", " (Vectors.sparse(4, [(0, 9.0), (3, 1.0)]),)]\n", "df = spark.createDataFrame(data, [\"features\"])\n", "\n", "r1 = Correlation.corr(df, \"features\").head()\n", "print(\"Pearson correlation matrix:\\n\" + str(r1[0]))\n", "\n", "r2 = Correlation.corr(df, \"features\", \"spearman\").head()\n", "print(\"Spearman correlation matrix:\\n\" + str(r2[0]))" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Summarizer\n", "\n", "- The `spark.ml` provides vector column summary statistics for Dataframe through Summarizer. \n", "\n", "- Available metrics are the column-wise `max`, `min`, `mean`, `variance`, and number of `nonzeros`, as well as the total `count`." ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "sc = spark.sparkContext # make a spakr context for RDD " ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----------------------------------+\n", "|aggregate_metrics(features, weight)|\n", "+-----------------------------------+\n", "|[[1.0,1.0,1.0], 1] |\n", "+-----------------------------------+\n", "\n", "+--------------------------------+\n", "|aggregate_metrics(features, 1.0)|\n", "+--------------------------------+\n", "|[[1.0,1.5,2.0], 2] |\n", "+--------------------------------+\n", "\n", "+--------------+\n", "|mean(features)|\n", "+--------------+\n", "|[1.0,1.0,1.0] |\n", "+--------------+\n", "\n", "+--------------+\n", "|mean(features)|\n", "+--------------+\n", "|[1.0,1.5,2.0] |\n", "+--------------+\n", "\n" ] } ], "source": [ "from pyspark.ml.stat import Summarizer\n", "from pyspark.sql import Row\n", "from pyspark.ml.linalg import Vectors\n", "\n", "df = sc.parallelize([Row(weight=1.0, features=Vectors.dense(1.0, 1.0, 1.0)),\n", " Row(weight=0.0, features=Vectors.dense(1.0, 2.0, 3.0))]).toDF()\n", "\n", "# create summarizer for multiple metrics \"mean\" and \"count\"\n", "summarizer = Summarizer.metrics(\"mean\", \"count\")\n", "\n", "# compute statistics for multiple metrics with weight\n", "df.select(summarizer.summary(df.features, df.weight)).show(truncate=False)\n", "\n", "# compute statistics for multiple metrics without weight\n", "df.select(summarizer.summary(df.features)).show(truncate=False)\n", "\n", "# compute statistics for single metric \"mean\" with weight\n", "df.select(Summarizer.mean(df.features, df.weight)).show(truncate=False)\n", "\n", "# compute statistics for single metric \"mean\" without weight\n", "df.select(Summarizer.mean(df.features)).show(truncate=False)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Machine Learning Pipelines\n", "\n", "- MLlib standardizes APIs for machine learning algorithms to make it easier to combine multiple algorithms into a single pipeline, or workflow. \n", "\n", "- The pipeline concept is mostly inspired by the `scikit-learn` project.\n", "\n", " - `DataFrame`: This ML API uses DataFrame from Spark SQL as an ML dataset, which can hold a variety of data types. E.g., a DataFrame could have different columns storing text, feature vectors, true labels, and predictions.\n", "\n", " - `Transformer`: A Transformer is an algorithm which can transform one DataFrame into another DataFrame. E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with predictions.\n", "\n", " - `Estimator`: An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model.\n", "\n", " - `Pipeline`: A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.\n", "\n", " - `Parameter`: All Transformers and Estimators now share a common API for specifying parameters." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "![Pipline](./figures/ml-Pipeline.png)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Pipline example: Logistic Regression" ] }, { "cell_type": "code", "execution_count": 12, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "from pyspark.ml.linalg import Vectors\n", "from pyspark.ml.classification import LogisticRegression\n", "\n", "# Prepare training data from a list of (label, features) tuples.\n", "training = spark.createDataFrame([\n", " (1.0, Vectors.dense([0.0, 1.1, 0.1])),\n", " (0.0, Vectors.dense([2.0, 1.0, -1.0])),\n", " (0.0, Vectors.dense([2.0, 1.3, 1.0])),\n", " (1.0, Vectors.dense([0.0, 1.2, -0.5]))], [\"label\", \"features\"])\n", "\n", "\n", "# Prepare test data\n", "test = spark.createDataFrame([\n", " (1.0, Vectors.dense([-1.0, 1.5, 1.3])),\n", " (0.0, Vectors.dense([3.0, 2.0, -0.1])),\n", " (1.0, Vectors.dense([0.0, 2.2, -1.5]))], [\"label\", \"features\"])" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "LogisticRegression parameters:\n", "aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)\n", "elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)\n", "family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)\n", "featuresCol: features column name. (default: features)\n", "fitIntercept: whether to fit an intercept term. (default: True)\n", "labelCol: label column name. (default: label)\n", "lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)\n", "lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. The bounds vector size must beequal with 1 for binomial regression, or the number oflasses for multinomial regression. (undefined)\n", "maxIter: max number of iterations (>= 0). (default: 100, current: 10)\n", "predictionCol: prediction column name. (default: prediction)\n", "probabilityCol: Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities. (default: probability)\n", "rawPredictionCol: raw prediction (a.k.a. confidence) column name. (default: rawPrediction)\n", "regParam: regularization parameter (>= 0). (default: 0.0, current: 0.01)\n", "standardization: whether to standardize the training features before fitting the model. (default: True)\n", "threshold: Threshold in binary classification prediction, in range [0, 1]. If threshold and thresholds are both set, they must match.e.g. if threshold is p, then thresholds must be equal to [1-p, p]. (default: 0.5)\n", "thresholds: Thresholds in multi-class classification to adjust the probability of predicting each class. Array must have length equal to the number of classes, with values > 0, excepting that at most one value may be 0. The class with largest value p/t is predicted, where p is the original probability of that class and t is the class's threshold. (undefined)\n", "tol: the convergence tolerance for iterative algorithms (>= 0). (default: 1e-06)\n", "upperBoundsOnCoefficients: The upper bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)\n", "upperBoundsOnIntercepts: The upper bounds on intercepts if fitting under bound constrained optimization. The bound vector size must be equal with 1 for binomial regression, or the number of classes for multinomial regression. (undefined)\n", "weightCol: weight column name. If this is not set or empty, we treat all instance weights as 1.0. (undefined)\n", "\n" ] } ], "source": [ "# Create a LogisticRegression instance. This instance is an Estimator.\n", "lr = LogisticRegression(maxIter=10, regParam=0.01)\n", "# Print out the parameters, documentation, and any default values.\n", "print(\"LogisticRegression parameters:\\n\" + lr.explainParams() + \"\\n\")" ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Model 1 was fit using parameters: \n", "{Param(parent='LogisticRegression_1f95b4862951', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2, Param(parent='LogisticRegression_1f95b4862951', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0, Param(parent='LogisticRegression_1f95b4862951', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto', Param(parent='LogisticRegression_1f95b4862951', name='featuresCol', doc='features column name'): 'features', Param(parent='LogisticRegression_1f95b4862951', name='fitIntercept', doc='whether to fit an intercept term'): True, Param(parent='LogisticRegression_1f95b4862951', name='labelCol', doc='label column name'): 'label', Param(parent='LogisticRegression_1f95b4862951', name='maxIter', doc='maximum number of iterations (>= 0)'): 10, Param(parent='LogisticRegression_1f95b4862951', name='predictionCol', doc='prediction column name'): 'prediction', Param(parent='LogisticRegression_1f95b4862951', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability', Param(parent='LogisticRegression_1f95b4862951', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction', Param(parent='LogisticRegression_1f95b4862951', name='regParam', doc='regularization parameter (>= 0)'): 0.01, Param(parent='LogisticRegression_1f95b4862951', name='standardization', doc='whether to standardize the training features before fitting the model'): True, Param(parent='LogisticRegression_1f95b4862951', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5, Param(parent='LogisticRegression_1f95b4862951', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}\n" ] } ], "source": [ "# Learn a LogisticRegression model. This uses the parameters stored in lr.\n", "model1 = lr.fit(training)\n", "\n", "# Since model1 is a Model (i.e., a transformer produced by an Estimator),\n", "# we can view the parameters it used during fit().\n", "# This prints the parameter (name: value) pairs, where names are unique IDs for this\n", "# LogisticRegression instance.\n", "print(\"Model 1 was fit using parameters: \")\n", "print(model1.extractParamMap())" ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Model 2 was fit using parameters: \n", "{Param(parent='LogisticRegression_1f95b4862951', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2, Param(parent='LogisticRegression_1f95b4862951', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0, Param(parent='LogisticRegression_1f95b4862951', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto', Param(parent='LogisticRegression_1f95b4862951', name='featuresCol', doc='features column name'): 'features', Param(parent='LogisticRegression_1f95b4862951', name='fitIntercept', doc='whether to fit an intercept term'): True, Param(parent='LogisticRegression_1f95b4862951', name='labelCol', doc='label column name'): 'label', Param(parent='LogisticRegression_1f95b4862951', name='maxIter', doc='maximum number of iterations (>= 0)'): 30, Param(parent='LogisticRegression_1f95b4862951', name='predictionCol', doc='prediction column name'): 'prediction', Param(parent='LogisticRegression_1f95b4862951', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'myProbability', Param(parent='LogisticRegression_1f95b4862951', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction', Param(parent='LogisticRegression_1f95b4862951', name='regParam', doc='regularization parameter (>= 0)'): 0.1, Param(parent='LogisticRegression_1f95b4862951', name='standardization', doc='whether to standardize the training features before fitting the model'): True, Param(parent='LogisticRegression_1f95b4862951', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.55, Param(parent='LogisticRegression_1f95b4862951', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}\n" ] } ], "source": [ "# We may alternatively specify parameters using a Python dictionary as a paramMap\n", "paramMap = {lr.maxIter: 20}\n", "paramMap[lr.maxIter] = 30 # Specify 1 Param, overwriting the original maxIter.\n", "paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55}) # Specify multiple Params.\n", "\n", "# You can combine paramMaps, which are python dictionaries.\n", "paramMap2 = {lr.probabilityCol: \"myProbability\"} # Change output column name\n", "paramMapCombined = paramMap.copy()\n", "paramMapCombined.update(paramMap2)\n", "\n", "# Now learn a new model using the paramMapCombined parameters.\n", "# paramMapCombined overrides all parameters set earlier via lr.set* methods.\n", "model2 = lr.fit(training, paramMapCombined)\n", "print(\"Model 2 was fit using parameters: \")\n", "print(model2.extractParamMap())" ] }, { "cell_type": "code", "execution_count": 13, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "features=[-1.0,1.5,1.3], label=1.0 -> prob=[0.057073041710340604,0.9429269582896593], prediction=1.0\n", "features=[3.0,2.0,-0.1], label=0.0 -> prob=[0.9238522311704118,0.07614776882958824], prediction=0.0\n", "features=[0.0,2.2,-1.5], label=1.0 -> prob=[0.10972776114779727,0.8902722388522027], prediction=1.0\n" ] } ], "source": [ "# Make predictions on test data using the Transformer.transform() method.\n", "# LogisticRegression.transform will only use the 'features' column.\n", "# Note that model2.transform() outputs a \"myProbability\" column instead of the usual\n", "# 'probability' column since we renamed the lr.probabilityCol parameter previously.\n", "prediction = model2.transform(test)\n", "result = prediction.select(\"features\", \"label\", \"myProbability\", \"prediction\") \\\n", " .collect()\n", "\n", "for row in result:\n", " print(\"features=%s, label=%s -> prob=%s, prediction=%s\"\n", " % (row.features, row.label, row.myProbability, row.prediction))" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Pipline example: Decision tree classifier " ] }, { "cell_type": "code", "execution_count": 14, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "from pyspark.ml import Pipeline\n", "from pyspark.ml.classification import DecisionTreeClassifier\n", "from pyspark.ml.feature import StringIndexer, VectorIndexer\n", "from pyspark.ml.evaluation import MulticlassClassificationEvaluator\n", "\n", "# Load the data stored in LIBSVM format as a DataFrame.\n", "data = spark.read.format(\"libsvm\").load(\"/data/sample_libsvm_data.txt\")\n", "\n", "# Index labels, adding metadata to the label column.\n", "# Fit on whole dataset to include all labels in index.\n", "labelIndexer = StringIndexer(inputCol=\"label\", outputCol=\"indexedLabel\").fit(data)\n", "# Automatically identify categorical features, and index them.\n", "# We specify maxCategories so features with > 4 distinct values are treated as continuous.\n", "featureIndexer =\\\n", " VectorIndexer(inputCol=\"features\", outputCol=\"indexedFeatures\", maxCategories=4).fit(data)\n", "\n", "# Split the data into training and test sets (30% held out for testing)\n", "(trainingData, testData) = data.randomSplit([0.7, 0.3])" ] }, { "cell_type": "code", "execution_count": 15, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "# Train a DecisionTree model.\n", "dt = DecisionTreeClassifier(labelCol=\"indexedLabel\", featuresCol=\"indexedFeatures\")\n", "\n", "# Chain indexers and tree in a Pipeline\n", "pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])\n", "\n", "# Train model. This also runs the indexers.\n", "model = pipeline.fit(trainingData)" ] }, { "cell_type": "code", "execution_count": 16, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+------------+--------------------+\n", "|prediction|indexedLabel| features|\n", "+----------+------------+--------------------+\n", "| 1.0| 1.0|(692,[100,101,102...|\n", "| 1.0| 1.0|(692,[122,123,124...|\n", "| 1.0| 1.0|(692,[122,123,148...|\n", "| 1.0| 1.0|(692,[123,124,125...|\n", "| 1.0| 1.0|(692,[124,125,126...|\n", "+----------+------------+--------------------+\n", "only showing top 5 rows\n", "\n", "Test Error = 0.0285714 \n", "DecisionTreeClassificationModel (uid=DecisionTreeClassifier_19583263959e) of depth 1 with 3 nodes\n" ] } ], "source": [ "# Make predictions.\n", "predictions = model.transform(testData)\n", "\n", "# Select example rows to display.\n", "predictions.select(\"prediction\", \"indexedLabel\", \"features\").show(5)\n", "\n", "# Select (prediction, true label) and compute test error\n", "evaluator = MulticlassClassificationEvaluator(\n", " labelCol=\"indexedLabel\", predictionCol=\"prediction\", metricName=\"accuracy\")\n", "accuracy = evaluator.evaluate(predictions)\n", "print(\"Test Error = %g \" % (1.0 - accuracy))\n", "\n", "treeModel = model.stages[2]\n", "# summary only\n", "print(treeModel)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Pipline example: Clustering" ] }, { "cell_type": "code", "execution_count": 19, "metadata": { "scrolled": false, "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "from pyspark.ml.clustering import KMeans\n", "from pyspark.ml.evaluation import ClusteringEvaluator\n", "\n", "# Loads data.\n", "dataset = spark.read.format(\"libsvm\").load(\"/data/sample_kmeans_data.txt\")\n" ] }, { "cell_type": "code", "execution_count": 20, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "# Trains a k-means model.\n", "kmeans = KMeans().setK(2).setSeed(1)\n", "model = kmeans.fit(dataset)\n", "\n", "# Make predictions\n", "predictions = model.transform(dataset)" ] }, { "cell_type": "code", "execution_count": 21, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Silhouette with squared euclidean distance = 0.9997530305375207\n" ] } ], "source": [ "# Evaluate clustering by computing Silhouette score\n", "evaluator = ClusteringEvaluator()\n", "\n", "silhouette = evaluator.evaluate(predictions)\n", "print(\"Silhouette with squared euclidean distance = \" + str(silhouette))" ] }, { "cell_type": "code", "execution_count": 28, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Cluster Centers: \n", "[0.1 0.1 0.1]\n", "[9.1 9.1 9.1]\n" ] } ], "source": [ "# Shows the result.\n", "centers = model.clusterCenters()\n", "print(\"Cluster Centers: \")\n", "for center in centers:\n", " print(center)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Model Selection with Cross-Validation" ] }, { "cell_type": "code", "execution_count": 22, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "from pyspark.ml.classification import LogisticRegression\n", "from pyspark.ml.evaluation import BinaryClassificationEvaluator\n", "from pyspark.ml.feature import HashingTF, Tokenizer\n", "from pyspark.ml.tuning import CrossValidator, ParamGridBuilder\n", "\n", "# Prepare training documents, which are labeled.\n", "training = spark.createDataFrame([\n", " (0, \"a b c d e spark\", 1.0),\n", " (1, \"b d\", 0.0),\n", " (2, \"spark f g h\", 1.0),\n", " (3, \"hadoop mapreduce\", 0.0),\n", " (4, \"b spark who\", 1.0),\n", " (5, \"g d a y\", 0.0),\n", " (6, \"spark fly\", 1.0),\n", " (7, \"was mapreduce\", 0.0),\n", " (8, \"e spark program\", 1.0),\n", " (9, \"a e c l\", 0.0),\n", " (10, \"spark compile\", 1.0),\n", " (11, \"hadoop software\", 0.0)\n", "], [\"id\", \"text\", \"label\"])" ] }, { "cell_type": "code", "execution_count": 23, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.\n", "tokenizer = Tokenizer(inputCol=\"text\", outputCol=\"words\")\n", "hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol=\"features\")\n", "lr = LogisticRegression(maxIter=10)\n", "pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])" ] }, { "cell_type": "code", "execution_count": 24, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.\n", "# This will allow us to jointly choose parameters for all Pipeline stages.\n", "# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.\n", "# We use a ParamGridBuilder to construct a grid of parameters to search over.\n", "# With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,\n", "# this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.\n", "paramGrid = ParamGridBuilder() \\\n", " .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \\\n", " .addGrid(lr.regParam, [0.1, 0.01]) \\\n", " .build()\n", "\n", "crossval = CrossValidator(estimator=pipeline,\n", " estimatorParamMaps=paramGrid,\n", " evaluator=BinaryClassificationEvaluator(),\n", " numFolds=2) # use 3+ folds in practice\n", "\n", "# Run cross-validation, and choose the best set of parameters.\n", "cvModel = crossval.fit(training)" ] }, { "cell_type": "code", "execution_count": 27, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Row(id=4, text='spark i j k', probability=DenseVector([0.2581, 0.7419]), prediction=1.0)\n", "Row(id=5, text='l m n', probability=DenseVector([0.9186, 0.0814]), prediction=0.0)\n", "Row(id=6, text='mapreduce spark', probability=DenseVector([0.432, 0.568]), prediction=1.0)\n", "Row(id=7, text='apache hadoop', probability=DenseVector([0.6766, 0.3234]), prediction=0.0)\n" ] } ], "source": [ "# Prepare test documents, which are unlabeled.\n", "test = spark.createDataFrame([\n", " (4, \"spark i j k\"),\n", " (5, \"l m n\"),\n", " (6, \"mapreduce spark\"),\n", " (7, \"apache hadoop\")\n", "], [\"id\", \"text\"])\n", "\n", "# Make predictions on test documents. cvModel uses the best model found (lrModel).\n", "prediction = cvModel.transform(test)\n", "selected = prediction.select(\"id\", \"text\", \"probability\", \"prediction\")\n", "for row in selected.collect():\n", " print(row)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Lab\n", "\n", "- Run a logistic regression with airdelay data." ] } ], "metadata": { "celltoolbar": "Slideshow", "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.17" } }, "nbformat": 4, "nbformat_minor": 4 }