Scala for Spark¶

Feng Li¶

Central University of Finance and Economics¶

feng.li@cufe.edu.cn¶

Course home page: https://feng.li/distcomp¶

Matrix Operations¶

  • Spark MLlib supports local vectors and matrices stored on a single machine, as well as distributed matrices backed by one or more RDDs.

  • Local vectors and local matrices are simple data models that serve as public interfaces. The underlying linear algebra operations are provided by Scala library Breeze.

In [1]:
import breeze.linalg._
In [2]:
val x = DenseVector.zeros[Double](5)
x = DenseVector(0.0, 0.0, 0.0, 0.0, 0.0)
Out[2]:
DenseVector(0.0, 0.0, 0.0, 0.0, 0.0)

Spark Matrix operations with Scala¶

In [4]:
import org.apache.spark.ml.linalg.{Vector, Vectors}

// Create a dense vector (1.0, 0.0, 3.0).
val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries.
val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries.
val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
dv = [1.0,0.0,3.0]
sv1 = (3,[0,2],[1.0,3.0])
sv2 = (3,[0,2],[1.0,3.0])
Out[4]:
(3,[0,2],[1.0,3.0])
In [6]:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Create a labeled point with a positive label and a dense feature vector.
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))

// Create a labeled point with a negative label and a sparse feature vector.
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
pos = (1.0,[1.0,0.0,3.0])
neg = (0.0,(3,[0,2],[1.0,3.0]))
Out[6]:
(0.0,(3,[0,2],[1.0,3.0]))
In [7]:
import org.apache.spark.ml.linalg.{Matrix, Matrices}

// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))

// Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))
dm = 
sm = 
1.0  2.0
3.0  4.0
5.0  6.0
3 x 2 CSCMatrix
(0,0) 9.0
(2,1) 6.0
(1,1) 8.0

Distributed matrix¶

  • A distributed matrix has long-typed row and column indices and double-typed values, stored distributively in one or more RDDs.

  • It is very important to choose the right format to store large and distributed matrices. Converting a distributed matrix to a different format may require a global shuffle, which is quite expensive.

  • Four types of distributed matrices have been implemented so far.

In [ ]:
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val rows: RDD[Vector] = ... // an RDD of local vectors
// Create a RowMatrix from an RDD[Vector].
val mat: RowMatrix = new RowMatrix(rows)

// Get its size.
val m = mat.numRows()
val n = mat.numCols()

// QR decomposition 
val qrResult = mat.tallSkinnyQR(true)
In [ ]:
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}

val rows: RDD[IndexedRow] = ... // an RDD of indexed rows
// Create an IndexedRowMatrix from an RDD[IndexedRow].
val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)

// Get its size.
val m = mat.numRows()
val n = mat.numCols()

// Drop its row indices.
val rowMat: RowMatrix = mat.toRowMatrix()
In [ ]:
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

val entries: RDD[MatrixEntry] = ... // an RDD of matrix entries
// Create a CoordinateMatrix from an RDD[MatrixEntry].
val mat: CoordinateMatrix = new CoordinateMatrix(entries)

// Get its size.
val m = mat.numRows()
val n = mat.numCols()

// Convert it to an IndexRowMatrix whose rows are sparse vectors.
val indexedRowMatrix = mat.toIndexedRowMatrix()
In [ ]:
import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}

val entries: RDD[MatrixEntry] = ... // an RDD of (i, j, v) matrix entries
// Create a CoordinateMatrix from an RDD[MatrixEntry].
val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
// Transform the CoordinateMatrix to a BlockMatrix
val matA: BlockMatrix = coordMat.toBlockMatrix().cache()

// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
// Nothing happens if it is valid.
matA.validate()

// Calculate A^T A.
val ata = matA.transpose.multiply(matA)