At a high level, every Spark application consists of a driver program that runs the user's main function and executes various parallel operations on a cluster.
The main abstraction Spark provides is a resilient distributed dataset (RDD), a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.
A second abstraction in Spark is shared variables that can be used in parallel operations.
By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task.
Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program.
Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only "added" to, such as counters and sums.
Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel.
There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
The first thing a Spark program must do is create a SparkContext object, which tells Spark how to access a cluster. To create a SparkContext you first need to build a SparkConf object that contains information about your application.
import findspark
findspark.init()
import pyspark
conf = pyspark.SparkConf().setAppName("My First Spark RDD APP") #.setMaster("local") # "yarn"
sc = pyspark.SparkContext(conf=conf)
Only one SparkContext can be active at a time; to create a new SparkContext you have to stop the existing one first with sc.stop(). Alternatively, reuse the active context:
sc = pyspark.SparkContext.getOrCreate()
Parallelized collections are created by calling SparkContext's parallelize method on an existing iterable or collection in your driver program.
The elements of the collection are copied to form a distributed dataset that can be operated on in parallel.
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
distData
data2 = [11, 21, 31, 41, 51]
distData2 = sc.parallelize(data2)
distData2
ParallelCollectionRDD[12] at parallelize at PythonRDD.scala:195
PySpark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.
Text file RDDs can be created using SparkContext's textFile method. This method takes a URI for the file (either a local path on the machine, or an hdfs://, s3a://, etc. URI) and reads it as a collection of lines.
licenseFile = sc.textFile("/opt/apps/ecm/service/spark/2.4.4/package/spark-2.4.4-bin-hadoop2.7/licenses/LICENSE-vis.txt")
licenseFile
/opt/apps/ecm/service/spark/2.4.4/package/spark-2.4.4-bin-hadoop2.7/licenses/LICENSE-vis.txt MapPartitionsRDD[14] at textFile at NativeMethodAccessorImpl.java:0
If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.
All of Spark's file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").
The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.
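A minimal sketch of these two points, reusing the sc created earlier (the paths are illustrative only):
# Read every .gz file under a hypothetical directory as one RDD of lines.
logs = sc.textFile("/my/directory/*.gz")
# Ask for at least 8 partitions instead of the default one per HDFS block.
logs8 = sc.textFile("/my/directory/*.gz", minPartitions=8)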
SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file.
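A short sketch of the difference, assuming a hypothetical directory of small text files and the existing sc:
# Each element is a (filename, content) pair covering one whole file.
files = sc.wholeTextFiles("/my/directory")
# Each element is a single line drawn from any of the files.
lines = sc.textFile("/my/directory")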
RDD.saveAsPickleFile and SparkContext.pickleFile support saving an RDD in a simple format consisting of pickled Python objects. Batching is used on pickle serialization, with a default batch size of 10.
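A brief sketch using the distData RDD created above; the output path is hypothetical:
# Write the RDD as batches of pickled Python objects (default batch size 10).
distData.saveAsPickleFile("/tmp/distData_pickle")
# Read it back later as an RDD of the same Python objects.
restored = sc.pickleFile("/tmp/distData_pickle")
restored.collect()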
# Transformation: map each line of the license file to its length (nothing is computed yet).
lineLengths = licenseFile.map(lambda s: len(s))
# Action: reduce triggers the computation and returns the total character count.
totalLength = lineLengths.reduce(lambda a, b: a + b)
lineLengths
PythonRDD[4] at RDD at PythonRDD.scala:53
totalLength
385
One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations.
When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it).
This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.
You can mark an RDD to be persisted using the persist()
or cache()
methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.
lineLengths.persist()
PythonRDD[4] at RDD at PythonRDD.scala:53
lineLengths.cache() # cache() is shorthand for persist() with the default storage level (MEMORY_ONLY)
PythonRDD[4] at RDD at PythonRDD.scala:53
In addition, each persisted RDD can be stored using a different storage level. The available levels are:
MEMORY_ONLY: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.
MEMORY_AND_DISK: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.
MEMORY_ONLY_SER (Java and Scala): Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SER (Java and Scala): Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.
DISK_ONLY: Store the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.: Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP (experimental): Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.
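A minimal sketch of requesting a non-default level for the licenseFile RDD defined earlier. Note that in Python, persisted objects are always serialized with pickle, so choosing a serialized level makes no difference there.
from pyspark import StorageLevel
# Keep what fits in memory and spill the remaining partitions to disk.
licenseFile.persist(StorageLevel.MEMORY_AND_DISK)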
Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion.
If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method.
lineLengths.unpersist()
PythonRDD[4] at RDD at PythonRDD.scala:53
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.
They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
broadcastVar = sc.broadcast([1, 2, 3])
broadcastVar
<pyspark.broadcast.Broadcast at 0x7f5067b011d0>
Broadcast variables are created from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around v, and its value can be accessed through its .value attribute.
After the broadcast variable is created, it should be used instead of the value v in any functions run on the cluster so that v is not shipped to the nodes more than once.
In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later).
broadcastVar.value
[1, 2, 3]
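A short sketch, reusing distData and broadcastVar from above: tasks should read the broadcast's .value rather than capturing the raw list in the closure.
# Each task reads broadcastVar.value locally instead of shipping the list with every closure.
offsets = distData.map(lambda x: x + sum(broadcastVar.value))
offsets.collect()  # [7, 8, 9, 10, 11]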
Accumulators are variables that are only “added” to through an associative and commutative operation and can therefore be efficiently supported in parallel.
They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types.
accum = sc.accumulator(0)
sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
accum.value
10
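Support for new types is added by subclassing AccumulatorParam with zero and addInPlace methods; a minimal sketch for accumulating 2-element lists (the class and variable names are illustrative):
from pyspark.accumulators import AccumulatorParam

class ListAccumulatorParam(AccumulatorParam):
    def zero(self, initialValue):
        # The "empty" value the accumulator starts from on each executor.
        return [0.0] * len(initialValue)
    def addInPlace(self, v1, v2):
        # Merge two partial results element-wise.
        return [a + b for a, b in zip(v1, v2)]

vecAccum = sc.accumulator([0.0, 0.0], ListAccumulatorParam())
sc.parallelize([[1.0, 2.0], [3.0, 4.0]]).foreach(lambda v: vecAccum.add(v))
vecAccum.value  # [4.0, 6.0]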
Lazy evaluation means that Spark evaluates a transformation only when its result is actually required, not at the moment it is defined.
Accordingly, Spark does not execute each operation right away; nothing starts until an action is triggered.
Once an action is called, all pending transformations are executed in one go, since Spark can combine the chained transformations and run them together.
Benefits: Spark sees the whole chain of transformations before running anything, so it can optimize the execution plan, avoid materializing intermediate results, and skip work whose output is never used.
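A small sketch of this behaviour with the licenseFile RDD from earlier: the map returns immediately, and the file is only read when the action runs.
# Declaring the transformation is instantaneous; no data is read yet.
upper = licenseFile.map(lambda s: s.upper())
# The action forces Spark to read the file and apply the map in one pass.
upper.count()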
MLlib supports local vectors and matrices stored on a single machine, as well as distributed matrices backed by one or more RDDs.
import numpy as np
import scipy.sparse as sps
from pyspark.mllib.linalg import Vectors
# Use a NumPy array as a dense vector.
dv1 = np.array([1.0, 0.0, 3.0])
# Use a Python list as a dense vector.
dv2 = [1.0, 0.0, 3.0]
# Create a SparseVector.
sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])
# Use a single-column SciPy csc_matrix as a sparse vector.
sv2 = sps.csc_matrix((np.array([1.0, 3.0]),
                      np.array([0, 2]),
                      np.array([0, 2])), shape=(3, 1))
from pyspark.mllib.linalg import Matrix, Matrices
# Create a dense matrix ((1.0, 4.0), (2.0, 5.0), (3.0, 6.0)); values are read in column-major order.
dm2 = Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])
print(dm2)
# Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])
print(sm)
DenseMatrix([[1., 4.],
             [2., 5.],
             [3., 6.]])
3 X 2 CSCMatrix
(0,0) 9.0
(2,1) 6.0
(1,1) 8.0
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint
# Create a labeled point with a positive label and a dense feature vector.
pos = LabeledPoint(1.0, [1.0, 0.0, 3.0])
# Create a labeled point with a negative label and a sparse feature vector.
neg = LabeledPoint(0.0, SparseVector(3, [0, 2], [1.0, 3.0]))
from pyspark.mllib.util import MLUtils
examples = MLUtils.loadLibSVMFile(sc,
"/opt/apps/ecm/service/spark/2.4.4/package/spark-2.4.4-bin-hadoop2.7/data/mllib/sample_libsvm_data.txt")
print(examples)
from pyspark.mllib.linalg.distributed import RowMatrix
# Create an RDD of vectors.
rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])
# Create a RowMatrix from an RDD of vectors.
mat = RowMatrix(rows)
# Get its size.
m = mat.numRows() # 4
n = mat.numCols() # 3
# Get the rows as an RDD of vectors again.
rowsRDD = mat.rows
print(m,n,rowsRDD)
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
# Create an RDD of indexed rows.
# - This can be done explicitly with the IndexedRow class:
indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
                              IndexedRow(1, [4, 5, 6]),
                              IndexedRow(2, [7, 8, 9]),
                              IndexedRow(3, [10, 11, 12])])
# - or by using (long, vector) tuples:
indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]),
                              (2, [7, 8, 9]), (3, [10, 11, 12])])
# Create an IndexedRowMatrix from the RDD of indexed rows.
mat = IndexedRowMatrix(indexedRows)
A BlockMatrix is a distributed matrix backed by an RDD of MatrixBlocks, where a MatrixBlock is a tuple ((Int, Int), Matrix): the (Int, Int) is the index of the block, and the Matrix is the sub-matrix at that index with size rowsPerBlock x colsPerBlock. BlockMatrix supports methods such as add and multiply with another BlockMatrix. BlockMatrix also has a helper function validate which can be used to check whether the BlockMatrix is set up properly.
from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix
# Create an RDD of sub-matrix blocks.
blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
                         ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
# Create a BlockMatrix from an RDD of sub-matrix blocks.
mat = BlockMatrix(blocks, 3, 2)
# Get its size.
m = mat.numRows() # 6
n = mat.numCols() # 2
# Get the blocks as an RDD of sub-matrix blocks.
blocksRDD = mat.blocks
# Convert to a LocalMatrix.
localMat = mat.toLocalMatrix()
# Convert to an IndexedRowMatrix.
indexedRowMat = mat.toIndexedRowMatrix()
# Convert to a CoordinateMatrix.
coordinateMat = mat.toCoordinateMatrix()
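A brief sketch of the add and multiply operations mentioned above, continuing from the BlockMatrix mat just created:
# Multiply the 6x2 matrix by its transpose, yielding a 6x6 BlockMatrix.
product = mat.multiply(mat.transpose())
# Element-wise addition of the matrix with itself.
summed = mat.add(mat)
print(product.numRows(), product.numCols())  # 6 6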
Run a simple Spark application.
Understand the basic data structure with the RDD interface.
External Reading