第 16 章 基于Scala语言的Spark
16.1 Scala与Python对比
Spark用Scala编写,因为它是静态类型的,并且可以以已知的方式编译到JVM,因此它可以非常快地进行编写。 尽管Spark拥有适用于Scala,Python,Java和R的API,但流行的语言是前两种。因为Java不支持Read-Evaluate-Print-Loop(REPL),而R不是通用语言。 目前数据科学界分为两个阵营,一个偏爱Scala,另一个偏爱Python。 Scala与Python都有其优点和缺点,最终的选择应取决于用户应用的需求。 若用户想对spark进行深入建模,开发新的模型,就需要知道它的底层构造,这时学习Scala是非常必要的;而若用户只是想使用spark里既有接口中的模型方法,不想开发新的模型,就可以直接使用python。
下面将从性能、学习曲线、适用性、可用性、代码恢复与安全方面探讨两者的异同。
16.1.1 性能
从性能上说,Scala的运行速度通常比Python快10倍以上。 编译语言比解释语言运行速度更快。Scala是静态类型的,在运行时使用Java虚拟机(JVM),在大多数情况下,它比Python快一些。Python是动态类型的,这会降低速度。 Python使用时需调用Spark库,这需要大量代码处理,因此性能也会降低。 Python通过spark处理海量数据时需要很多的内存以实现cache功能。Scala是静态的,它对变量进行提前声明,在计算过程中节省大量计算空间。在这种情况下,Scala对于有限的内核非常有效。 此外,Scala是基于JVM的Hadoop原生。Hadoop非常重要,因为Spark是在Hadoop的文件系统HDFS之上构建的。Python与Hadoop服务的交互非常差,因此开发人员必须使用第三方库(例如hadoopy)。Scala通过Java中的本机Hadoop API与Hadoop交互。这就是为什么在Scala中编写本机Hadoop应用程序非常容易的原因。
16.1.2 学习曲线
Scala与Python都是功能性和面向对象的语言,除了支持社区活跃外,它们还具有相似的语法。 Python具有简单的语法和良好的标准库。由于Scala中的高级功能,与Python相比,Scala学习起来可能更加复杂。 Python对于简单直观的逻辑更可取,而Scala对于复杂的工作流程则更有用。
16.1.3 适用性
Scala具有多个标准库和核,可以在大数据生态系统中快速集成数据库。 Scala允许编写具有多个并发原语的代码,而Python不支持并发或多线程。由于具有并发功能,Scala可以实现更好的内存管理和数据处理。 但是,Python支持重量级的流程派生。在此,一次仅一个线程处于活动状态。因此,无论何时部署新代码,都必须重新启动进程,这会增加内存开销。
16.1.4 可用性
两者都是富有表现力的,我们可以通过它们达到较高的功能水平。 在框架,库,隐式,宏等方面,Scala更为强大。由于其功能性质,Scala在MapReduce框架中运行良好。许多Scala数据框架遵循与Scala的API集合类似的抽象数据类型。开发人员只需要学习基本的标准集合,就可以轻松地熟悉其他库。Spark是用Scala编写的,因此了解Scala可以让您了解和修改Spark在内部所做的工作。此外,许多即将发布的功能往往首先发布在Scala和Java中的API,而Python API在以后的版本中会不断发展。 Python更加用户友好和简洁。对于NLP,Python是首选,因为Scala没有很多用于机器学习或NLP的工具。此外,要使用GraphX,GraphFrames和MLLib,Python也是首选。Python的可视化库补充了Pyspark,这是Spark和Scala不可比拟的。
16.1.5 代码恢复与安全
Scala是一种静态类型的语言,它使我们能够发现编译时的错误。而Python是一种动态类型化的语言。每当用户对现有代码进行更改时,Python语言极容易出现错误。因此,为Scala重构代码比为Python重构更容易。
16.1.6 结论
Python速度较慢,但非常易于使用,而Scala速度快,且也较为易于使用。 Scala提供对Spark最新功能的访问,因为Apache Spark是用Scala编写的。 Apache Spark编程语言的选择取决于最适合项目需求的功能,因为每个语言各有优缺点。 Python更加面向分析,而Scala更加面向工程,但是两者都是用于构建数据科学应用程序的出色语言。总体而言,为了充分利用Spark的潜力,Scala将更加有益。如果用户只是想通过Spark进行开箱即用的机器学习,那么Python是值得学习的。
16.2 Scala介绍
16.2.1 Scala的背景
Scala是Scalable Language的缩写,是一种混合功能编程语言。它是由瑞士洛桑联邦理工学院(EPFL)编程方法教授马丁·奥德斯基(Martin Odersky)创建的。
马丁·奥德斯基(Martin Odersky)(生于1958年9月5日)是德国计算机科学家,也是瑞士洛桑联邦理工学院(EPFL)的编程方法教授。他专门研究代码分析和编程语言。他与其他人一起设计了Scala编程语言和通用Java(以及之前的Pizza)。他是Generic Java的共同设计者之一,实现了GJ编译器,并且他的实现成为Java编译器javac的基础,因此他也被称为javac编译器的“父亲”。 Martin Odersky曾在尼克劳斯·沃思(Niklaus Wirth)的指导下学习,Nicklaus Wirth是最著名的几种编程语言的设计师,创建了Pascal和其他几种语言。
Scala是面向对象的,并且是一种功能语言,因为每个函数都是一个值,每个值都是一个对象,因此最终每个函数都是一个对象。 Scala在Java平台(Java虚拟机)上运行,并且与现有Java程序兼容。打包时,由于Android应用程序通常是用Java编写并从Java字节码转换为Dalvik字节码(在安装过程中可能会进一步转换为本机代码),因此Scala的Java兼容性使其非常适合Android开发,而在使用功能性方法时更是如此是首选。
Scala的设计始于2001年,在洛桑联邦理工学院(EPFL),2003年内部发布后,Scala于2004年初在Java平台上公开发布。 Scala不是创建“更好的Java”的唯一尝试。诸如Kotlin和Ceylon之类的替代方案也走了这条路,但他们做出了一个基本决定,即在语法上保持与Java语言本身非常接近,以最大程度地减少学习时间。这似乎是个好主意,但最终还是有些自欺欺人,因为它迫使用户停留在许多完全相同的Java范式之内,而Java范式正是首先要创建“更好的Java”的原因。 相比之下,Scala的创建目的是成为一种更好的语言,以摆脱Java在过于繁琐或令人沮丧的方面对开发人员的限制。这样开发的结果是Scala的学习曲线陡峭,使一些公司不去使用它。但是,Twitter,Apple,Linkedin,Airbnb或UBS等公司都在使用它。
16.2.2 Scala概述
一些有关Scala的重要知识:
- Scala是一种高级语言
- Scala是静态输入
- Scala语言具有表达性,其语法简洁但仍然可读
- Scala支持面向对象编程(OOP)范例-每个变量都是一个对象,每个“运算符”都是一个方法。
- Scala支持函数编程(FP)范例-函数也是变量,用户可以将它们传递给其他函数。用户可以使用OOP,FP编写代码,也可以将它们组合成混合样式。
- Scala具有完善的类型推断系统
- Scala代码生成在Java虚拟机(JVM)上运行的.class文件
- 在Scala中可以轻松使用Java库
16.2.3 Scala操作
16.2.3.1 Scala REPL
Scala REPL(“Read-Evaluate-Print-Loop”)是命令行解释器,您可以将其用作测试Scala代码的“游乐场”区域。我们在这里较早地介绍了它,因此您可以将其与以下代码示例一起使用。 要启动REPL会话,只需在操作系统命令行中键入scala,您将看到以下内容:
$ scala
Welcome to Scala 2.13.0 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131).
Type in expressions for evaluation. Or try :help.
16.2.3.2 注释
Scala中的注释就像Java(以及许多其他语言)中的注释一样:
单行注释:
// a single line comment
多行注释:
/*
* a multiline comment
*/
/**
* also a multiline comment
*/
16.2.3.3 创建变量
Scala中有两种类型的变量:
val
:创建一个不可变的变量(如Java中的final)。如果变量值不可以被修改,该变量在存储和调用过程中会更加简便,操作过程中也会节约许多时间,因此此种类型变量是首选。
var
:创建一个可变变量,该类型变量仅在有特定原因使用时才应使用
创建变量示例如下:
val x = 1 //immutable
var y = 0 //mutable
在Scala中,通常在创建变量时不需声明其类型。因为在执行创建变量操作时,Scala通常可以自行推断数据类型。用户可以使用Scala REPL或Spark Shell对变量类型进行检查:
val x = 1
val s = "a string"
此功能称为类型推断,它是确保代码简洁的好方法。 为了保证代码一致性,用户还可以显式声明变量的类型,但这通常不是必需的:
val x: Int = 1
val s: String = "a string"
16.2.3.4 Scala内嵌数据类型
Scala中有用户期望的标准数据类型。在Scala中,所有这些数据类型都是成熟的对象(不是原始数据类型)。
val b: Byte = 1
val x: Int = 1
val l: Long = 1
val s: Short = 1
val d: Double = 2.0
因为Int和Double是默认数字类型,所以通常在创建它们时无需显式声明数据类型:
val i = 123 // defaults to Int
val x = 1.0 // defaults to Double
Scala中对于大量数据处理,还具有类型BigInt和BigDecimal。 BigInt和BigDecimal的一大优点是它们支持习惯于使用数字类型的所有运算符。
var b0 = BigInt(987654321)
var b1 = BigInt(1234567890)
var b2 = BigDecimal(123456.789)
b0 + b1
b0 += 1
b1 * b1
16.2.3.5 列表
Scala中的List类是线性的、不变的序列。这意味着它是一个用户无法修改的链表。 每当添加或删除List元素时,都需要从现有List创建一个新List。
val ints = List(1, 2, 3)
val names = List("Joel", "Chris", "Ed")
可以添加元素到List前面:
val b = 0 +: ints
可以将一个List合并到另一个List前:
val b2 = List(-1, 0) ++: ints
其中,:
字符表示被合并或添加的List所在的一侧。
因此,当使用+:
时List必须在右边,将该元素添加到List的左边。
val c1 = ints :+ 4
val c2 = ints :+ List(4,5)
16.2.3.6 控制结构
16.2.3.6.1 if/else
Scala的if/else
控制结构与其他语言类似:
if (test1) {
doA()
} else if (test2) {
doB()
} else if (test3) {
doC()
} else {
doD()
}
Scala中的if/else
控制结构不需要像Python中一样要求对齐,但是类似于C语言,要使用花括号括指示结构。
但是,与Java和许多其他语言不同,Scala的if/else
控制结构返回一个值。因此,除上述用法外,可以将其用作三元运算符:
val a = 5
val b = 7
val x = if (a < b) a else b
16.2.3.6.2 match表达式
Scala中有match
表达式,强大的match
表达式是Scala的一大特色,其最基本的用法类似于Java switch语句:
// val i = 1
val i = 5
val result = i match {
case 1 => "one"
case 2 => "two"
case _ => "not 1 or 2"
}
Scala中的match
表达式不仅限于处理整数,还可以用于任何数据类型,包括布尔值:
val test = true
val boolean2String = test match {
case true => "it is true"
case false => "it is false"
}
下面是一个将match
用作方法主体并针对许多不同类型进行匹配的示例:
def getClassAsString(x: Any):String = x match {
case s: String => s + " is a String"
case i: Int => "Int"
case f: Float => "Float"
case l: List[_] => "List"
case p: Person => "Person"
case _ => "Unknown"
}
16.2.3.6.3 try/catch
Scala的try/catch
控制结构可捕获异常。其与Java中相应结构类似,但语法与match
表达式一致:
try {
writeToFile(text)
} catch {
case fnfe: FileNotFoundException => println(fnfe)
case ioe: IOException => println(ioe)
}
16.2.3.6.4 for循环和表达式
Scala的for
循环如下所示:
for (arg <- args) println(arg)
// "x to y" syntax
for (i <- 0 to 5) println(i)
// "x to y by" syntax
for (i <- 0 to 10 by 2) println(i)
还可以将yield
关键字添加到for
循环中,以创建产生结果的for
表达式。 这是一个for
表达式,它将序列1到5中的每个值加倍:
val x = for (i <- 1 to 5) yield i * 2
这是另一个用于迭代字符串列表的表达式:通过for
循环能自动再生成一个变量
val fruits = List("apple", "banana", "lime", "orange")
val fruitLengths = for {
f <- fruits
if f.length > 4
} yield f.length
16.2.3.6.5 while与do/while
Scala还具有while
和do/while
循环。它的一般语法如下:
// while loop
while(condition) {
statement(a)
statement(b)
}
// do-while
do {
statement(a)
statement(b)
}
while(condition)
16.2.3.7 Scala Classes
这是一个Scala类的示例:
class Person(var firstName: String, var lastName: String) {
def printFullName() = println(s"$firstName $lastName")
}
这是使用该类的方式:
val p = new Person("Julia", "Kern")
println(p.firstName)
p.lastName = "Manes"
p.printFullName()
要注意的是,无需创建“get”和“set”方法来访问类中的字段。
16.2.3.8 Scala methods
就像其他OOP语言一样,Scala类也具有方法,Scala方法使用如下:
def sum(a: Int, b: Int): Int = a + b
def concatenate(s1: String, s2: String): String = s1 + s2
不必声明方法的返回类型,因此,编写这样的两种方法是完全合法的:
def sum(a: Int, b: Int) = a + b
def concatenate(s1: String, s2: String) = s1 + s2
以下是调用方法的方式:
val x = sum(1, 2)
val y = concatenate("foo", "bar")
可以使用方法做更多的事情,例如为方法参数提供默认值等。
16.3 使用Scala进行Spark操作
16.3.1 矩阵运算
Spark的MLlib
支持存储在一台机器上的局部向量和矩阵,以及由一个或多个RDD支持的分布式矩阵。
局部向量和局部矩阵是充当公共接口的简单数据模型。基本的线性代数运算由Scala库Breeze
提供。
本地使用Scala中的Breeze
模块示例如下:
import breeze.linalg._
val x = DenseVector.zeros[Double](5)
16.3.2 使用Scala进行Spark Matrix操作
16.3.2.1 局部向量
局部向量具有存储在单个计算机上的整数类型索引和基于0的索引以及双精度类型的值。
MLlib
支持两种类型的局部向量:密集和稀疏。密集向量由表示其输入值的双精度数组支持,而稀疏向量由两个并行数组支持:索引和值。例如,向量(1.0, 0.0, 3.0)可以用密集格式表示为[1.0, 0.0, 3.0],也可以用稀疏格式表示为(3, [0, 2], [1.0, 3.0]),其中3是向量的大小。
局部向量的基类是Vector,Scala中提供两种实现:DenseVector和SparseVector。 建议使用Vector中实现的工厂方法来创建局部矢量。 有关API的详细信息,请参考Vector Scala文档和Vectors Scala文档。
import org.apache.spark.ml.linalg.{Vector, Vectors}
// Create a dense vector (1.0, 0.0, 3.0).
val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries.
val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries.
val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
要注意的是,默认情况下,Scala会导入scala.collection.immutable.Vector
,因此必须显式导入org.apache.spark.mllib.linalg.Vector
才能使用MLlib
的Vector。
16.3.2.2 标记点
标记点(Labeled point)是与标记/响应关联的局部矢量,可以是稠密的或稀疏的。在MLlib
中,标记点用于监督学习算法中。
使用双精度来存储标签,因此我们可以在回归和分类中使用带标签的点。 对于二进制分类,标签应为0(负数)或1(正数)。 对于多类分类,标签应为从零开始的类索引:0, 1, 2, ….。
Scala中标记点由案例类LabeledPoint
实现。
有关API的详细信息,请参考LabeledPoint Scala文档。
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
// Create a labeled point with a positive label and a dense feature vector.
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
// Create a labeled point with a negative label and a sparse feature vector.
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
16.3.2.3 Local matrix
本地矩阵(Local matrix)具有整数类型的行和列索引以及双重类型的值,它们存储在单个计算机上。 MLlib
支持密集矩阵(其条目值以列优先顺序存储在单个双精度数组中)和稀疏矩阵(其非零条目值)以列稀疏顺序以压缩稀疏列(CSC)格式存储。
Local matrix的基类是Matrix,Scala中提供两种实现:DenseMatrix和SparseMatrix。 我们建议使用在Matrix中实现的工厂方法来创建Local matrix。 MLlib中的Local matrix以列优先顺序存储。 有关API的详细信息,请参考Matrix Scala文档和Matrices Scala文档。
import org.apache.spark.ml.linalg.{Matrix, Matrices}
// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
// Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))
16.3.2.4 Distributed matrix
分布式矩阵(Distributed matrix)具有长型行和列索引以及双型值,它们以分布式方式存储在一个或多个RDD中。 选择正确的格式来存储大型和分布式矩阵非常重要,因为将分布式矩阵转换为其他格式可能需要全局改组,这非常昂贵。 到目前为止,已经实现了四种类型的分布式矩阵。
16.3.2.4.1 RowMatrix
可以从RDD[Vector]实例创建RowMatrix。然后可以计算其列摘要统计信息和分解。 QR分解的形式为\(A = QR\),其中Q为正交矩阵,R为上三角矩阵。 有关奇异值分解(SVD)和主成分分析(PCA),请参阅Dimensionality reduction。 有关API的详细信息,请参考RowMatrix Scala文档。
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val rows: RDD[Vector] = ... // an RDD of local vectors
// Create a RowMatrix from an RDD[Vector].
val mat: RowMatrix = new RowMatrix(rows)
// Get its size.
val m = mat.numRows()
val n = mat.numCols()
// QR decomposition
val qrResult = mat.tallSkinnyQR(true)
16.3.2.4.2 IndexedRowMatrix
可以从RDD[IndexedRow]实例创建IndexedRowMatrix,其中IndexedRow是(Long, Vector)的包装。可以通过删除行索引将IndexedRowMatrix转换为RowMatrix。 有关API的详细信息,请参考IndexedRowMatrix Scala文档。
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
val rows: RDD[IndexedRow] = ... // an RDD of indexed rows
// Create an IndexedRowMatrix from an RDD[IndexedRow].
val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)
// Get its size.
val m = mat.numRows()
val n = mat.numCols()
// Drop its row indices.
val rowMat: RowMatrix = mat.toRowMatrix()
16.3.2.4.3 CoordinateMatrix
CoordinateMatrix是由其条目的RDD支持的分布式矩阵。每个条目都是(i: Long, j: Long, value: Double)的元组,其中i是行索引,j是列索引,value是条目值。
仅当矩阵的两个维度都很大且矩阵非常稀疏时,才应使用CoordinateMatrix。
可以从RDD[MatrixEntry]实例创建CoordinateMatrix,其中MatrixEntry是(Long, Long, Double)的包装。
通过调用toIndexedRowMatrix
,可以将CoordinateMatrix转换为具有稀疏行的IndexedRowMatrix。目前不支持CoordinateMatrix的其他计算。
有关API的详细信息,请参阅CoordinateMatrix Scala文档。
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
val entries: RDD[MatrixEntry] = ... // an RDD of matrix entries
// Create a CoordinateMatrix from an RDD[MatrixEntry].
val mat: CoordinateMatrix = new CoordinateMatrix(entries)
// Get its size.
val m = mat.numRows()
val n = mat.numCols()
// Convert it to an IndexRowMatrix whose rows are sparse vectors.
val indexedRowMatrix = mat.toIndexedRowMatrix()
16.3.2.4.4 BlockMatrix
块矩阵(BlockMatrix)是由MatrixBlocks的RDD支持的分布式矩阵,其中MatrixBlock是 ((Int, Int), Matrix)的元组,其中(Int, Int)是块的索引,而Matrix是给定索引处的矩阵,其大小为rowsPerBlock x colsPerBlock。 BlockMatrix支持诸如与另一个BlockMatrix相加和相乘的方法。 BlockMatrix还具有一个验证器帮助功能,该功能可用于检查BlockMatrix是否正确设置。
通过调用toBlockMatrix
,可以容易地从IndexedRowMatrix或CoordinateMatrix创建BlockMatrix。 toBlockMatrix
默认创建大小为1024 x 1024的块。 用户可以通过toBlockMatrix(rowsPerBlock, colsPerBlock)
提供值来更改块大小。
有关API的详细信息,请参考BlockMatrix Scala文档。
import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}
val entries: RDD[MatrixEntry] = ... // an RDD of (i, j, v) matrix entries
// Create a CoordinateMatrix from an RDD[MatrixEntry].
val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
// Transform the CoordinateMatrix to a BlockMatrix
val matA: BlockMatrix = coordMat.toBlockMatrix().cache()
// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
// Nothing happens if it is valid.
matA.validate()
// Calculate A^T A.
val ata = matA.transpose.multiply(matA)