{ "cells": [ { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "# Scala for Spark\n", "\n", "## Feng Li\n", "\n", "### Central University of Finance and Economics\n", "\n", "### [feng.li@cufe.edu.cn](mailto:feng.li@cufe.edu.cn)\n", "### Course home page: [https://feng.li/distcomp](https://feng.li/distcomp)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Matrix Operations\n", "\n", "- Spark `MLlib` supports local vectors and matrices stored on a single machine, as well as distributed matrices backed by one or more RDDs.\n", "\n", "- Local vectors and local matrices are simple data models that serve as public interfaces. The underlying linear algebra operations are provided by the Scala library `Breeze`." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "import breeze.linalg._" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "data": { "text/plain": [ "x = DenseVector(0.0, 0.0, 0.0, 0.0, 0.0)\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "DenseVector(0.0, 0.0, 0.0, 0.0, 0.0)" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "val x = DenseVector.zeros[Double](5)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Spark Matrix operations with Scala" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "data": { "text/plain": [ "dv = [1.0,0.0,3.0]\n", "sv1 = (3,[0,2],[1.0,3.0])\n", "sv2 = (3,[0,2],[1.0,3.0])\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "(3,[0,2],[1.0,3.0])" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import 
org.apache.spark.ml.linalg.{Vector, Vectors}\n", "\n", "// Create a dense vector (1.0, 0.0, 3.0).\n", "val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)\n", "// Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries.\n", "val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))\n", "// Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries.\n", "val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "data": { "text/plain": [ "pos = (1.0,[1.0,0.0,3.0])\n", "neg = (0.0,(3,[0,2],[1.0,3.0]))\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "(0.0,(3,[0,2],[1.0,3.0]))" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import org.apache.spark.mllib.linalg.Vectors\n", "import org.apache.spark.mllib.regression.LabeledPoint\n", "\n", "// Create a labeled point with a positive label and a dense feature vector.\n", "val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))\n", "\n", "// Create a labeled point with a negative label and a sparse feature vector.\n", "val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "data": { "text/plain": [ "dm = \n", "sm = \n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "1.0 2.0\n", "3.0 4.0\n", "5.0 6.0\n", "3 x 2 CSCMatrix\n", "(0,0) 9.0\n", "(2,1) 6.0\n", "(1,1) 8.0\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "import org.apache.spark.ml.linalg.{Matrix, Matrices}\n", "\n", "// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))\n", "val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))\n", "\n", "// Create a sparse matrix 
((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))\n", "val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Distributed matrix\n", "\n", "- A distributed matrix has long-typed row and column indices and double-typed values, and is stored in distributed form across one or more RDDs.\n", "\n", "- Choosing the right format for a large distributed matrix is important. Converting a distributed matrix to a different format may require a global shuffle, which is quite expensive.\n", "\n", "- Four types of distributed matrices have been implemented so far: `RowMatrix`, `IndexedRowMatrix`, `CoordinateMatrix`, and `BlockMatrix`." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "import org.apache.spark.mllib.linalg.Vector\n", "import org.apache.spark.mllib.linalg.distributed.RowMatrix\n", "import org.apache.spark.rdd.RDD\n", "\n", "val rows: RDD[Vector] = ... // an RDD of local vectors\n", "// Create a RowMatrix from an RDD[Vector].\n", "val mat: RowMatrix = new RowMatrix(rows)\n", "\n", "// Get its size.\n", "val m = mat.numRows()\n", "val n = mat.numCols()\n", "\n", "// QR decomposition; passing true also computes Q.\n", "val qrResult = mat.tallSkinnyQR(true)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}\n", "\n", "val rows: RDD[IndexedRow] = ... 
// an RDD of indexed rows\n", "// Create an IndexedRowMatrix from an RDD[IndexedRow].\n", "val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)\n", "\n", "// Get its size.\n", "val m = mat.numRows()\n", "val n = mat.numCols()\n", "\n", "// Drop its row indices.\n", "val rowMat: RowMatrix = mat.toRowMatrix()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}\n", "\n", "val entries: RDD[MatrixEntry] = ... // an RDD of matrix entries\n", "// Create a CoordinateMatrix from an RDD[MatrixEntry].\n", "val mat: CoordinateMatrix = new CoordinateMatrix(entries)\n", "\n", "// Get its size.\n", "val m = mat.numRows()\n", "val n = mat.numCols()\n", "\n", "// Convert it to an IndexedRowMatrix whose rows are sparse vectors.\n", "val indexedRowMatrix = mat.toIndexedRowMatrix()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}\n", "\n", "val entries: RDD[MatrixEntry] = ... // an RDD of (i, j, v) matrix entries\n", "// Create a CoordinateMatrix from an RDD[MatrixEntry].\n", "val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)\n", "// Transform the CoordinateMatrix to a BlockMatrix.\n", "val matA: BlockMatrix = coordMat.toBlockMatrix().cache()\n", "\n", "// Validate whether the BlockMatrix is set up properly. 