第 15 章 使用Spark进行流数据建模

15.1 Spark Streaming概述

Spark Streaming是核心Spark API的扩展,可实现实时数据流的可伸缩,高吞吐量,容错流处理。 Spark Streaming可以从许多来源(例如Kafka,Flume,Kinesis或TCP套接字)中提取数据,并可以使用以高级功能(如map,reduce,join和window)表达的复杂算法来处理数据。 用户可以在数据流上应用Spark的机器学习和图形处理算法。 可以将处理后的数据推送到文件系统,数据库和实时仪表板。

15.1.1 流数据建模的工作流程

在内部,它的工作方式如下:

  • Spark Streaming接收实时输入数据流
  • Spark Streaming将数据分为几批
  • 由Spark引擎处理流数据,以分批生成最终结果流

15.1.2 DStream-Spark的数据流表示

Spark Streaming提供了称为discretized stream(离散流)或DStream的高级抽象,它表示连续的数据流。 DStreams可以根据来自诸如Kafka,Flume和Kinesis之类的源的输入数据流来创建,也可以通过对其他DStreams应用高级操作来创建。 在内部,DStream表示为一系列RDD。

15.1.3 实例

在介绍编写Spark Streaming程序的细节之前,让我们快速了解一下简单的Spark Streaming程序。 假设我们要计算从TCP接口接收到的文本数据中的单词数。所需操作如下:

首先,导入StreamingContext,这是所有流功能的主要入口点。 创建具有两个执行线程和1秒批处理间隔的本地StreamingContext。

 from pyspark import SparkContext
 from pyspark.streaming import StreamingContext

 # Create a local StreamingContext with two working thread and batch interval of 1 second
 sc = SparkContext("local[2]", "NetworkWordCount")
 ssc = StreamingContext(sc, 1)

使用这个Context,我们可以创建一个DStream,表示来自TCP源的流数据,指定主机名(例如localhost)和端口(例如9999)。

 # Create a DStream that will connect to hostname:port, like localhost:9999
 lines = ssc.socketTextStream("localhost", 9999)

lines DStream表示将从数据服务器接收的数据流。此DStream中的每个记录都是一行文本。 接下来,我们要按行将lines分成单词。

 # Split each line into words
 words = lines.flatMap(lambda line: line.split(" "))

flatMap是一对多的DStream操作,它通过从源DStream中的每个记录生成多个新记录来创建新的DStream。 在这种情况下,每行将分为多个单词,单词流表示为words DStream。 接下来,我们要对这些单词进行计数。

 # Count each word in each batch
 pairs = words.map(lambda word: (word, 1))
 wordCounts = pairs.reduceByKey(lambda x, y: x + y)

 # Print the first ten elements of each RDD generated in this DStream to the console
 wordCounts.pprint()

将words DStream进一步map(一对一转换)到 (word, 1)对的DStream中,然后将其reduce以获取每批数据中单词的频率。 最后,wordCounts.pprint()将打印每秒生成的一些计数。

要注意的是,执行这些行时,Spark Streaming仅设置启动时将执行的计算,并且尚未开始任何实际处理。 在完成所有转换后,要开始处理,我们最终调用

 ssc.start()             # Start the computation
 ssc.awaitTermination()  # Wait for the computation to terminate

类似的,读取HDFS文件并对其进行字数统计的操作如下:

 import sys

 from pyspark import SparkContext
 from pyspark.streaming import StreamingContext


 sc = SparkContext(appName="PythonStreamingHDFSWordCount")
 ssc = StreamingContext(sc, 10)

 lines = ssc.textFileStream(sys.argv[1])
 counts = lines.flatMap(lambda line: line.split(" "))\
               .map(lambda x: (x, 1))\
               .reduceByKey(lambda a, b: a+b)
 counts.pprint()

 ssc.start()
 ssc.awaitTermination()

15.1.4 创建sparkstreaming操作总结

1.通过创建输入DStream定义输入源。 2.通过将转换和输出操作应用于DStream来定义流计算。 3.开始使用streamingContext.start()接收数据并对其进行处理。 4.等待使用streamingContext.awaitTermination()停止处理(手动或由于任何错误),也可以使用streamingContext.stop()手动停止处理。

在具体操作时要注意一下几点:

  • 一旦streamingContext启动,就无法设置新的流计算或添加计算到该流计算中。
  • streamingContext一旦停止,就无法重新启动。想要重新启动streamingContext只能重新设置并启动。
  • JVM中只能同时激活一个StreamingContext。
  • StreamingContext上的stop()操作也会停止SparkContext。要仅停止StreamingContext,应将streamingContext.stop()的可选参数设置为false。
  • 只要在创建下一个StreamingContext之前停止了上一个StreamingContext(不停止SparkContext),就可以将SparkContext重用于创建多个StreamingContext。

15.2 Spark Streaming 的详细应用信息

15.2.1 DStream模型

类似于RDD之于Spark,Spark Streaming也有自己的核心数据对象,称为DStream(Discretized Stream,离散流)。 使用Spark Streaming需要基于一些数据源创建DStream,然后在DStream上进行一些数据操作,这里的DStream可以近似地看作是一 个比RDD更高一级的数据结构或者是RDD的序列。

虽然Spark Streaming声称是进行流数据处理的大数据系统,但从DStream的名称中就可以看出,它本质上仍然是基于批处理的。 DStream将数据流进行分片,转化为多个batch,然后使用Spark Engine对这些batch进行处理和分析。

这里的batch是基于时间间隔来进行分割的,这里的批处理时间间隔(batch interval)需要人为确定,每一个batch的数据对应一个RDD实例。

15.2.1.1 Input DStream

Input DStream是一种特殊的DStream,它从外部数据源中获取原始数据流,连接到Spark Streaming中。Input DStream可以接受两种 类型的数据输入: (1)基本输入源:文件流、套接字连接和Akka Actor。 (2)高级输入源:Kafka、Flume、Kineis、Twitter等。

基本输入源可以直接应用于StreamingContext API,而对于高级输入源则需要一些额外的依赖包。

由于Input DStream要接受外部的数据输入,因此每个Input DStream(不包括文件流)都会对应一个单一的接收器(Receiver)对象,每 个接收器对象都对应接受一个数据流输入。由于接收器需要持续运行,因此会占用分配给Spark Streaming的一个核,如果可用的核数不大于 接收器的数量,会导致无法对数据进行其他变换操作。

15.2.1.2 DStream变换

DStream的转换也和RDD的转换类似,即对于数据对象进行变换、计算等操作,但DStream的Transformation中还有一些特殊的操作, 如updateStateByKey()、transform()以及各种Window相关的操作。下面列举了一些主要的DStream操作:

Transformation 说明
map(func) 对DStream中的各个元素进行func函数操作,然后返回一个新的DStream.
flatMap(func) 与map方法类似,只不过各个输入项可以被输出为零个或多个输出项.
filter(func) 过滤出所有函数func返回值为true的DStream元素并返回一个新的DStream.
repartition(numPartitions) 增加或减少DStream中的分区数,从而改变DStream的并行度.
union(otherStream) 将源DStream和输入参数为otherDStream的元素合并,并返回一个新的DStream.
count() 通过对DStream中的各个RDD中的元素进行计数.
reduce(func) 对源DStream中的各个RDD中的元素利用func进行聚合操作,然后返回只有一个元素的RDD构成的新的DStream.
countByValue() 对于元素类型为K的DStream,返回一个元素为(K,Long)键值对形式的新的DStream,Long对应的值为源DStream中各个RDD的key出现的次数.
reduceByKey(func, [numTasks]) 利用func函数对源DStream中的key进行聚合操作,然后返回新的(K,V)对构成的DStream.
join(otherStream, [numTasks]) 输入为(K,V)、(K,W)类型的DStream,返回一个新的(K,(V,W))类型的DStream.
cogroup(otherStream, [numTasks]) 输入为(K,V)、(K,W)类型的DStream,返回一个新的 (K, Seq[V], Seq[W]) 元组类型的DStream.
transform(func) 通过RDD-to-RDD函数作用于DStream中的各个RDD,返回一个新的RDD.
updateStateByKey(func) 根据于key的前置状态和key的新值,对key进行更新,返回一个新状态的DStream.

15.2.1.3 DStream输出操作

DStream的输出操作(Output Operations)可以将DStream的数据输出到外部的数据库或文件系统。与RDD的Action类似,当某 个Output Operation被调用时,Spark Streaming程序才会开始真正的计算过程。

下面列举了一些具体的输出操作:

Output Operations Interpretation
print() 打印到控制台.
saveAsTextFiles(prefix, [suffix]) 保存DStream的内容为文本文件,文件名为"prefix-TIME
saveAsObjectFiles(prefix, [suffix]) 保存DStream的内容为SequenceFile,文件名为 "prefix-TIME
saveAsHadoopFiles(prefix, [suffix]) 保存DStream的内容为Hadoop文件,文件名为"prefix-TIME
foreachRDD(func) 对Dstream里面的每个RDD执行func,并将结果保存到外部系统,如保存到RDD文件中或写入数据库.

15.2.2 窗口操作

SparkStreaming提供了基于窗口(Window)的计算,即可以通过一个滑动窗口,对原始DStream的数据进行转换,得到一个新的DStream。 这里涉及到两个参数的设定: (1)窗口长度(windowlength):一个窗口覆盖的流数据的时间长度,必须是批处理时间间隔的倍数。窗口长度决定了一个窗口内包含多少个batch的数据。 (2)窗口滑动时间间隔(slideinterval):前一个窗口到后一个窗口所经过的时间长度,必须是批处理时间间隔的倍数。

用一个例子来说明窗口操作。假设要扩展前面的示例,方法是每10秒在数据的最后30秒生成一次字数统计。为此,我们必须在数据的最后30秒内对(word,1)对的DStream对应用reduceByKey操作。这是通过操作reduceByKeyAndWindow完成的。

 # Reduce last 30 seconds of data, every 10 seconds
 windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)

一些常见的窗口操作如下,所有这些操作均采用上述两个参数windowLength和slideInterval。

Transformation Meaning
window(windowLength, slideInterval) 返回基于源DStream的窗口批处理计算的新DStream。
countByWindow(windowLength, slideInterval) 返回stream中元素的滑动窗口计数。
reduceByWindow(func, windowLength, slideInterval) 返回一个新的单元素流,该流是通过使用func在滑动间隔内聚合流中的元素而创建的。该函数应该是关联的和可交换的,以便可以并行正确地计算它。
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 在(K,V)对的DStream上调用时,返回新的(K,V)对的DStream,其中使用给定的reduce函数func在滑动窗口中的批处理上汇总每个键的值。注意:默认情况下,使用Spark的默认并行任务数(本地模式为2,而在集群模式下,此数量由config属性spark.default.parallelism确定)进行分组。 可以传递一个可选的numTasks参数来设置不同数量的任务。
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 上述reduceByKeyAndWindow()的更有效的版本,其中,使用前一个窗口的缩减值递增地计算每个窗口的缩减值。这是通过减少进入滑动窗口的新数据并“逆向减少”离开窗口的旧数据来完成的。一个示例是在窗口滑动时“增加”和“减少”键的计数。但是,它仅适用于“可逆归约函数”,即具有相应“逆归约”功能(作为参数invFunc的归约)的归约函数。像reduceByKeyAndWindow中一样,reduce任务的数量可以通过可选参数配置。请注意,必须启用检查点才能使用此操作。
countByValueAndWindow(windowLength, slideInterval, [numTasks]) 在(K,V)对的DStream上调用时,返回新的(K,Long)对的DStream,其中每个键的值是其在滑动窗口内的频率。像reduceByKeyAndWindow中一样,reduce任务的数量可以通过可选参数配置。

15.2.3 外部数据来源

从Spark 2.4.5开始,Python API中提供了Kafka,Kinesis和Flume。 此类来源需要与外部非Spark库进行交互,其中一些库具有复杂的依赖关系(例如Kafka和Flume)。因此,为了最大程度地减少与依赖项版本冲突有关的问题,已从这些源创建DStream的功能已移至单独的库,可以在必要时显式链接到这些库。 请注意,Spark Shell中没有这些高级源,因此无法在Shell中测试基于这些高级源的应用程序。如果您真的想在Spark shell中使用它们,则必须下载相应的Maven工件的JAR及其依赖项,并将其添加到类路径中。 这些高级资源如下。 - Kafka:Spark Streaming 2.4.5与Kafka代理0.8.2.1或更高版本兼容。有关更多详细信息,请参见《 Kafka集成指南》。 - Flume:Spark Streaming 2.4.5与Flume 1.6.0兼容。有关更多详细信息,请参见《 Flume集成指南》。 - Kinesis:Spark Streaming 2.4.5与Kinesis Client Library 1.2.1兼容。有关更多详细信息,请参见《 Kinesis集成指南》。

15.2.4 容错机制

容错(Fault Tolerance)指的是一个系统在部分模块出现故障时仍旧能够提供服务的能力。一个分布式数据处理程序要能够长期不间断运 行,这就要求计算模型具有很高的容错性。

Spark操作的数据一般存储与类似HDFS这样的文件系统上,这些文件系统本身就有容错能力。但是由于Spark Streaming处理的很多数据 是通过网络接收的,即接收到数据的时候没有备份,为了让Spark Streaming程序中的RDD都能够具有和普通RDD一样的容错性,这些数据 需要被复制到多个Worker节点的Executor内存中。

Spark Streaming通过检查点(Check Point)的方式来平衡容错能力和代价问题。DStream依赖的RDD是可重复的数据集,每一个RDD从 建立之初都记录了每一步的计算过程,如果RDD某个分区由于一些原因数据丢失了,就可以重新执行计算来恢复数据。随着运行时间的增加, 数据恢复的代价也会随着计算过程而增加,因此Spark提供了检查点功能,定期记录计算的中间状态,避免数据恢复时的漫长计算过程。 Spark Streaming支持两种类型的检查点:

(1)元数据检查点。这种检查点可以记录Driver的配置、操作以及未完成的批次,可以让Driver节点在失效重启后可以继续运行。

(2)数据检查点。这种检查点主要用来恢复有状态的转换操作,例如updateStateByKey或者reduceByKeyAndWindow操作,它可以 记录数据计算的中间状态,避免在数据恢复时,长依赖链带来的恢复时间的无限增长。

开启检查点功能需要设置一个可靠的文件系统路径来保存检查点信息,代码如下:

streamingContext.checkpoing(checkPointDirectory)    //参数是文件路径

为了让Driver自动重启时能够开启检查点功能,需要对原始StreamingContext进行简单的修改,创建一个检查检查点目录的函数,代码如下:

def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)
    ssc.checkpoint(checkpointDirecroty)    //设置检查点目录
    ...
    val lines = ssc.socketTextStream(...)  //创建DStream
}

//从检查点目录恢复一个StreamingContext或者创建一个新的
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
                                      functionToCreateContext())
//启动context
context.start()
context.awaitTermination()