第 9 章 Spark 分布式计算平台
9.1 Spark简介
9.1.1 Spark的历史和发展
Spark是一种分布式计算框架,它使用了内存运算技术,具有支持迭代计算和低延迟等特点。 这些特点使得Spark的运算效率大大高于传统的分布式计算系统Hadoop。相对而言,MapReduce比较适合离线数据的处理, 但随着业务场景的发展,实时查询和迭代计算的需求逐渐增多,Spark则可以提供更好的支持。 同时Spark还具有高容错性和高可伸缩性,可以将Spark部署在大量的廉价的设备之上,形成大规模的集群。
Spark最初由加州大学伯克利分校的AMPLab实验室开发,目前正迅速被用户接受,并应用于企业生产中。它 于2010年正式开源,并且在2013年成为Apache的基金项目, 2014年它成为Apache基金的顶级开源项 目,整个过程只用了五年时间。如此短时间的巨大成功,不得不让人惊叹。
2009年:Spark诞生
2010年:正式成为开源项目
2013年:成为Apache基金项目
2014年2月:成为Apache顶级基金项目
2014年4月:大数据公司MapR加入Spark阵营,Apache Mahout放弃MapReduce,使用Spark作为 计算引擎。
2014年5月:Spark1.0.0发布
2014年7月:Hive on Spark项目启动
2016年7月:Spark2.0.0发布
2018年11月:Spark2.4.0发布
到目前为止,AMPLab和DATABRICKS负责着整个项目的开发和维护,同时,有很多的开源爱好者也积极地 加入到Spark的更新和维护之中。
9.1.2 Spark的社区活动
Spark对于社区活动十分重视,拥有规范的组织,经常会定期或者不定期地举行相关讨论会议。Spark会
议可以分为两种,一种叫做Spark
Summit,拥有巨大的影响力,是全世界的Spark项目顶尖技术人员峰会。
目前为止,已经在2013年至2017年于旧金山连续召开了五届Summit峰会。2018年后,Spark
Summit升级为Spark+AI
summit,于3月1日如期在旧金山举行,更多峰会信息可参考Spark Summit官方
网站:http://spark-summit.org/
。
2014年,在Spark Summit峰会参与者中,除了UC Berkley和Databricks之外,还有许多最早尝试Spark进 行大规模数据分析的企业,包括云计算的领先者亚马逊公司、全球最大的流媒体音乐网站Spotify,著名大数据平台MapR,以及众多的大型企 业IBM、Intel、SAP等。
除了Spark Summit会议之外,Spark社区还会不定期地召开小规模的Meetup会议,这种会议有可能在世界 各地举行。在中国, Meetup会议已经举行了多次,参会人员包括来自Intel中国研究院、TalkingData、淘宝、Databricks、微软亚洲研究院的工程师们。
9.1.3 Spark和Hadoop的比较
首先,我们需要了解一下spark与Hadoop的关系。准确地说,Spark是一个分布式计算框架,而Hadoo则更像是 为分布式计算提供服务的基础设施,Hadoop中不仅包含一个计算框架MapReduce,同时也包含分布式的文 件系统HDFS,以及其他的Hadoop项目,比如Hbase、Hive等。因此,Spark可以看作是MapReduce的一种可替代方案, 它并不是和Hadoop同一级别的项目。同时,Spark还兼容HDFS、Hive的分布式存储层,可以将其融入到Hadoop的 生态环境中。因此,如果你有一个安装好的Hadoop集群,那么就可以在这个基础上直接部署Spark了。更多 关于Spark的介绍请参考@karau2015learning。
说到这里,我们可以发现,Spark与Hadoop的比较这种说法是不合适的,真正具有可比性的 是Hadoop中的MapReduce计算框架。那么Spark与MapReduce相比到底具有哪些优势呢?
中间输出结果上的优势
对于MapReduce计算框架,中间计算结果会输出在计算机的硬盘上,当需要的时候再进行调用。由于 需要考虑任务管道承接的问题,当一些查询翻译到MapReduce任务时,往往会产生多个Stage,而这些 串联的Stage又依赖于底层文件系统(如HDFS)来存储每一个Stage的输出结果,产生了较高的延迟。 Spark将执行模型抽象为通用的有向无环图执行计划(DAG),这可以将多Stage的任务串联或者并行执 行,而无须将Stage中间结果输出到HDFS中。类似的引擎包括Dryad、Tez。由于Spark的中间输出结果 无需落地,其计算耗时也远远低于MapReduce。
数据格式和内存布局
由于MapReduce Schema on Read处理方式会引起较大的处理开销,Spark抽象出分布式内存存储结构弹 性分布式数据集RDD,进行数据的存储。RDD能支持粗粒度写操作,但对于读取操作,RDD可以精确到每 条记录,这使得RDD可以用来作为分布式索引。Spark的特性是能够控制数据在不同节点上的分区,用 户可以自定义分区策略,如Hash分区等。Shark和Spark SQL在Spark的基础之上实现了列存储和列存储 压缩。
执行策略
MapReduce在数据Shuffle之前花费了大量的时间来排序,Spark则可减轻上述问题带来的开销。因 为Spark任务在Shuffle中不是所有情景都需要排序,所以支持基于Hash的分布式聚合,调度中采用更 为通用的任务执行计划图(DAG),每一轮次的输出结果在内存缓存。
任务调度的开销
传统的MapReduce系统,如Hadoop,是为了运行长达数小时的批量作业而设计的,在某些极端情况下, 提交一个任务的延迟非常高。Spark采用了事件驱动的类库AKKA来启动任务,通过线程池复用线程来避 免进程或线程启动和切换开销。
虽然Spark与Hadoop MapReduce相比存在许多优势,但由于两者有不同的设计需求和应用场景,MapReduce是有存在的意义的。由于Hadoop MapReduce的设计简单,维护更加容易,Hadoop MapReduce更适合做非常廉价的糙快稳的工作,比如对大量数据进行查找,对数据列进行提取,进行Hive查询等。Spark的运行速度虽然更快,但其硬件上的依赖更强,需要大量内存、大量带宽的使用,这使得Spark的应用过程中对硬件的需求更高。
9.1.4 Spark的特点
拥有高效的数据流水线
除了传统的MapReduce操作之外,Spark还可以支持SQL语句查询、机器学习图模型算 法(???)等等,用户可以在一个工作的流程中将这些功能完美的组合起来。
强大的快速处理功能
Spark是一款轻量级的软件系统,第一代spark核心程序只有4万行代码。Spark为处理大数据而生,最 重要的一个特点就是将结果缓存在内存中,从而达到提高计算效率,减少计算时间的目的。
可用性
Spark提供了丰富的Scala, Java,Python API及交互式Shell来提高软件的可用性。使用者可以 在Spark系统中像书写单机程序一样来书写分布式计算程序,轻松的利用spark系统搭建的分布式计算 平台来处理海量的数据。
容错性
Spark系统通过checkpoint实现系统的容错功能。checkpoint主要有两种方式,一种是checkpoint data,一种是logging the updates。用户可以自主决定采用哪种方式来实现容错功能。
兼容性
Spark的兼容性使其可以在很多平台上使用。Spark与HDFS、HBase、hive等兼容,除了可以运行在YARN 等分布式集群系统之外,还可以读取现存的任何的Hadoop数据,可以在任何Hadoop数据源上运行, 如Hive、HBase等等。Spark也可以运行在云平台上,比如国外的亚马逊EC2, 国内的阿里云等云端平台。Spark也可以单独运行,其有自己的框架,也可以存储数据。
9.1.5 Spark生态:BDAS
从Spark产生到现在,已经发展成为包括许多子项目的分布式计算平台。伯克利实验室将整个Spark的生 态系统称为伯克利数据分析栈,也就是常说的BDAS。其中,Spark是整个系统的核心。与此同时,BDAS还包 含了结构化数据查询引擎Spark SQL和Shark,提供机器学习功能的MLlib,流计算系统Spark Streaming,并行图计算系统GraphX等等。这些项目为Spark系统提供了更加丰富的计算范式,使 得Spark的功能更加强大。
BDAS系统包含如下内容:
Spark
Spark是一个快速通用的分布式数据处理系统,不仅实现了Hadoop系统的MapReduce算子map 函数 和reduce函数,还提供了其他的算子,例如filter、join、groupByKey等。弹性分布式数据集(RDD) 处理分布式数据的核心,实现了重要的应用任务调度、RPC、序列化和压缩功能,并为上层组件提供 了API。Spark底层采用Scala语言书写而成,提供给使用者与Scala类似的程序接口。对于Scala语言 的使用,请参考@odersky2008programming和@ryza2015advanced。
Shark
Shark是spark生态系统中的数据仓库,构建在Hive的基础之上。目前shark已经终止了开发。
Spark SQL
Spark SQL为用户提供了spark系统中的数据查询功能。Spark SQL使用Catalyst做为查询解析和优化器, 并且Spark SQL在底层使用Spark作为执行引擎来实现SQL查询操作,性能普遍比Hive快2-10倍。同时,用 户可以在Spark上直接编写SQL代码,这相当于为Spark提供了一套强大的SQL算子。同时Spark SQL还不断 的兼容不同的Hadoop项目(如HDFS、Hive等),为它的发展提供了广阔空间。
Spark Streaming
Spark Streaming是一种构建在Spark上的实时的计算框架,它为Spark提供了处理大规模流数据的能 力。Spark Streaming的优势在于:能运行在超过100以上的结点上,并达到秒级延迟;使用Spark作为 执行引擎,具有比较高的效率和容错性;可以集成Spark的批处理和交互查询功能,为实现复杂的算法 提供简单的接口。
GraphX
GraphX是基于BSP模型的图计算项目,在Spark上封装了类似Pregel的接口,进行大规模的同步全 局的图计算,当用户进行多轮迭代的时候,基于Spark内存计算的GraphX优势更为明显。
MLlib/ML
MLlib是Spark的可扩展机器学习库。它的目标是使实用的机器学习算法可扩展并容易使用。 它提供包括机器学习算法、特征工程、管道、存储和其他实用工具。MLlib适用于Spark的API, 并且可以与Python(自Spark 0.9起)和R库(自Spark 1.5起)中的NumPy互操作。 可以使用任何Hadoop数据源(例如HDFS,HBase或本地文件),从而轻松插入Hadoop工作流。 spark.ml是Spark 1.2中引入的新程序包,旨在提供统一的高级API集,以帮助用户创建和调整实用 的机器学习管道,它目前是Alpha组件。随着spark.ml的开发,也将继续支持spark.mllib并向其 中添加功能。
9.2 Spark工作原理介绍
9.2.1 Spark架构
Spark的架构采用了经典的Master-Slave通用基础框架。其中Master是集群中的含有Master进程的节点, 而Slave是集群中含有Worker进程的节点。Master是整个集群的控制器,负责了整个集群的运 行;Worker相当于是集群的计算节点,接收来自主节点的命令同时进行状态汇报;Executor负责执行具 体的任务;Client是用户的客户端,作用是提交应用,而Driver则是负责控制应用的执行。
Spark分布式集群安装好之后,需要在主节点和子节点上分别启动Master进程以及Worker进程,从而控制 整个集群的运行。在一个Spark任务执行的过程中, Driver程序是任务逻辑执行的起点,负责了整个作 业的调度,而Worker则是用来管理计算节点和创建Executor然后处理任务。在任务的执行阶 段,Driver会将任务和任务所依赖的file和jar序列化之后传递给相应的Worker节点,同时Executor对相 应的数据分区的任务进行处理。
9.2.2 Spark组件介绍
下面介绍一下Spark架构中的基本组成部分。
Client:客户端进程,负责提交作业信息到Master。
Master:负责接收Client提交的作业,管理Worker,并命令Worker启动Driver和Executor。
ClusterManager:在Standalone模式中的Master(主节点),控制着整个集群,监控Worker的工 作情况。
Worker:子节点,负责控制具体的计算节点,启动Driver和Executor完成任务。在YARN模式中称 为NodeManager。
Driver:一个Spark作业的主进程,运行Application的main()函数并创建SparkContext,负责作业的解析 、生成Stage并调度Task到Executor上。
Executor:执行器,在worker node上执行任务的组件、用于启动线程池运行任务。每 个Application拥有独立的一组Executors。
SparkContext:整个应用的上下文,控制应用的生命周期,Spark应用程序的执行过程中起着主导作用,它负责 与程序和spark集群进行交互,包括申请集群资源、创建RDD等。
RDD:Spark的基本计算单元,一组RDD可形成执行的有向无环图RDD Graph。
DAG Scheduler:根据应用(Application)构建基于Stage的DAG,实现将Spark作业分解成一到多个Stage, 每个Stage根据RDD的Partition个数决定Task的个数,并提交Stage给TaskScheduler。
TaskScheduler:将任务(Task)分发给Executor执行。
SparkEnv:线程级别的上下文,存储运行时的重要组件的引用。
SparkEnv内创建并包含如下一些重要组件的引用。
MapOutPutTracker:负责Shuffle元信息的存储。
BroadcastManager:负责广播变量的控制与元信息的存储。
BlockManager:负责存储管理、创建和查找块。
MetricsSystem:监控运行时性能指标信息。
SparkConf:负责存储配置信息。
SparkContext是Spark的主要入口点,如果把Spark集群当作服务端,那Spark Driver就是客户端,SparkContext则是客户端的核心, 创建SparkContext的语句如下:
val conf = new SparkConf().setMaster("master").setAppName("appName")
val sc = new SparkContext(conf)
SparkContext的初始化需要一个SparkConf对象,SparkConf包含了Spark集群配置的各种参数。其中setMaster主要是设定连接主节点,
如果参数是local
,则在本地用单线程运行spark,如果是
local[4]
,则在本地用4核运行。setAppName则是给出指定的Spark应用一个名称。
9.2.3 Spark工作流程
用户在Client中提交了任务之后,根据Driver的配置模式,Driver在客户端开始运行或Master会找到一个Worker然后启动Driver, Driver会根据要执行的任务向Master申请资源,之后将任务转化为RDD Graph,再由DAGScheduler(功能:将Spark作业分解成一 到多个Stage,每个Stage根据RDD的Partition个数决定Task的个数,然后生成相应的Task set放到TaskScheduler中)将RDD Graph转化为Stage的有向无环图提交 给TaskScheduler,由TaskScheduler提交任务给Executor执行。在任务执行的过程中,其他组件协同工 作,确保整个应用顺利执行。
9.2.4 启动Spark应用程序
9.2.4.1 spark-submit
Spark的bin目录中的spark-submit脚本用于启动Spark集群上的应用程序。 它可以通过统一的界面使用所有受支持的Spark集群管理器,因此无需特别为每个应用程序进行配置。
如果用户代码依赖于其他项目,则需要捆绑应用程序的依赖项,以便将代码分发到Spark集群。 为此,需要创建一个包含用户代码及其依赖项的程序集jar(或“uber” jar)。 sbt和Maven都有程序集插件。 创建程序集jar时,将Spark和Hadoop列为提供的依赖项,因为 这些不需要被捆绑,它们是由集群管理器在运行时提供的。 组装好jar后,可以在传递jar时调用bin / spark-submit脚本。
对于Python,可以使用spark-submit的--py-files
参数添加.py,.zip或.egg文件,以与用户应用程序一起分发。 如果依赖于多个Python文件,建议将它们打包为.zip或.egg。
捆绑用户应用程序后,可以使用bin / spark-submit脚本启动它。 该脚本负责使用Spark及其依赖项设置类路径,并且可以支持不同的集群管理器和Spark支持的部署模式:
spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
一些常用的选项是:
--class
:应用程序的入口点(例如org.apache.spark.examples.SparkPi)
--master
:群集的主URL(例如spark://23.195.26.187:7077)
--deploy-mode
:将驱动程序部署在工作节点(群集)上还是本地部署为外部客户端(客户端)(默认值:客户端)
--conf
:key=value格式的任意Spark配置属性。 对于包含空格的值,将“ key = value”用引号引起来(如图所示)。
application-jar
:包含您的应用程序和所有依赖项的捆绑jar的路径。 该URL必须在群集内部全局可见,例如,所有节点上都存在hdfs://路径或file://路径。
application-arguments
:传递给您的主类的main方法的参数(如果有)
常见的部署策略是从与工作计算机物理上位于同一位置的网关计算机(例如,独立EC2群集中的主节点)提交应用程序。 在这种设置中,客户端模式是合适的。 在客户端模式下,驱动程序直接在spark-submit进程内启动,该进程充当集群的客户端。 应用程序的输入和输出已附加到控制台。 因此,此模式特别适用于涉及REPL(例如Spark Shell)的应用。 或者,如果用户应用程序是从远离工作机的计算机(例如,在笔记本电脑上本地)提交的,则通常使用cluster模式来最大程度地减少驱动程序和执行程序之间的网络延迟。 当前,standalone模式不支持Python应用程序的cluster模式。 如在YARN cluster上运行:
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \ # can be client for client mode
--executor-memory 20G \
--num-executors 50 \
/path/to/examples.jar \
1000
传递给Spark的master URL可以采用以下格式之一:
Master URL | 含义 |
---|---|
local | 使用一个工作线程在本地运行Spark(即完全没有并行性) |
local[K] | 使用K个辅助线程在本地运行Spark(最好将其设置为计算机上的内核数) |
local[K,F] | 使用K个工作线程和F个maxFailures在本地运行Spark(有关此变量的说明,请参见spark.task.maxFailures) |
local[*] | 在本地运行Spark,其工作线程数与计算机上的逻辑核心数量相同 |
local[*,F] | 在本地运行Spark,其工作线程数与计算机和F maxFailures上的逻辑核心数量相同 |
spark://HOST:PORT | 连接到给定的Spark独立集群主服务器。该端口必须是您的主服务器配置为使用的端口,默认情况下为7077 |
spark://HOST1:PORT1,HOST2:PORT2 | 使用Zookeeper与备用主服务器连接到给定的Spark独立集群。该列表必须具有使用Zookeeper设置的高可用性群集中的所有主控主机。该端口必须是每个主服务器配置为使用的端口,默认情况下为7077 |
mesos://HOST:PORT | 连接到给定的Mesos群集。该端口必须是您配置使用的端口,默认情况下为5050。或者,对于使用ZooKeeper的Mesos群集,请使用mesos:// zk://…。要使用–deploy-mode群集提交,应将HOST:PORT配置为连接到MesosClusterDispatcher |
yarn | 根据–deploy-mode的值,以客户端或群集模式连接到YARN群集。将基于HADOOP_CONF_DIR或YARN_CONF_DIR变量找到群集位置 |
k8s://HOST:PORT | 以集群模式连接到Kubernetes集群。当前不支持客户端模式,将来的版本将支持该模式。主机和端口是指Kubernetes API服务器。默认情况下,它使用TLS连接。为了强制它使用不安全的连接,可以使用k8s:// http:// HOST:PORT |
9.2.4.2 使用R运行Spark
从Spark 1.4开始,Spark还提供了实验性的R API(仅包含DataFrames API)。 要在R解释器中交互式运行Spark,使用sparkR。SparkR是R软件包,它提供了轻量级的前端以使用R中的Spark。在Spark 2.4.6中,SparkR提供了分布式数据帧实现,支持诸如选择,过滤,聚合等操作(类似于R data frames, dplyr),适用于大型数据集。 SparkR还支持使用MLlib进行分布式机器学习。 实现命令代码:
sparkR --master local[2]
R中还提供了示例应用程序,例如:
spark-submit examples/src/main/r/dataframe.R
9.2.4.3 使用Python运行Spark
也可以通过启动PySpark shell使用Python运行Spark。直接运行pyspark命令时,默认的Python版本是2.7,设置PYSPARK_PYTHON
变量以在运行pyspark命令时选择适当的Python版本:
PYSPARK_PYTHON=python3.7 pyspark
在Python中交互运行Spark,这时Spark被当做Python的一个简单的模块来实现,但是默认情况下PySpark不在sys.path上,这意味着它不能直接用作常规库。
可以通过将pyspark符号链接到站点包中,或在运行时将pyspark添加到sys.path中来解决此问题,使用findspark
就能够实现后者。
要使用PySpark,只需先在Python中调用findspark
:
import findspark
findspark.init('/usr/lib/spark-current/')
其中’/usr/lib/spark-current/’是SPARK_HOME的环境变量,可通过命令echo $SPARK_HOME
寻找此路径
然后即可导入pyspark
模块
import pyspark
9.3 Spark的数据集和并行化
Spark中有两个可用的API:
RDDs, accumulators, and broadcast variables
Spark SQL, Datasets, and DataFrames
RDDs(resilient distributed dataset)是Spark的核心API,它是跨集群节点划分的元素的集合,可以进行并行操作。Spark SQL是用于结构化数据处理的Spark模块,是比RDD更新的API。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。两个API是并存的,旧的API接口RDD一直处于维护状态,新的API接口Spark SQL持续研发新特征。两个API是可以同时使用的,而且有时可以实现相互的转化。 本章主要介绍RDD的相关内容。
从总体上讲,每个Spark应用程序都包含运行用户主要功能的驱动程序,以及在集群上执行各种并行操作。 Spark提供的主要抽象是弹性分布式数据集(RDD),它是跨集群节点划分的元素的集合,可以执行并行操作。 用户可以通过对Hadoop文件系统中的文件进行转换来创建RDD,还可以要求Spark将RDD保留在内存中,以使其能够在并行操作中有效地重用,RDD可以自动从节点故障中恢复。 Spark中的第二个抽象是可以在并行操作中使用的共享变量(shared variables)。默认情况下,当Spark作为一组任务在不同节点上并行运行一个函数时,它会将函数中使用的每个变量的副本传送给每个任务,有时也需要在任务之间或任务与驱动程序之间共享变量。 Spark支持两种类型的共享变量:
广播变量(broadcast variables),可用于在所有节点上的内存中缓存值
累加器(accumulators),它们是仅“加”到的变量,例如计数器和总和。
9.3.1 RDD
在Spark集群中,有一个非常重要的核心:分布式数据架构,也就是弹性分布式数据集(resilient distributed dataset,RDD)。RDD可以在集群的多台机器上进行数据分区。最重要的一点,它可以通过 对不同台机器上不同数据分区的控制,减少集群机器之间数据重排(data shuffling)的数量。Spark提 供了“partitionBy”运算符,能够通过集群中多台机器之间对原始RDD进行数据再分配来创建一个新 的RDD。RDD是Spark的核心数据结构,通过RDD的依赖关系形成Spark的调度顺序。通过对RDD的操作形成 整个Spark程序。对于详细的Spark操作,请参考@yadav2015spark。
对于MapReduce来说,HDFS上存储的数据就是它的输入。而RDD则可以看作是Spark的输入,作为Spark输入的 RDD有以下五大特征: 1)分区性(partition):RDD数据可以被分为几个分区(子集),切分后的数据能够进行并行计算,是数据集的原子组成部分。 2)计算函数(compute):RDD的每个分区上面都会有函数,其作用是实现RDD之间分区的转换。 3)依赖性(dependency):RDD通过特定的转化操作,可以得到新的RDD,新的RDD和旧的RDD之间存在依赖关系,这种依赖关 系保证了部分数据丢失时可以特定的转化操作重新生成。 4)优先位置(perferred locations):这是一个可选属性,在有些子RDD中并没有实现。RDD计算时会存取每个Partition 的优先位置(preferred location)。按照“移动数据不如移动计算”的理念,Spark在进行任务调度时,会尽可能地将计 算任务分配到其所要处理数据块的存储位置。 5)分区策略:这也是一个可选属性,描述数据分区模式和数据存放的位置。如果RDD里面存的数据是key-value形式,则可以传递 一个自定义的Partitioner进行重新分区,例如这里自定义的Partitioner是基于key进行分区,那则会将不同RDD里面的相 同key的数据放到同一个partition里面。类似于MapReduce中的Partitioner接口。
9.3.1.1 RDD的两种创建方式
从Hadoop文件系统(或与Hadoop兼容的其他持久化存储系统,如Hive、Cassandra、Hbase)输入 (如HDFS)创建。
从父RDD转换得到新的RDD。
9.3.1.2 RDD的两种操作算子
对于RDD可以有两种计算操作算子:Transformation(转换)与Action(执行),Transformation指定了RDD之间 的依赖关系,Action则指定了RDD操作最后的输出形式。
Transformation(变换)
Transformation操作是延迟计算的,也就是说从一个RDD转换生成另一个RDD的转换操作不是马上执行, 需要等到有Action操作时,才真正触发运算。
Action(行动)
Action算子会触发Spark提交应用(Application),并将数据输出到Spark系统。
常见的RDD转换(Transformation)和执行(Actions)操作如下:
Transformation | 说明 |
---|---|
map(func) | 参数是函数func,函数应用于RDD每一个元素,返回值是新的RDD |
filter(func) | 参数是函数func,选取数据集中使得函数func返回值为True的元素,返回值是新的RDD |
flatMap(func) | 参数是函数func,函数应用于RDD每一个元素,将元素数据进行拆分,每个元素可以被映射到多个输出项,返回值是新的RDD |
distinct() | 没有参数,将RDD里的元素进行去重操作 |
union() | 参数是RDD,返回包含两个RDD所有元素的新RDD |
intersection() | 参数是RDD,返回两个RDD的共同元素 |
cartesian() | 参数是RDD,求两个RDD的笛卡儿积 |
coalesce(numPartitions) | 将RDD分区的数目合并为numPartitons个 |
Action | 说明 |
---|---|
collect() | 以数组的形式,返回RDD所有元素 |
count() | 返回RDD里元素的个数 |
countByValue() | 各元素在RDD中出现次数 |
reduce(func) | 通过函数func聚集数据集中的所有元素,并行整合所有RDD数据,例如求和操作 |
aggregate(0)(seqOp,combop) | 和reduce功能一样,但是返回的RDD数据类型和原RDD不一样 |
foreach(func) | 对RDD每个元素都是使用特定函数func |
saveAsTextFile(path) | 将数据集的元素作为一个文本文件保存至文件系统的给定目录path中 |
saveAsSequenceFile(path) | 将数据集的元素以sequence的形式保存至文件系统的给定目录path中 |
9.3.1.3 RDD的重要内部属性
分区列表。
计算每个分片的函数。
对父RDD的依赖列表。
对Key-Value对数据类型RDD的分区器,控制分区策略和分区数。
每个数据分区的地址列表(如HDFS上的数据块的地址)
在Spark的执行过程中,RDD经历一个个的Transfomation算子之后,最后通过Action算子进行触发操作。 逻辑上每经历一次变换,就会将RDD转换为一个新的RDD,RDD之间通过Lineage产生依赖关系,这个关系 在容错中有很重要的作用。变换的输入和输出都是RDD。RDD会被划分成很多的分区分布到集群的多个节 点中。分区是个逻辑概念,变换前后的新旧分区在物理上可能是同一块内存存储。这是很重要的优化, 以防止函数式数据不变性(immutable)导致的内存需求无限扩张。有些RDD是计算的中间结果,其分区 并不一定有相应的内存或磁盘数据与之对应,如果要迭代使用数据,可以调cache()函数缓存数据。
9.3.1.4 RDD的工作特点
在物理上,RDD对象实质上是一个元数据结构,存储着Block、Node等的映射关系,以及其他的元数据信 息。一个RDD就是一组分区,在物理数据存储上,RDD的每个分区对应的就是一个Block,Block可以存储 在内存,当内存不够时可以存储到磁盘上。
每个Block中存储着RDD所有数据项的一个子集,暴露给用户的可以是一个Block的迭代器(例如,用户可 以通过mapPartitions获得分区迭代器进行操作),也可以就是一个数据项(例如,通过map函数对每个 数据项并行计算)。
9.3.1.5 Spark Lazy Evaluation
“LAZY”一词本身表示“非实时”。Spark中的所有转换都是惰性的,因为它们不会立即计算出结果。取而代之的是,他们只记得应用于某些基本数据集(例如文件)的转换。仅当动作要求将结果返回给驱动程序时才计算转换。 这是一种聪明的设计。如果Spark被设置为实时工作,那么用户每上传一个计算命令,Spark就要与每一个worker节点进行大量的通信和配置。但Spark Lazy Evaluation可以使很多计算被捆绑起来一起传递给worker节点,这样多次通信就可以变成一次通信,所有转换都可以组合在一起成为一个转换并一起执行。这种设计可以节省时间并减少空间复杂度,使该系统以更少的资源更有效地工作,并且还减少了查询数量,使Spark可以更高效地运行。 Spark Lazy Evaluation是Spark RDD操作中的关键构建块。
9.3.2 Spark算子的分类及作用
算子是RDD中定义的函数,可以对RDD中的数据进行转换和操作,Spark的所有功能都是通过具体的算子 来实现的。
输入:在Spark程序运行中,数据从外部数据空间(如分布式存 储:textFile读取HDFS等,parallelize方法输入Scala集合或数据)输入Spark,数据进入Spark运行 时数据空间,转化为Spark中的数据块,通过BlockManager进行管理。
运行:在Spark数据输入形成RDD后便可以通过变换算子,如fliter等,对数据进行操作并将RDD转 化为新的RDD,通过Action算子,触发Spark提交作业。如果数据需要复用,可以通过Cache算子,将数 据缓存到内存。
输出:程序运行结束数据会输出Spark运行时间,存储到分布式存储中(如saveAsTextFile输出 到HDFS),或Scala数据或集合中(collect输出到Scala集合,count返回Scala int型数据)。
Spark的核心数据模型是RDD,但RDD是个抽象类,具体由各子类实现,如MappedRDD、ShuffledRDD等子 类。Spark将常用的大数据操作都转化成为RDD的子类。
Spark算子大致可以分为三大类算子。
Value数据类型的Transformation算子,这种变换并不触发提交作业,针对处理的数据项是Value型的 数据。
Key-Value数据类型的Transfromation算子,这种变换并不触发提交作业,针对处理的数据项 是Key-Value型的数据对。
Action算子,这类算子会触发SparkContext提交Application。
9.3.3 创建一个SparkContext对象
Spark程序必须做的第一件事是创建一个SparkContext对象,该对象告诉Spark如何访问集群。 要创建SparkContext,首先需要构建一个SparkConf对象,该对象包含有关应用程序的信息 使用pyspark创建过程如下:
import findspark
findspark.init("/usr/lib/spark-current")
import pyspark
conf = pyspark.SparkConf().setAppName("My First Spark RDD APP").setMaster("local") # “yarn”
sc = pyspark.SparkContext(conf=conf)
不允许同时创建多个SparkContext,在创建下一个SparkContext之前必须先停止现有的SparkContext 停止SparkContext命令代码:
sc.stop()
或在创建SparkContext时使用容错功能,避免在创建时因已有SparkContext而产生创建错误:
sc = pyspark.SparkContext.getOrCreate()
9.3.4 并行化集合
通过在驱动程序中现有的可迭代对象或集合上调用SparkContext的parallelize方法来创建并行集合。 复制集合的元素以形成可以并行操作的分布式数据集。 例如,以下是创建包含数字1到5的并行化集合的方法:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
可以同时创建多个并行化数据集,如可以同时创建data2:
data2 = [11, 21, 31, 41, 51]
distData2 = sc.parallelize(data2)
创建后,可以并行处理分布式数据集(distData)。 例如,我们可以调用命令distData.reduce(lambda a,b:a + b)
来添加列表中的元素。
并行集合的一个重要参数是将数据集切入的分区数。 Spark将为集群的每个分区运行一个任务。 通常,群集中的每个CPU都需要2-4个分区。 通常,Spark会尝试根据集群自动设置分区数。 但是,也可以通过将其作为第二个参数传递来进行手动设置(例如sc.parallelize(data,10)
)。
9.3.5 使用RDD的外部数据集
PySpark可以从Hadoop支持的任何存储源创建分布式数据集,包括您的本地文件系统,HDFS,Cassandra,HBase,Amazon S3等。Spark支持文本文件,SequenceFiles和任何其他Hadoop InputFormat。 可以使用SparkContext的textFile方法创建文本文件RDD。 此方法获取文件的URI(计算机上的本地路径,或hdfs://,s3a://等URI),并将其读取为行的集合。
licenseFile = sc.textFile("/opt/apps/ecm/service/spark/2.4.4/package/spark-2.4.4-bin-hadoop2.7/licenses/LICENSE-vis.txt")
licenseFile
如果在本地文件系统上使用路径,则还必须在工作节点上的相同路径上访问该文件。将文件复制到所有工作服务器,或者使用网络安装的共享文件系统。 Spark的所有基于文件的输入方法(包括textFile)都支持在目录,压缩文件和通配符上运行。例如,您可以使用textFile(“/ my / directory”),textFile(" / my / directory / .txt“)和textFile(”/ my / directory / .gz")。 textFile方法还带有一个可选的第二个参数,用于控制文件的分区数。默认情况下,Spark为文件的每个块创建一个分区(HDFS中的块默认为128MB),但是也可以通过传递更大的值来请求更大数量的分区。要注意的是,分区不能少于块。
除文本文件外,Spark的Python API还支持其他几种数据格式:
SparkContext.wholeTextFiles 可以读取包含多个小文本文件的目录,并将每个小文本文件作为(文件名,内容)对返回。 这与textFile相反,后者将在每个文件的每一行返回一条记录。
RDD.saveAsPickleFile和SparkContext.pickleFile支持以包含pickled Python对象的简单格式保存RDD。批处理用于pickle序列化,默认批处理大小为10。
SequenceFile和Hadoop输入/输出格式
要注意的是,这个功能当前标记为“Experimental”,仅供高级用户使用,将来可能会替换为基于Spark SQL的读/写支持,在这种情况下,Spark SQL将是首选方法。
9.3.6 RDD数据集缓存
Spark中最重要的功能之一是跨操作在内存中缓存数据集。
当缓存RDD时,每个节点都会将其计算的所有分区存储在内存中,并在该数据集(或从该数据集派生的数据集)上的其他操作中重用它们。
这样可以使以后的操作更快(通常快10倍以上)。缓存是用于迭代算法和快速交互使用的关键工具。
可以使用persist()
或cache()
方法将一个RDD进行缓存。第一次在操作中对其进行计算时,它将被保存在节点上的内存中。
Spark的缓存是容错的,如果RDD的任何分区丢失,它将使用最初创建它的转换自动重新计算。
缓存命令实例如下:
lineLengths.persist()
lineLengths.cache() # same as persist() but use default storage level
9.3.6.1 RDD不同的存储级别
每个需要缓存的RDD可以使用不同的存储级别进行存储,例如,允许您将数据集缓存在磁盘上,缓存在内存中,但作为序列化的Java对象(以节省空间)在节点之间复制。通过将StorageLevel对象(Scala,Java,Python)传递给persist()来设置这些级别。 cache()方法是使用默认存储级别StorageLevel.MEMORY_ONLY(将反序列化的对象存储在内存中)的简写。完整的存储级别集是:
Storage Level | 说明 |
---|---|
MEMORY_ONLY | 将RDD作为反序列化的Java对象存储在JVM中。如果RDD不能容纳在内存中,则某些分区将不会被缓存,并且每次需要时都会即时重新计算。这是默认级别。 |
MEMORY_AND_DISK | 将RDD作为反序列化的Java对象存储在JVM中。如果RDD不能容纳在内存中,存储磁盘上不适合的分区,并在需要时从那里读取它们。 |
MEMORY_ONLY_SER(Java and Scala) | 将RDD存储为序列化的Java对象(每个分区一个字节数组)。通常,这比反序列化的对象更节省空间,尤其是在使用快速序列化程序时,但读取时会占用更多CPU。 |
MEMORY_AND_DISK_SER(Java and Scala) | 与MEMORY_ONLY_SER类似,但是将内存中不适合的分区溢出到磁盘上,而不是在需要时即时对其进行重新计算。 |
DISK_ONLY | 仅将RDD分区存储在磁盘上。 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | 与上面的级别相同,但是在两个群集节点上复制每个分区。 |
OFF_HEAP (experimental) | 与MEMORY_ONLY_SER类似,但是将数据存储在堆外存储器中。这需要启用堆外内存。 |
需要注意的是,在Python中,存储的对象将始终使用Pickle库进行序列化,因此,是否选择序列化级别都无关紧要。 Python中的可用存储级别包括MEMORY_ONLY,MEMORY_ONLY_2,MEMORY_AND_DISK,MEMORY_AND_DISK_2,DISK_ONLY和DISK_ONLY_2。 即使用户没有将数据缓存,Spark也会自动将一些中间数据缓存在随机操作中(例如reduceByKey)。 这样做是为了避免在混洗期间节点发生故障时重新计算整个输入。 用户如果打算重复使用,则应该对结果的RDD进行缓存。
9.3.6.2 RDD取消缓存
Spark自动监视每个节点上的缓存使用情况,并以最近最少使用(LRU)的方式丢弃旧的数据分区。
如果想手动删除RDD而不是等待它脱离缓存,可以使用RDD.unpersist()
方法。命令示例如下:
lineLengths.unpersist()
9.4 Spark数据结构
MLlib支持存储在一台计算机上的局部向量和矩阵,以及由一个或多个RDD支持的分布式矩阵。 局部向量和局部矩阵是充当公共接口的简单数据模型。 基本的线性代数运算由Breeze提供。 监督学习中使用的训练示例在MLlib中称为“标记点”。
9.4.1 Local vector局部向量
局部向量具有存储在单个计算机上的整数类型索引和基于0的索引以及双精度类型的值。 MLlib支持两种类型的局部向量:密集和稀疏。 密集向量由表示其输入值的双精度数组支持,而稀疏向量由两个并行数组支持:索引和值。 例如,向量(1.0,0.0,3.0)可以用密集格式表示为[1.0,0.0,3.0],也可以用稀疏格式表示为(3,[0,2],[1.0,3.0]),其中3是向量的大小。 MLlib将NumPy的数组和Python的列表(例如[1,2,3])识别为密集向量;将MLlib的SparseVector和SciPy的仅包含一列的csc_matrix识别为稀疏向量。 创建命令如下:
import numpy as np
import scipy.sparse as sps
from pyspark.mllib.linalg import Vectors
# Use a NumPy array as a dense vector.
dv1 = np.array([1.0, 0.0, 3.0])
# Use a Python list as a dense vector.
dv2 = [1.0, 0.0, 3.0]
# Create a SparseVector.
sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])
# Use a single-column SciPy csc_matrix as a sparse vector.
sv2 = sps.csc_matrix((np.array([1.0, 3.0]),
np.array([0, 2]),
np.array([0, 2])), shape=(3, 1))
9.4.2 Local matrix局部矩阵
局部矩阵具有整数类型的行和列索引以及双重类型的值,它们存储在单个计算机上。 MLlib支持密集矩阵(其条目值以列优先顺序存储在单个double数组中)和稀疏矩阵(其非零条目值以压缩列稀疏列CSC格式以列优先顺序存储)。
局部矩阵的基类是Matrix,我们提供两种实现:DenseMatrix
和SparseMatrix
。 我们建议使用在矩阵中实现的factory methods 来创建本地矩阵。 要注意的是,MLlib中的本地矩阵以列优先顺序存储。
实现命令如下:
from pyspark.mllib.linalg import Matrix, Matrices
# Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
dm2 = Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])
print(dm2)
# Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])
print(sm)
9.4.3 Labeled Points标记点
标记点是与标记/响应相关联的局部矢量(密集或稀疏)。 在MLlib中,标记的点用于监督学习算法中。 我们使用双精度来存储标签,因此我们可以在回归和分类中使用带标签的点。 对于二进制分类,标签应为0(负数)或1(正数)。 对于多类分类,标签应为从零开始的类索引:0,1,2,…。标记点使用LabeledPoint
创建。
创建命令如下:
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint
# Create a labeled point with a positive label and a dense feature vector.
pos = LabeledPoint(1.0, [1.0, 0.0, 3.0])
# Create a labeled point with a negative label and a sparse feature vector.
neg = LabeledPoint(0.0, SparseVector(3, [0, 2], [1.0, 3.0]))
9.4.4 Sparse data稀疏数据
在实践中,稀疏训练数据非常常见。 MLlib支持阅读以LIBSVM格式存储的训练数据,这是LIBSVM和LIBLINEAR使用的默认格式。 它是一种文本格式,其中每行使用以下格式表示带标签的稀疏特征向量:
label index1:value1 index2:value2 ...
索引是从基于1的并按升序排列的。 加载后,要素索引将转换为基于0的索引。
使用MLUtils.loadLibSVMFile
读取以LIBSVM格式存储的培训示例:
from pyspark.mllib.util import MLUtils
examples = MLUtils.loadLibSVMFile(sc,
"/opt/apps/ecm/service/spark/2.4.4/package/spark-2.4.4-bin-hadoop2.7/data/mllib/sample_libsvm_data.txt")
print(examples)
9.4.5 Distributed matrix分布式矩阵
分布式矩阵具有长型行和列索引以及双型值,这些值分布式存储在一个或多个RDD中。选择正确的格式来存储大型矩阵和分布式矩阵非常重要,因为将分布式矩阵转换为其他格式可能需要全局改组,这非常昂贵。到目前为止,已经实现了四种类型的分布式矩阵。 基本类型称为RowMatrix。 RowMatrix是面向行的分布式矩阵,没有有意义的行索引,例如特征向量的集合。它由其行的RDD支持,其中每一行都是本地向量。我们假设RowMatrix的列数不是很大,因此可以将单个局部向量合理地传递给驱动程序,也可以使用单个节点对其进行存储/操作。 IndexedRowMatrix与RowMatrix相似,但具有行索引,该行索引可用于标识行和执行联接。 CoordinateMatrix是以坐标列表(COO)格式存储的分布式矩阵,由其条目的RDD支持。 BlockMatrix是由MatrixBlock的RDD支持的分布式矩阵,该矩阵是(Int, Int, Matrix)的元组。 要注意的是,分布式矩阵的基础RDD必须是确定性的,因为我们缓存了矩阵的大小。通常,使用不确定的RDD可能会导致错误。
9.4.5.1 RowMatrix行矩阵
RowMatrix是面向行的分布式矩阵,不包含有意义的行索引,并由其行的RDD支持,其中每行都是局部向量。 由于每一行都由局部矢量表示,因此列数受整数范围的限制,但实际上应该小得多。 可以从向量的RDD中创建RowMatrix:
from pyspark.mllib.linalg.distributed import RowMatrix
# Create an RDD of vectors.
rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])
# Create a RowMatrix from an RDD of vectors.
mat = RowMatrix(rows)
# Get its size.
m = mat.numRows() # 4
n = mat.numCols() # 3
# Get the rows as an RDD of vectors again.
rowsRDD = mat.rows
print(m,n,rowsRDD)
9.4.5.2 IndexedRowMatrix
IndexedRowMatrix与RowMatrix相似,但具有有意义的行索引。 它由索引行的RDD支持,因此每一行都由其索引(长型)和局部向量表示。 可以从IndexedRows的RDD创建IndexedRowMatrix,其中IndexedRow是长向量的包装。可以通过删除IndexedRowMatrix的行索引将其转换为RowMatrix。
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
# Create an RDD of indexed rows.
# - This can be done explicitly with the IndexedRow class:
indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
IndexedRow(1, [4, 5, 6]),
IndexedRow(2, [7, 8, 9]),
IndexedRow(3, [10, 11, 12])])
# - or by using (long, vector) tuples:
indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]),
(2, [7, 8, 9]), (3, [10, 11, 12])])
9.4.5.3 BlockMatrix 块矩阵
BlockMatrix是由MatrixBlocks的RDD支持的分布式矩阵,其中MatrixBlock是((Int, Int), Matrix)的元组,其中(Int, Int)是块的索引,而Matrix是子 给定索引处的矩阵,其大小为rowsPerBlock x colsPerBlock。 BlockMatrix支持诸如与另一个BlockMatrix相加和相乘的方法。 BlockMatrix还具有一个验证器帮助功能,该功能可用于检查BlockMatrix是否正确设置。 可以从sub-matrix blocks的RDD创建一个BlockMatrix,其中sub-matrix blocks是一个((blockRowIndex, blockColIndex), sub-matrix) 元组。
from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix
# Create an RDD of sub-matrix blocks.
blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
# Create a BlockMatrix from an RDD of sub-matrix blocks.
mat = BlockMatrix(blocks, 3, 2)
# Get its size.
m = mat.numRows() # 6
n = mat.numCols() # 2
# Get the blocks as an RDD of sub-matrix blocks.
blocksRDD = mat.blocks
# Convert to a LocalMatrix.
localMat = mat.toLocalMatrix()
# Convert to an IndexedRowMatrix.
indexedRowMat = mat.toIndexedRowMatrix()
# Convert to a CoordinateMatrix.
coordinateMat = mat.toCoordinateMatrix()