Structured Data Processing with Spark¶
Feng Li¶
Guanghua School of Management¶
Peking University¶
feng.li@gsm.pku.edu.cn¶
Course home page: https://feng.li/bdcf¶
Spark SQL¶
Unlike the basic Spark RDD API, the interfaces provided by Spark SQL give Spark more information about the structure of both the data and the computation being performed.
Spark SQL uses this extra information to perform additional optimizations.
One use of Spark SQL is to execute SQL queries.
Spark SQL can also be used to read data from an existing Hive installation.
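As a minimal, self-contained sketch of the SQL side (the data and names below are illustrative, not the course dataset), a DataFrame can be registered as a temporary view and then queried with ordinary SQL:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sql-sketch").getOrCreate()

# A tiny illustrative DataFrame (hypothetical data)
df = spark.createDataFrame(
    [("store_1", 120), ("store_2", 80)],
    ["store_id", "sales"],
)

# Register the DataFrame as a temporary SQL view and query it with plain SQL
df.createOrReplaceTempView("sales")
spark.sql("SELECT store_id, sales FROM sales WHERE sales > 100").show()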
Spark DataFrame¶
A DataFrame is a Dataset organized into named columns.
It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.
DataFrames can be constructed from a wide array of sources such as:
- structured data files,
- tables in Hive,
- external databases, or
- existing RDDs.
The DataFrame API is available in Scala, Java, Python, and R.
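For instance, a DataFrame can be built from an existing RDD. A small sketch (assuming a SparkSession named spark, as created in the next section, and purely illustrative data):
from pyspark.sql import Row

# An existing RDD of Row objects (hypothetical data)
rdd = spark.sparkContext.parallelize(
    [Row(name="Alice", age=34), Row(name="Bob", age=45)]
)
people_sdf = spark.createDataFrame(rdd)  # convert the RDD into a DataFrame
people_sdf.show()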
Start a Spark session¶
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .config("spark.executor.memory", "2g") \
    .config("spark.cores.max", "2") \
    .appName("Spark DataFrames") \
    .getOrCreate()  # create (or reuse) a session on the Spark server
Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 25/02/23 20:39:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
spark # test if Spark session is created or not
SparkSession - in-memory
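A quick, optional check (not part of the original notebook) that the session picked up the configuration above:
spark.version                             # Spark version string
spark.conf.get("spark.executor.memory")   # should return '2g' as configured above
spark.sparkContext.uiWebUrl               # URL of the Spark web UI, if one is running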
Creating DataFrames¶
Upload Data to HPC server¶
- Using the SCOW web interface (suitable for small datasets)
- Using FileZilla:
  - Make sure you have OTP enabled for your account
  - FileZilla Menu -> File -> Site Manager
  - Host: wmjx2-login.pku.edu.cn
  - Logon Type: Interactive
  - User: youruniversityid
- Using a command line tool such as rsync (see the example below)
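For example, a command along these lines copies a local data folder to your home directory on the login node (the remote path is only illustrative):
rsync -avz data/ youruniversityid@wmjx2-login.pku.edu.cn:~/data/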
Create a Spark DataFrame directly from a file¶
sdf = spark.read.csv("../data/m5-forecasting-accuracy/calendar.csv", header=True)  # read the CSV file, treating the first row as the header
# sdf = spark.read.csv("file:///nfs-share/home/2406184221/bdcf-slides/data/m5-forecasting-accuracy/calendar.csv") # or use the full path
sdf.show() # Displays the content of the DataFrame to stdout
+----------+--------+---------+----+-----+----+----+-------------+------------+------------+------------+-------+-------+-------+
|      date|wm_yr_wk|  weekday|wday|month|year|   d| event_name_1|event_type_1|event_name_2|event_type_2|snap_CA|snap_TX|snap_WI|
+----------+--------+---------+----+-----+----+----+-------------+------------+------------+------------+-------+-------+-------+
|2011-01-29|   11101| Saturday|   1|    1|2011| d_1|         NULL|        NULL|        NULL|        NULL|      0|      0|      0|
|2011-01-30|   11101|   Sunday|   2|    1|2011| d_2|         NULL|        NULL|        NULL|        NULL|      0|      0|      0|
|2011-01-31|   11101|   Monday|   3|    1|2011| d_3|         NULL|        NULL|        NULL|        NULL|      0|      0|      0|
|2011-02-01|   11101|  Tuesday|   4|    2|2011| d_4|         NULL|        NULL|        NULL|        NULL|      1|      1|      0|
|2011-02-02|   11101|Wednesday|   5|    2|2011| d_5|         NULL|        NULL|        NULL|        NULL|      1|      0|      1|
|2011-02-03|   11101| Thursday|   6|    2|2011| d_6|         NULL|        NULL|        NULL|        NULL|      1|      1|      1|
|2011-02-04|   11101|   Friday|   7|    2|2011| d_7|         NULL|        NULL|        NULL|        NULL|      1|      0|      0|
|2011-02-05|   11102| Saturday|   1|    2|2011| d_8|         NULL|        NULL|        NULL|        NULL|      1|      1|      1|
|2011-02-06|   11102|   Sunday|   2|    2|2011| d_9|    SuperBowl|    Sporting|        NULL|        NULL|      1|      1|      1|
|2011-02-07|   11102|   Monday|   3|    2|2011|d_10|         NULL|        NULL|        NULL|        NULL|      1|      1|      0|
|2011-02-08|   11102|  Tuesday|   4|    2|2011|d_11|         NULL|        NULL|        NULL|        NULL|      1|      0|      1|
|2011-02-09|   11102|Wednesday|   5|    2|2011|d_12|         NULL|        NULL|        NULL|        NULL|      1|      1|      1|
|2011-02-10|   11102| Thursday|   6|    2|2011|d_13|         NULL|        NULL|        NULL|        NULL|      1|      0|      0|
|2011-02-11|   11102|   Friday|   7|    2|2011|d_14|         NULL|        NULL|        NULL|        NULL|      0|      1|      1|
|2011-02-12|   11103| Saturday|   1|    2|2011|d_15|         NULL|        NULL|        NULL|        NULL|      0|      1|      1|
|2011-02-13|   11103|   Sunday|   2|    2|2011|d_16|         NULL|        NULL|        NULL|        NULL|      0|      1|      0|
|2011-02-14|   11103|   Monday|   3|    2|2011|d_17|ValentinesDay|    Cultural|        NULL|        NULL|      0|      0|      1|
|2011-02-15|   11103|  Tuesday|   4|    2|2011|d_18|         NULL|        NULL|        NULL|        NULL|      0|      1|      1|
|2011-02-16|   11103|Wednesday|   5|    2|2011|d_19|         NULL|        NULL|        NULL|        NULL|      0|      0|      0|
|2011-02-17|   11103| Thursday|   6|    2|2011|d_20|         NULL|        NULL|        NULL|        NULL|      0|      0|      0|
+----------+--------+---------+----+-----+----+----+-------------+------------+------------+------------+-------+-------+-------+
only showing top 20 rows
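All columns above are read as strings by default. If typed columns are needed, the same file can be read with schema inference turned on (a small variation on the call above):
sdf_typed = spark.read.csv(
    "../data/m5-forecasting-accuracy/calendar.csv",
    header=True,
    inferSchema=True,  # scan the data to infer integer/date types instead of strings
)
sdf_typed.printSchema()  # inspect the inferred column types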
Export DataFrame to a local disk¶
sdf.write.mode('overwrite').csv("myspark/")  # Save the Spark DataFrame to a folder on the local disk
import os
os.listdir("myspark") # Let's check if everything is there on the local disk
['._SUCCESS.crc', '_SUCCESS', 'part-00000-a7fa5a2a-9dd5-4c48-b472-2f815b8d027d-c000.csv', '.part-00000-a7fa5a2a-9dd5-4c48-b472-2f815b8d027d-c000.csv.crc']
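Spark writes one part file per partition, which is why the folder contains part-* files rather than a single CSV. A sketch (not from the original notebook; the output folder name is only illustrative) of how to write a single CSV with a header row and read it back:
sdf.coalesce(1).write.mode("overwrite").csv("myspark_single/", header=True)  # one part file, with a header row
sdf_back = spark.read.csv("myspark_single/", header=True)  # read the folder back into a DataFrame
sdf_back.count()  # should equal the number of rows in sdf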