Structured Data Processing with Spark¶

Feng Li¶

Guanghua School of Management¶

Peking University¶

feng.li@gsm.pku.edu.cn¶

Course home page: https://feng.li/bdcf¶

Spark SQL¶

  • Unlike the basic Spark RDD API, the interfaces provided by Spark SQL give Spark more information about the structure of both the data and the computation being performed.

  • Spark SQL uses this extra information to perform extra optimizations.

  • One use of Spark SQL is to execute SQL queries (see the sketch after this list).

  • Spark SQL can also be used to read data from an existing Hive installation.
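
A minimal sketch of the SQL-query use case, assuming the SparkSession spark and the DataFrame sdf that are created later in this notebook (the view name calendar is hypothetical):

sdf.createOrReplaceTempView("calendar")   # expose the DataFrame to SQL
result = spark.sql("SELECT year, COUNT(*) AS n_days FROM calendar GROUP BY year")
result.show()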

Spark DataFrame¶

  • A DataFrame is a Dataset organized into named columns.

  • It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.

  • DataFrames can be constructed from a wide array of sources (see the sketch after this list) such as:

    • structured data files,
    • tables in Hive,
    • external databases, or
    • existing RDDs.
  • The DataFrame API is available in Scala, Java, Python, and R.
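
As a small sketch of the last source, assuming the SparkSession spark created in the next section, a DataFrame can be built from an in-memory list or from an existing RDD (the column names and toy values here are made up for illustration):

rows = [("store_1", 12.5), ("store_2", 8.0)]                      # hypothetical toy data
df_from_list = spark.createDataFrame(rows, ["store", "sales"])    # from local data
rdd = spark.sparkContext.parallelize(rows)                        # an existing RDD
df_from_rdd = rdd.toDF(["store", "sales"])                        # from the RDD
df_from_list.show()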

Start a Spark session¶

In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .config("spark.executor.memory", "2g")\
        .config("spark.cores.max", "2")\
        .appName("Spark DataFrames").getOrCreate() # using spark server
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/23 20:39:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
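As the message above suggests, the logging level can be adjusted through the SparkContext; a one-line sketch that silences warnings (optional):

spark.sparkContext.setLogLevel("ERROR")   # only show errors from now on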
In [2]:
spark  # check whether the Spark session has been created
Out[2]:

SparkSession - in-memory

SparkContext

Spark UI

Version: v3.5.4
Master: local[*]
AppName: Spark DataFrames
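
To confirm that the options passed to the builder took effect, the session configuration can be queried directly; a small sketch (both keys are set by the builder call above, so the lookups should succeed):

print(spark.conf.get("spark.executor.memory"))   # "2g", as configured above
print(spark.conf.get("spark.app.name"))          # "Spark DataFrames"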

Creating DataFrames¶

Upload Data to HPC server¶

  • Using SCOW web interface (small dataset)

    https://scow-jx2.pku.edu.cn/files/jx2/~/

  • Using FileZilla

    • Make sure you have OTP enabled with your account

      https://hpc.pku.edu.cn/ug/guide/access/

    • FileZilla Menu -> File -> Site Manager

      Host: wmjx2-login.pku.edu.cn

      Logon Type: Interactive

      User: youruniversityid

  • Using command line tool rsync

Create Spark DataFrame directly from a file¶

In [12]:
sdf = spark.read.csv("../data/m5-forecasting-accuracy/calendar.csv", header=True) # read files
# sdf = spark.read.csv("file:///nfs-share/home/2406184221/bdcf-slides/data/m5-forecasting-accuracy/calendar.csv") # or use the full path
sdf.show() # Displays the content of the DataFrame to stdout
+----------+--------+---------+----+-----+----+----+-------------+------------+------------+------------+-------+-------+-------+
|      date|wm_yr_wk|  weekday|wday|month|year|   d| event_name_1|event_type_1|event_name_2|event_type_2|snap_CA|snap_TX|snap_WI|
+----------+--------+---------+----+-----+----+----+-------------+------------+------------+------------+-------+-------+-------+
|2011-01-29|   11101| Saturday|   1|    1|2011| d_1|         NULL|        NULL|        NULL|        NULL|      0|      0|      0|
|2011-01-30|   11101|   Sunday|   2|    1|2011| d_2|         NULL|        NULL|        NULL|        NULL|      0|      0|      0|
|2011-01-31|   11101|   Monday|   3|    1|2011| d_3|         NULL|        NULL|        NULL|        NULL|      0|      0|      0|
|2011-02-01|   11101|  Tuesday|   4|    2|2011| d_4|         NULL|        NULL|        NULL|        NULL|      1|      1|      0|
|2011-02-02|   11101|Wednesday|   5|    2|2011| d_5|         NULL|        NULL|        NULL|        NULL|      1|      0|      1|
|2011-02-03|   11101| Thursday|   6|    2|2011| d_6|         NULL|        NULL|        NULL|        NULL|      1|      1|      1|
|2011-02-04|   11101|   Friday|   7|    2|2011| d_7|         NULL|        NULL|        NULL|        NULL|      1|      0|      0|
|2011-02-05|   11102| Saturday|   1|    2|2011| d_8|         NULL|        NULL|        NULL|        NULL|      1|      1|      1|
|2011-02-06|   11102|   Sunday|   2|    2|2011| d_9|    SuperBowl|    Sporting|        NULL|        NULL|      1|      1|      1|
|2011-02-07|   11102|   Monday|   3|    2|2011|d_10|         NULL|        NULL|        NULL|        NULL|      1|      1|      0|
|2011-02-08|   11102|  Tuesday|   4|    2|2011|d_11|         NULL|        NULL|        NULL|        NULL|      1|      0|      1|
|2011-02-09|   11102|Wednesday|   5|    2|2011|d_12|         NULL|        NULL|        NULL|        NULL|      1|      1|      1|
|2011-02-10|   11102| Thursday|   6|    2|2011|d_13|         NULL|        NULL|        NULL|        NULL|      1|      0|      0|
|2011-02-11|   11102|   Friday|   7|    2|2011|d_14|         NULL|        NULL|        NULL|        NULL|      0|      1|      1|
|2011-02-12|   11103| Saturday|   1|    2|2011|d_15|         NULL|        NULL|        NULL|        NULL|      0|      1|      1|
|2011-02-13|   11103|   Sunday|   2|    2|2011|d_16|         NULL|        NULL|        NULL|        NULL|      0|      1|      0|
|2011-02-14|   11103|   Monday|   3|    2|2011|d_17|ValentinesDay|    Cultural|        NULL|        NULL|      0|      0|      1|
|2011-02-15|   11103|  Tuesday|   4|    2|2011|d_18|         NULL|        NULL|        NULL|        NULL|      0|      1|      1|
|2011-02-16|   11103|Wednesday|   5|    2|2011|d_19|         NULL|        NULL|        NULL|        NULL|      0|      0|      0|
|2011-02-17|   11103| Thursday|   6|    2|2011|d_20|         NULL|        NULL|        NULL|        NULL|      0|      0|      0|
+----------+--------+---------+----+-----+----+----+-------------+------------+------------+------------+-------+-------+-------+
only showing top 20 rows
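
Because the file was read with header=True but without schema inference, every column is typed as a string. A minimal sketch of checking the schema and, optionally, re-reading with type inference (which costs an extra pass over the file):

sdf.printSchema()   # all columns come back as strings by default

sdf_typed = spark.read.csv("../data/m5-forecasting-accuracy/calendar.csv",
                           header=True, inferSchema=True)   # infer column types
sdf_typed.printSchema()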

Export DataFrame to a local disk¶

In [13]:
sdf.write.mode('overwrite').csv("myspark/")  # Save the Spark DataFrame to a folder on the local disk.
In [14]:
import os  
os.listdir("myspark") # Let's check if everything is there on the local disk
Out[14]:
['._SUCCESS.crc',
 '_SUCCESS',
 'part-00000-a7fa5a2a-9dd5-4c48-b472-2f815b8d027d-c000.csv',
 '.part-00000-a7fa5a2a-9dd5-4c48-b472-2f815b8d027d-c000.csv.crc']
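
Note that the CSV above was written without a header row and may be split across several part files. A minimal sketch for producing a single part file with a header (the output folder name is hypothetical, and coalescing to one partition is only advisable for small DataFrames):

sdf.coalesce(1).write.mode("overwrite").option("header", True).csv("myspark_with_header/")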