第 16 章 基于Scala语言的Spark

16.1 Scala与Python对比

Spark用Scala编写,因为它是静态类型的,并且可以以已知的方式编译到JVM,因此它可以非常快地进行编写。 尽管Spark拥有适用于Scala,Python,Java和R的API,但流行的语言是前两种。因为Java不支持Read-Evaluate-Print-Loop(REPL),而R不是通用语言。 目前数据科学界分为两个阵营,一个偏爱Scala,另一个偏爱Python。 Scala与Python都有其优点和缺点,最终的选择应取决于用户应用的需求。 若用户想对spark进行深入建模,开发新的模型,就需要知道它的底层构造,这时学习Scala是非常必要的;而若用户只是想使用spark里既有接口中的模型方法,不想开发新的模型,就可以直接使用python。

下面将从性能、学习曲线、适用性、可用性、代码恢复与安全方面探讨两者的异同。

16.1.1 性能

从性能上说,Scala的运行速度通常比Python快10倍以上。 编译语言比解释语言运行速度更快。Scala是静态类型的,在运行时使用Java虚拟机(JVM),在大多数情况下,它比Python快一些。Python是动态类型的,这会降低速度。 Python使用时需调用Spark库,这需要大量代码处理,因此性能也会降低。 Python通过spark处理海量数据时需要很多的内存以实现cache功能。Scala是静态的,它对变量进行提前声明,在计算过程中节省大量计算空间。在这种情况下,Scala对于有限的内核非常有效。 此外,Scala是基于JVM的Hadoop原生。Hadoop非常重要,因为Spark是在Hadoop的文件系统HDFS之上构建的。Python与Hadoop服务的交互非常差,因此开发人员必须使用第三方库(例如hadoopy)。Scala通过Java中的本机Hadoop API与Hadoop交互。这就是为什么在Scala中编写本机Hadoop应用程序非常容易的原因。

16.1.2 学习曲线

Scala与Python都是功能性和面向对象的语言,除了支持社区活跃外,它们还具有相似的语法。 Python具有简单的语法和良好的标准库。由于Scala中的高级功能,与Python相比,Scala学习起来可能更加复杂。 Python对于简单直观的逻辑更可取,而Scala对于复杂的工作流程则更有用。

16.1.3 适用性

Scala具有多个标准库和核,可以在大数据生态系统中快速集成数据库。 Scala允许编写具有多个并发原语的代码,而Python不支持并发或多线程。由于具有并发功能,Scala可以实现更好的内存管理和数据处理。 但是,Python支持重量级的流程派生。在此,一次仅一个线程处于活动状态。因此,无论何时部署新代码,都必须重新启动进程,这会增加内存开销。

16.1.4 可用性

两者都是富有表现力的,我们可以通过它们达到较高的功能水平。 在框架,库,隐式,宏等方面,Scala更为强大。由于其功能性质,Scala在MapReduce框架中运行良好。许多Scala数据框架遵循与Scala的API集合类似的抽象数据类型。开发人员只需要学习基本的标准集合,就可以轻松地熟悉其他库。Spark是用Scala编写的,因此了解Scala可以让您了解和修改Spark在内部所做的工作。此外,许多即将发布的功能往往首先发布在Scala和Java中的API,而Python API在以后的版本中会不断发展。 Python更加用户友好和简洁。对于NLP,Python是首选,因为Scala没有很多用于机器学习或NLP的工具。此外,要使用GraphX,GraphFrames和MLLib,Python也是首选。Python的可视化库补充了Pyspark,这是Spark和Scala不可比拟的。

16.1.5 代码恢复与安全

Scala是一种静态类型的语言,它使我们能够发现编译时的错误。而Python是一种动态类型化的语言。每当用户对现有代码进行更改时,Python语言极容易出现错误。因此,为Scala重构代码比为Python重构更容易。

16.1.6 结论

Python速度较慢,但非常易于使用,而Scala速度快,且也较为易于使用。 Scala提供对Spark最新功能的访问,因为Apache Spark是用Scala编写的。 Apache Spark编程语言的选择取决于最适合项目需求的功能,因为每个语言各有优缺点。 Python更加面向分析,而Scala更加面向工程,但是两者都是用于构建数据科学应用程序的出色语言。总体而言,为了充分利用Spark的潜力,Scala将更加有益。如果用户只是想通过Spark进行开箱即用的机器学习,那么Python是值得学习的。

16.2 Scala介绍

16.2.1 Scala的背景

Scala是Scalable Language的缩写,是一种混合功能编程语言。它是由瑞士洛桑联邦理工学院(EPFL)编程方法教授马丁·奥德斯基(Martin Odersky)创建的。

马丁·奥德斯基(Martin Odersky)(生于1958年9月5日)是德国计算机科学家,也是瑞士洛桑联邦理工学院(EPFL)的编程方法教授。他专门研究代码分析和编程语言。他与其他人一起设计了Scala编程语言和通用Java(以及之前的Pizza)。他是Generic Java的共同设计者之一,实现了GJ编译器,并且他的实现成为Java编译器javac的基础,因此他也被称为javac编译器的“父亲”。 Martin Odersky曾在尼克劳斯·沃思(Niklaus Wirth)的指导下学习,Nicklaus Wirth是最著名的几种编程语言的设计师,创建了Pascal和其他几种语言。

Scala是面向对象的,并且是一种功能语言,因为每个函数都是一个值,每个值都是一个对象,因此最终每个函数都是一个对象。 Scala在Java平台(Java虚拟机)上运行,并且与现有Java程序兼容。打包时,由于Android应用程序通常是用Java编写并从Java字节码转换为Dalvik字节码(在安装过程中可能会进一步转换为本机代码),因此Scala的Java兼容性使其非常适合Android开发,而在使用功能性方法时更是如此是首选。

Scala的设计始于2001年,在洛桑联邦理工学院(EPFL),2003年内部发布后,Scala于2004年初在Java平台上公开发布。 Scala不是创建“更好的Java”的唯一尝试。诸如Kotlin和Ceylon之类的替代方案也走了这条路,但他们做出了一个基本决定,即在语法上保持与Java语言本身非常接近,以最大程度地减少学习时间。这似乎是个好主意,但最终还是有些自欺欺人,因为它迫使用户停留在许多完全相同的Java范式之内,而Java范式正是首先要创建“更好的Java”的原因。 相比之下,Scala的创建目的是成为一种更好的语言,以摆脱Java在过于繁琐或令人沮丧的方面对开发人员的限制。这样开发的结果是Scala的学习曲线陡峭,使一些公司不去使用它。但是,Twitter,Apple,Linkedin,Airbnb或UBS等公司都在使用它。

16.2.2 Scala概述

一些有关Scala的重要知识:

  • Scala是一种高级语言
  • Scala是静态输入
  • Scala语言具有表达性,其语法简洁但仍然可读
  • Scala支持面向对象编程(OOP)范例-每个变量都是一个对象,每个“运算符”都是一个方法。
  • Scala支持函数编程(FP)范例-函数也是变量,用户可以将它们传递给其他函数。用户可以使用OOP,FP编写代码,也可以将它们组合成混合样式。
  • Scala具有完善的类型推断系统
  • Scala代码生成在Java虚拟机(JVM)上运行的.class文件
  • 在Scala中可以轻松使用Java库

16.2.3 Scala操作

16.2.3.1 Scala REPL

Scala REPL(“Read-Evaluate-Print-Loop”)是命令行解释器,您可以将其用作测试Scala代码的“游乐场”区域。我们在这里较早地介绍了它,因此您可以将其与以下代码示例一起使用。 要启动REPL会话,只需在操作系统命令行中键入scala,您将看到以下内容:

 $ scala

 Welcome to Scala 2.13.0 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131).
 Type in expressions for evaluation. Or try :help.

16.2.3.2 注释

Scala中的注释就像Java(以及许多其他语言)中的注释一样:

单行注释:

 // a single line comment

多行注释:

 /*
  * a multiline comment
  */

 /**
  * also a multiline comment
  */

16.2.3.3 创建变量

Scala中有两种类型的变量:

val:创建一个不可变的变量(如Java中的final)。如果变量值不可以被修改,该变量在存储和调用过程中会更加简便,操作过程中也会节约许多时间,因此此种类型变量是首选。 var:创建一个可变变量,该类型变量仅在有特定原因使用时才应使用

创建变量示例如下:

 val x = 1   //immutable
 var y = 0   //mutable

在Scala中,通常在创建变量时不需声明其类型。因为在执行创建变量操作时,Scala通常可以自行推断数据类型。用户可以使用Scala REPL或Spark Shell对变量类型进行检查:

 val x = 1
 val s = "a string"

此功能称为类型推断,它是确保代码简洁的好方法。 为了保证代码一致性,用户还可以显式声明变量的类型,但这通常不是必需的:

 val x: Int = 1
 val s: String = "a string"

16.2.3.4 Scala内嵌数据类型

Scala中有用户期望的标准数据类型。在Scala中,所有这些数据类型都是成熟的对象(不是原始数据类型)。

 val b: Byte = 1
 val x: Int = 1
 val l: Long = 1
 val s: Short = 1
 val d: Double = 2.0

因为Int和Double是默认数字类型,所以通常在创建它们时无需显式声明数据类型:

 val i = 123   // defaults to Int
 val x = 1.0   // defaults to Double

Scala中对于大量数据处理,还具有类型BigInt和BigDecimal。 BigInt和BigDecimal的一大优点是它们支持习惯于使用数字类型的所有运算符。

 var b0 = BigInt(987654321)
 var b1 = BigInt(1234567890)
 var b2 = BigDecimal(123456.789)

 b0 + b1
 b0 += 1
 b1 * b1

16.2.3.5 列表

Scala中的List类是线性的、不变的序列。这意味着它是一个用户无法修改的链表。 每当添加或删除List元素时,都需要从现有List创建一个新List。

 val ints = List(1, 2, 3)
 val names = List("Joel", "Chris", "Ed")

可以添加元素到List前面:

 val b = 0 +: ints

可以将一个List合并到另一个List前:

 val b2 = List(-1, 0) ++: ints

其中,:字符表示被合并或添加的List所在的一侧。 因此,当使用+:时List必须在右边,将该元素添加到List的左边。

 val c1 = ints :+ 4

 val c2 = ints :+ List(4,5)

16.2.3.6 控制结构

16.2.3.6.1 if/else

Scala的if/else控制结构与其他语言类似:

    if (test1) {
     doA()
 } else if (test2) {
     doB()
 } else if (test3) {
     doC()
 } else {
     doD()
 }

Scala中的if/else控制结构不需要像Python中一样要求对齐,但是类似于C语言,要使用花括号括指示结构。

但是,与Java和许多其他语言不同,Scala的if/else控制结构返回一个值。因此,除上述用法外,可以将其用作三元运算符:

 val a = 5
 val b = 7
 val x = if (a < b) a else b
16.2.3.6.2 match表达式

Scala中有match表达式,强大的match表达式是Scala的一大特色,其最基本的用法类似于Java switch语句:

 // val i = 1
 val i = 5

 val result = i match {
     case 1 => "one"
     case 2 => "two"
     case _ => "not 1 or 2"
 }

Scala中的match表达式不仅限于处理整数,还可以用于任何数据类型,包括布尔值:

 val test = true

 val boolean2String = test match {
     case true => "it is true"
     case false => "it is false"
 }

下面是一个将match用作方法主体并针对许多不同类型进行匹配的示例:

 def getClassAsString(x: Any):String = x match {
     case s: String => s + " is a String"
     case i: Int => "Int"
     case f: Float => "Float"
     case l: List[_] => "List"
     case p: Person => "Person"
     case _ => "Unknown"
 }
16.2.3.6.3 try/catch

Scala的try/catch控制结构可捕获异常。其与Java中相应结构类似,但语法与match表达式一致:

 try {
     writeToFile(text)
 } catch {
     case fnfe: FileNotFoundException => println(fnfe)
     case ioe: IOException => println(ioe)
 }
16.2.3.6.4 for循环和表达式

Scala的for循环如下所示:

 for (arg <- args) println(arg)

 // "x to y" syntax
 for (i <- 0 to 5) println(i)

 // "x to y by" syntax
 for (i <- 0 to 10 by 2) println(i)

还可以将yield关键字添加到for循环中,以创建产生结果的for表达式。 这是一个for表达式,它将序列1到5中的每个值加倍:

 val x = for (i <- 1 to 5) yield i * 2

这是另一个用于迭代字符串列表的表达式:通过for循环能自动再生成一个变量

 val fruits = List("apple", "banana", "lime", "orange")

 val fruitLengths = for {
     f <- fruits
     if f.length > 4
 } yield f.length
16.2.3.6.5 while与do/while

Scala还具有whiledo/while循环。它的一般语法如下:

 // while loop
 while(condition) {
     statement(a)
     statement(b)
 }

 // do-while
 do {
    statement(a)
    statement(b)
 }
 while(condition)

16.2.3.7 Scala Classes

这是一个Scala类的示例:

 class Person(var firstName: String, var lastName: String) {
     def printFullName() = println(s"$firstName $lastName")
 }

这是使用该类的方式:

 val p = new Person("Julia", "Kern")
 println(p.firstName)
 p.lastName = "Manes"
 p.printFullName()

要注意的是,无需创建“get”和“set”方法来访问类中的字段。

16.2.3.8 Scala methods

就像其他OOP语言一样,Scala类也具有方法,Scala方法使用如下:

 def sum(a: Int, b: Int): Int = a + b
 def concatenate(s1: String, s2: String): String = s1 + s2

不必声明方法的返回类型,因此,编写这样的两种方法是完全合法的:

 def sum(a: Int, b: Int) = a + b
 def concatenate(s1: String, s2: String) = s1 + s2

以下是调用方法的方式:

 val x = sum(1, 2)
 val y = concatenate("foo", "bar")

可以使用方法做更多的事情,例如为方法参数提供默认值等。

16.3 使用Scala进行Spark操作

16.3.1 矩阵运算

Spark的MLlib支持存储在一台机器上的局部向量和矩阵,以及由一个或多个RDD支持的分布式矩阵。 局部向量和局部矩阵是充当公共接口的简单数据模型。基本的线性代数运算由Scala库Breeze提供。

本地使用Scala中的Breeze模块示例如下:

 import breeze.linalg._

 val x = DenseVector.zeros[Double](5)

16.3.2 使用Scala进行Spark Matrix操作

16.3.2.1 局部向量

局部向量具有存储在单个计算机上的整数类型索引和基于0的索引以及双精度类型的值。 MLlib支持两种类型的局部向量:密集和稀疏。密集向量由表示其输入值的双精度数组支持,而稀疏向量由两个并行数组支持:索引和值。例如,向量(1.0, 0.0, 3.0)可以用密集格式表示为[1.0, 0.0, 3.0],也可以用稀疏格式表示为(3, [0, 2], [1.0, 3.0]),其中3是向量的大小。

局部向量的基类是Vector,Scala中提供两种实现:DenseVector和SparseVector。 建议使用Vector中实现的工厂方法来创建局部矢量。 有关API的详细信息,请参考Vector Scala文档和Vectors Scala文档。

 import org.apache.spark.ml.linalg.{Vector, Vectors}

 // Create a dense vector (1.0, 0.0, 3.0).
 val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
 // Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and    values corresponding to nonzero entries.
 val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
 // Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries.
 val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))

要注意的是,默认情况下,Scala会导入scala.collection.immutable.Vector,因此必须显式导入org.apache.spark.mllib.linalg.Vector才能使用MLlib的Vector。

16.3.2.2 标记点

标记点(Labeled point)是与标记/响应关联的局部矢量,可以是稠密的或稀疏的。在MLlib中,标记点用于监督学习算法中。 使用双精度来存储标签,因此我们可以在回归和分类中使用带标签的点。 对于二进制分类,标签应为0(负数)或1(正数)。 对于多类分类,标签应为从零开始的类索引:0, 1, 2, ….。

Scala中标记点由案例类LabeledPoint实现。 有关API的详细信息,请参考LabeledPoint Scala文档。

 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.regression.LabeledPoint

 // Create a labeled point with a positive label and a dense feature vector.
 val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))

 // Create a labeled point with a negative label and a sparse feature vector.
 val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))

16.3.2.3 Local matrix

本地矩阵(Local matrix)具有整数类型的行和列索引以及双重类型的值,它们存储在单个计算机上。 MLlib支持密集矩阵(其条目值以列优先顺序存储在单个双精度数组中)和稀疏矩阵(其非零条目值)以列稀疏顺序以压缩稀疏列(CSC)格式存储。

Local matrix的基类是Matrix,Scala中提供两种实现:DenseMatrix和SparseMatrix。 我们建议使用在Matrix中实现的工厂方法来创建Local matrix。 MLlib中的Local matrix以列优先顺序存储。 有关API的详细信息,请参考Matrix Scala文档和Matrices Scala文档。

 import org.apache.spark.ml.linalg.{Matrix, Matrices}

 // Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
 val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))

 // Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
 val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))

16.3.2.4 Distributed matrix

分布式矩阵(Distributed matrix)具有长型行和列索引以及双型值,它们以分布式方式存储在一个或多个RDD中。 选择正确的格式来存储大型和分布式矩阵非常重要,因为将分布式矩阵转换为其他格式可能需要全局改组,这非常昂贵。 到目前为止,已经实现了四种类型的分布式矩阵。

16.3.2.4.1 RowMatrix

可以从RDD[Vector]实例创建RowMatrix。然后可以计算其列摘要统计信息和分解。 QR分解的形式为\(A = QR\),其中Q为正交矩阵,R为上三角矩阵。 有关奇异值分解(SVD)和主成分分析(PCA),请参阅Dimensionality reduction。 有关API的详细信息,请参考RowMatrix Scala文档。

 import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.mllib.linalg.distributed.RowMatrix

 val rows: RDD[Vector] = ... // an RDD of local vectors
 // Create a RowMatrix from an RDD[Vector].
 val mat: RowMatrix = new RowMatrix(rows)

 // Get its size.
 val m = mat.numRows()
 val n = mat.numCols()

 // QR decomposition
 val qrResult = mat.tallSkinnyQR(true)
16.3.2.4.2 IndexedRowMatrix

可以从RDD[IndexedRow]实例创建IndexedRowMatrix,其中IndexedRow是(Long, Vector)的包装。可以通过删除行索引将IndexedRowMatrix转换为RowMatrix。 有关API的详细信息,请参考IndexedRowMatrix Scala文档。

 import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}

 val rows: RDD[IndexedRow] = ... // an RDD of indexed rows
 // Create an IndexedRowMatrix from an RDD[IndexedRow].
 val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)

 // Get its size.
 val m = mat.numRows()
 val n = mat.numCols()

 // Drop its row indices.
 val rowMat: RowMatrix = mat.toRowMatrix()
16.3.2.4.3 CoordinateMatrix

CoordinateMatrix是由其条目的RDD支持的分布式矩阵。每个条目都是(i: Long, j: Long, value: Double)的元组,其中i是行索引,j是列索引,value是条目值。 仅当矩阵的两个维度都很大且矩阵非常稀疏时,才应使用CoordinateMatrix。 可以从RDD[MatrixEntry]实例创建CoordinateMatrix,其中MatrixEntry是(Long, Long, Double)的包装。 通过调用toIndexedRowMatrix,可以将CoordinateMatrix转换为具有稀疏行的IndexedRowMatrix。目前不支持CoordinateMatrix的其他计算。 有关API的详细信息,请参阅CoordinateMatrix Scala文档。

 import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

 val entries: RDD[MatrixEntry] = ... // an RDD of matrix entries
 // Create a CoordinateMatrix from an RDD[MatrixEntry].
 val mat: CoordinateMatrix = new CoordinateMatrix(entries)

 // Get its size.
 val m = mat.numRows()
 val n = mat.numCols()

 // Convert it to an IndexRowMatrix whose rows are sparse vectors.
 val indexedRowMatrix = mat.toIndexedRowMatrix()
16.3.2.4.4 BlockMatrix

块矩阵(BlockMatrix)是由MatrixBlocks的RDD支持的分布式矩阵,其中MatrixBlock是 ((Int, Int), Matrix)的元组,其中(Int, Int)是块的索引,而Matrix是给定索引处的矩阵,其大小为rowsPerBlock x colsPerBlock。 BlockMatrix支持诸如与另一个BlockMatrix相加和相乘的方法。 BlockMatrix还具有一个验证器帮助功能,该功能可用于检查BlockMatrix是否正确设置。

通过调用toBlockMatrix,可以容易地从IndexedRowMatrix或CoordinateMatrix创建BlockMatrix。 toBlockMatrix默认创建大小为1024 x 1024的块。 用户可以通过toBlockMatrix(rowsPerBlock, colsPerBlock)提供值来更改块大小。 有关API的详细信息,请参考BlockMatrix Scala文档。

 import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}

 val entries: RDD[MatrixEntry] = ... // an RDD of (i, j, v) matrix entries
 // Create a CoordinateMatrix from an RDD[MatrixEntry].
 val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
 // Transform the CoordinateMatrix to a BlockMatrix
 val matA: BlockMatrix = coordMat.toBlockMatrix().cache()

 // Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
 // Nothing happens if it is valid.
 matA.validate()

 // Calculate A^T A.
 val ata = matA.transpose.multiply(matA)