Spark基础¶

李丰

feng.li@cufe.edu.cn

中央财经大学

Spark的优势¶

  • Apache Spark是一个开源集群运算框架,最初是由加州大学柏克莱分校AMPLab所开发。
  • Spark使用了存储器内运算技术,能在数据尚未写入硬盘时即在存储器内分析运算。
  • Spark在存储器内运行程序的运算速度能做到比Hadoop MapReduce的运算速度快上100倍。
  • 即便是运行程序于硬盘时,Spark也能快上10倍速度。
  • Spark允许用户将数据加载至集群存储器,并多次对其进行查询,非常适合用于机器学习算法。
  • 支持 Java, Scala, Python 和 R.

Spark 组件¶

  • DataFrame 分布式数据框。
  • Spark Streaming 利用Spark快速调度能力截取小批量的数据并对之运行RDD转换运行流分析。
  • MLlib 使用许多常见的机器学习和统计算法(分类与回归,降维、特征提取、优化算法)简化大规模机器学习时间。
  • GraphX Spark上的分布式图形处理框架。

Spark RDD¶

  • RDD是Spark的核心数据抽象方式,全称为Resillient Distributed Dataset,即弹性分布式数据集。
  • RDD通常通过Hadoop上的文件,即HDFS文件或者Hive表,来进行创建;有时也可以通过应用程序中的集合来创建。
  • RDD的数据默认情况下存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘(弹性)。

Spark 架构¶

Spark

Spark 运行模式¶

  • Spark 可以作为一个独立的分布式系统运行。
  • Spark 也可以运行在Hadoop等分布式系统中。

Spark交互模式¶

  • Spark-shell 基于Scala语言的交互界面

    $ spark-shell

  • PySpark Python版本的Spark交互

    $ pyspark

  • SparkR R版本的Spark交互模式

    $ sparkR

Spark Batch模式¶

$ spark-submit <SparkApp.py>

通过 Jupyter (Python Kernel) 运行PySpark¶

  • 首先安装Python findspark 模块,使得pyspark可以作为Python的一个标准模块调用。
  • Spark的安装路径可以通过环境变量$SPARK_HOME得到。
In [1]:
import findspark
findspark.init("/usr/lib/spark-current")
import pyspark

创建一个Spark应用¶

In [15]:
sc.stop()
sc = pyspark.SparkContext("local", "My First Spark App")
  • 注意:不能同时创建多个SparkContext
  • 要么结束上一个SparkContext再创建新的
In [ ]:
sc.stop()
  • 或者使用一个可容错的函数代替
In [16]:
sc = pyspark.SparkContext.getOrCreate()
  • Spark提供了非常简洁的与RDD交互的方法
In [17]:
textFile = sc.textFile("license.txt")
textFile.first()
Out[17]:
'Copyright (c) 2000-2005 INRIA, France Telecom'
In [18]:
textFile.count()
Out[18]:
29
  • Spark的MapReduce就像Python的lambda函数一样简单
In [19]:
textFile.map(lambda line: len(line.split()))\
        .reduce(lambda a, b: a if (a > b) else b)
Out[19]:
15
  • Spark允许将数据加载到分布式系统的共享缓存中以加快数据访问
In [20]:
textFile.cache()
Out[20]:
license.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0
In [21]:
textFile.count()
Out[21]:
29

Spark数据类型¶

  • 存储于单机的本地向量(Local vector)
In [22]:
import numpy as np
import scipy.sparse as sps
from pyspark.mllib.linalg import Vectors

# Use a NumPy array as a dense vector.
dv1 = np.array([1.0, 0.0, 3.0])
# Use a Python list as a dense vector.
dv2 = [1.0, 0.0, 3.0]
# Create a SparseVector.
sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])
# Use a single-column SciPy csc_matrix as a sparse vector.
sv2 = sps.csc_matrix((np.array([1.0, 3.0]),
                      np.array([0, 2]),
                      np.array([0, 2])), shape=(3, 1))
  • 存储于单机的本地矩阵
In [9]:
from pyspark.mllib.linalg import Matrix, Matrices
# Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
dm2 = Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])
print(dm2)
# Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])
print(sm)
DenseMatrix([[1., 4.],
             [2., 5.],
             [3., 6.]])
3 X 2 CSCMatrix
(0,0) 9.0
(2,1) 6.0
(1,1) 8.0
  • 标签(Labeled Points)主要用于有监督的机器学习
In [10]:
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint

# Create a labeled point with a positive label and a dense feature vector.
pos = LabeledPoint(1.0, [1.0, 0.0, 3.0])

# Create a labeled point with a negative label and a sparse feature vector.
neg = LabeledPoint(0.0, SparseVector(3, [0, 2], [1.0, 3.0]))
  • 稀疏数据
In [11]:
from pyspark.mllib.util import MLUtils
examples = MLUtils.loadLibSVMFile(sc,
           "libsvm_data.txt")
print(examples)
PythonRDD[9] at RDD at PythonRDD.scala:49

分布式矩阵 (Distributed matrix)¶

行矩阵(RowMatrix)¶

In [12]:
from pyspark.mllib.linalg.distributed import RowMatrix
# Create an RDD of vectors.
rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])
# Create a RowMatrix from an RDD of vectors.
mat = RowMatrix(rows)
# Get its size.
m = mat.numRows()  # 4
n = mat.numCols()  # 3
# Get the rows as an RDD of vectors again.
rowsRDD = mat.rows
print(m,n,rowsRDD)
4 3 MapPartitionsRDD[13] at mapPartitions at PythonMLLibAPI.scala:1339

有行号的行矩阵(IndexedRowMatrix)¶

In [13]:
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
# Create an RDD of indexed rows.
#   - This can be done explicitly with the IndexedRow class:
indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
                              IndexedRow(1, [4, 5, 6]),
                              IndexedRow(2, [7, 8, 9]),
                              IndexedRow(3, [10, 11, 12])])
#   - or by using (long, vector) tuples:
indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]),
                              (2, [7, 8, 9]), (3, [10, 11, 12])])

上机实践¶

  • 创建并运行一个简单的Spark应用。

  • 了解Spark的基础数据类型。

  • 课外阅读材料

    • https://spark.apache.org/docs/latest/api/python/index.html
    • https://spark.apache.org/examples.html