Spark-shell: the Scala-based interactive shell
$ spark-shell
PySpark: the Python version of the Spark interactive shell
$ pyspark
SparkR: the R version of the Spark interactive shell
$ sparkR
spark-submit: submit a standalone Spark application, e.g. a Python script
$ spark-submit <SparkApp.py>
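A minimal standalone application that could be submitted this way might look as follows; this is only an illustrative sketch, and the file name SparkApp.py and the input path are placeholders rather than part of the original notes:

# SparkApp.py -- a minimal self-contained Spark application (hypothetical example)
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext(appName="LineCount")
    lines = sc.textFile("license.txt")   # any input file reachable from the cluster
    print(lines.count())                 # number of lines in the file
    sc.stop()

It would then be launched with: $ spark-submit SparkApp.py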
The findspark module makes pyspark importable as a standard Python module; it locates the Spark installation via $SPARK_HOME (or via a path passed to findspark.init()).
import findspark
findspark.init("/usr/lib/spark-current")   # point findspark at the Spark installation
import pyspark
# If a SparkContext already exists (e.g. inside the pyspark shell), stop it first:
# sc.stop()
sc = pyspark.SparkContext("local", "My First Spark App")
sc.stop()
sc = pyspark.SparkContext.getOrCreate()    # reuse an existing context, or create a new one
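Equivalently, a new context can be configured through a SparkConf object after stopping the existing one; a minimal sketch (the master URL "local[2]" is only an illustrative choice, not from the original notes):

from pyspark import SparkConf
sc.stop()                                  # stop the context created above
conf = SparkConf().setMaster("local[2]").setAppName("My First Spark App")
sc = pyspark.SparkContext(conf=conf)       # pick up the settings from conf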
textFile = sc.textFile("license.txt")   # read the file as an RDD of lines
textFile.first()                        # first line of the file
'Copyright (c) 2000-2005 INRIA, France Telecom'
textFile.count()                        # number of lines
29
# Length (in words) of the longest line:
textFile.map(lambda line: len(line.split()))\
        .reduce(lambda a, b: a if (a > b) else b)
15
textFile.cache()                        # keep the RDD in memory for reuse
license.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0
textFile.count()                        # this time the count is served from the cache
29
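Other transformations follow exactly the same pattern; as one more small sketch, a filter followed by a count (the search string "Copyright" is only assumed to occur in this particular file):

textFile.filter(lambda line: "Copyright" in line).count()   # lines mentioning "Copyright"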
import numpy as np
import scipy.sparse as sps
from pyspark.mllib.linalg import Vectors
# Use a NumPy array as a dense vector.
dv1 = np.array([1.0, 0.0, 3.0])
# Use a Python list as a dense vector.
dv2 = [1.0, 0.0, 3.0]
# Create a SparseVector.
sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])
# Use a single-column SciPy csc_matrix as a sparse vector.
sv2 = sps.csc_matrix((np.array([1.0, 3.0]),
np.array([0, 2]),
np.array([0, 2])), shape=(3, 1))
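All four objects above encode the same 3-dimensional vector; a quick sanity check under that assumption:

print(dv1)                     # [1. 0. 3.]
print(sv1.toArray())           # [1. 0. 3.]
print(sv2.toarray().ravel())   # [1. 0. 3.]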
from pyspark.mllib.linalg import Matrix, Matrices
# Create a 3x2 dense matrix ((1.0, 4.0), (2.0, 5.0), (3.0, 6.0));
# the values are supplied in column-major order.
dm2 = Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])
print(dm2)
# Create a 3x2 sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
# in CSC form: column pointers, row indices, non-zero values.
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])
print(sm)
DenseMatrix([[1., 4.],
             [2., 5.],
             [3., 6.]])
3 X 2 CSCMatrix
(0,0) 9.0
(2,1) 6.0
(1,1) 8.0
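Both matrix types can be expanded into a NumPy array for inspection; a small sketch reusing dm2 and sm from above:

print(dm2.toArray())   # 3x2 ndarray
print(sm.toArray())    # same shape, with the zeros filled in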
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint
# Create a labeled point with a positive label and a dense feature vector.
pos = LabeledPoint(1.0, [1.0, 0.0, 3.0])
# Create a labeled point with a negative label and a sparse feature vector.
neg = LabeledPoint(0.0, SparseVector(3, [0, 2], [1.0, 3.0]))
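The label and the feature vector can be read back from the attributes of a labeled point, for example:

print(pos.label)      # 1.0
print(pos.features)   # [1.0,0.0,3.0]
print(neg.features)   # (3,[0,2],[1.0,3.0])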
from pyspark.mllib.util import MLUtils
examples = MLUtils.loadLibSVMFile(sc, "libsvm_data.txt")
print(examples)
PythonRDD[9] at RDD at PythonRDD.scala:49
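Since loadLibSVMFile returns an ordinary RDD of LabeledPoint objects, the parsed records can be inspected with the usual RDD actions; a brief sketch, assuming libsvm_data.txt exists and is in LIBSVM format:

examples.take(2)    # the first two LabeledPoint records
examples.count()    # total number of examples in the file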
from pyspark.mllib.linalg.distributed import RowMatrix
# Create an RDD of vectors.
rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])
# Create a RowMatrix from an RDD of vectors.
mat = RowMatrix(rows)
# Get its size.
m = mat.numRows() # 4
n = mat.numCols() # 3
# Get the rows as an RDD of vectors again.
rowsRDD = mat.rows
print(m, n, rowsRDD)
4 3 MapPartitionsRDD[13] at mapPartitions at PythonMLLibAPI.scala:1339
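A RowMatrix also supports aggregate computations over its distributed rows; as one small sketch, per-column summary statistics can be computed for the matrix built above:

summary = mat.computeColumnSummaryStatistics()
print(summary.mean())       # column means
print(summary.variance())   # column variances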
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
# Create an RDD of indexed rows.
# - This can be done explicitly with the IndexedRow class:
indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
IndexedRow(1, [4, 5, 6]),
IndexedRow(2, [7, 8, 9]),
IndexedRow(3, [10, 11, 12])])
# - or by using (long, vector) tuples:
indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]),
(2, [7, 8, 9]), (3, [10, 11, 12])])
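Either of these RDDs can then be wrapped in an IndexedRowMatrix; a brief sketch of the construction and of dropping back to a plain RowMatrix:

mat = IndexedRowMatrix(indexedRows)
print(mat.numRows(), mat.numCols())   # 4 3
rowMat = mat.toRowMatrix()            # discard the row indices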