Structured Data Processing with Spark¶

Feng Li¶

Central University of Finance and Economics¶

feng.li@cufe.edu.cn¶

Course home page: https://feng.li/distcomp¶

Spark SQL¶

  • Unlike the basic Spark RDD API, the interfaces provided by Spark SQL give Spark more information about the structure of both the data and the computation being performed.

  • Spark SQL uses this extra information to perform additional optimizations.

  • One use of Spark SQL is to execute SQL queries (see the sketch after this list).

  • Spark SQL can also be used to read data from an existing Hive installation.
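
A minimal sketch of running a SQL query through Spark SQL (the session, data, and view name here are only for illustration; the full session setup used in this course follows later in these notes):

In [ ]:
from pyspark.sql import SparkSession

# Build a throwaway local session, register a tiny DataFrame as a
# temporary view, and query it with plain SQL.
spark = SparkSession.builder.master("local[*]").appName("sql-sketch").getOrCreate()
df = spark.createDataFrame([("Justin", 19), ("Andy", 30)], ["name", "age"])
df.createOrReplaceTempView("people")
spark.sql("SELECT name FROM people WHERE age < 20").show()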

Datasets and DataFrames¶

Spark Datasets¶

  • A Dataset is a distributed collection of data.

  • Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.).

  • The Dataset API is available in Scala and Java.

  • Python does not support the Dataset API. However, due to Python's dynamic nature, many of the benefits of the Dataset API are already available (e.g. you can naturally access the fields of a row by name: row.columnName), as in the short example below.
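
A quick illustration of by-name field access on a PySpark Row (the record here is purely hypothetical):

In [ ]:
from pyspark.sql import Row

# A Row behaves much like a named tuple: fields can be read by name.
person = Row(name="Justin", age=19)
person.name   # 'Justin'
person.age    # 19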

Spark DataFrame¶

  • A DataFrame is a Dataset organized into named columns.

  • It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.

  • DataFrames can be constructed from a wide array of sources such as:

    • structured data files,
    • tables in Hive,
    • external databases, or
    • existing RDDs.
  • The DataFrame API is available in Scala, Java, Python, and R.

Start a Spark session¶

In [6]:
import findspark  # Only needed when you run Spark within a Jupyter notebook
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .config("spark.executor.memory", "2g")\
        .config("spark.cores.max", "2")\
        .master("spark://master:7077")\
        .appName("Python Spark").getOrCreate() # using spark server
In [8]:
spark # check whether the Spark session has been created
Out[8]:

SparkSession - in-memory

SparkContext
  Spark UI
  Version: v2.4.4
  Master:  local[*]
  AppName: Python Spark
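
If the standalone cluster at spark://master:7077 is not available, the same builder pattern can be pointed at the local machine instead (a minimal sketch; local[*] means "use all local cores" and needs no cluster manager):

In [ ]:
from pyspark.sql import SparkSession

# Local-mode session: driver and executors run on this machine only.
spark = SparkSession.builder\
        .master("local[*]")\
        .appName("Python Spark").getOrCreate()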

Creating DataFrames¶

Convert an RDD to a DataFrame¶

In [9]:
sc = spark.sparkContext # get the SparkContext for RDD operations
In [10]:
sc
Out[10]:

SparkContext
  Spark UI
  Version: v2.4.4
  Master:  local[*]
  AppName: Python Spark
In [20]:
# Load a text file and convert each line to a Row.
from pyspark.sql import Row
lines = sc.textFile("data/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
 
# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople
Out[20]:
DataFrame[age: bigint, name: string]
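
A quick way to inspect the inferred schema is printSchema(); given the types shown above, its output would look roughly like the comments below:

In [ ]:
schemaPeople.printSchema()
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)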
In [24]:
# SQL can be run over DataFrames that have been registered as a table.
schemaPeople.createOrReplaceTempView("people")
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
teenagers
Out[24]:
DataFrame[name: string]
In [29]:
teenagers.toPandas() # Export the Spark DataFrame to a regular pandas DataFrame
Out[29]:
     name
0  Justin

Create Spark DataFrame directly from a file¶

In [39]:
sdf = spark.read.csv("data/people.txt") # read the CSV file from the default filesystem (e.g. HDFS)
sdf.show() # Displays the content of the DataFrame to stdout
+-------+---+
|    _c0|_c1|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+
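
If a CSV file does carry a header row, the reader can pick up the column names and guess the column types itself. A sketch with a hypothetical file name (header and inferSchema are standard options of the CSV reader):

In [ ]:
# Hypothetical file whose first line is "name,age".
sdf_typed = spark.read.csv("data/people_with_header.csv",
                           header=True, inferSchema=True)
sdf_typed.printSchema()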

In [ ]:
# If your data are available locally, you can explicitly specify the path with "file://".
# For this to work, a copy of the file needs to be on every worker, or every worker
# needs access to a common shared drive, such as an NFS mount.

sdf = spark.read.csv("file:///home/fli/data/people.txt") # read a local file
sdf.show() # Displays the content of the DataFrame to stdout
In [1]:
sdf2 = spark.read.json("data/people.json")
# Displays the content of the DataFrame to stdout
sdf2.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
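
DataFrame transformations work the same way regardless of the source format. For example, a short illustrative chain on the JSON-backed DataFrame:

In [ ]:
# Keep only the rows with a known age and display name and age.
sdf2.filter(sdf2.age.isNotNull()).select("name", "age").show()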

  • The CSV file does not have a header row, but we can create a description of the columns (a schema in Spark) for it
In [35]:
# Import data types
from pyspark.sql.types import *
# Create a schema: each column name in the list becomes a nullable string field.
schemaString = ["name", "age"]
fields = [StructField(field_name, StringType(), True) for field_name in schemaString]
schema = StructType(fields)
In [36]:
schema
Out[36]:
StructType(List(StructField(name,StringType,true),StructField(age,StringType,true)))
In [38]:
sdf_withschema = spark.createDataFrame(people, schema)
sdf_withschema.show()
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+
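
The same schema can also be handed to the CSV reader directly, so the columns get proper names without relying on a header row (a minimal sketch; sdf_named is just an illustrative name):

In [ ]:
sdf_named = spark.read.csv("data/people.txt", schema=schema)
sdf_named.show()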

Convert DataFrame to RDD¶

In [43]:
rdd1 = sdf_withschema.rdd
rdd1
Out[43]:
MapPartitionsRDD[132] at javaToPython at NativeMethodAccessorImpl.java:0
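
The elements of this RDD are Row objects, so the usual RDD operations apply. A small illustrative action:

In [ ]:
# Pull the name field out of every Row and collect the results to the driver.
rdd1.map(lambda row: row.name).collect()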

Export DataFrame to a local disk¶

In [53]:
sdf.write.mode('overwrite').csv("myspark/") # Save the Spark DataFrame to a folder on the local disk.
In [49]:
import os 
os.listdir("myspark") # Let's check if everything is there on the local disk
Out[49]:
['_SUCCESS',
 '.part-00000-23e89afa-747c-46dd-9a9b-f2ee8d79b94b-c000.csv.crc',
 '._SUCCESS.crc',
 'part-00000-23e89afa-747c-46dd-9a9b-f2ee8d79b94b-c000.csv']
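
Spark writes one part file per partition. If a single CSV file is preferred, the DataFrame can be collapsed to one partition before writing (a sketch; the output folder name is only illustrative):

In [ ]:
# coalesce(1) merges the data into one partition, hence one part file.
sdf.coalesce(1).write.mode('overwrite').csv("myspark_single/")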