Working with Spark DataFrame¶

Feng Li¶

Guanghua School of Management¶

Peking University¶

feng.li@gsm.pku.edu.cn¶

Course home page: https://feng.li/bdcf¶

Start a Spark Session¶

In [6]:
import os, sys  # make sure the environment variables PySpark needs are set
# os.environ["JAVA_HOME"] = os.path.dirname(sys.executable)
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

from pyspark.sql import SparkSession  # entry point for the DataFrame API
spark = (
    SparkSession.builder
    .config("spark.ui.enabled", "false")
    .config("spark.executor.memory", "2g")
    .config("spark.cores.max", "2")
    .appName("Spark DataFrame")
    .getOrCreate()  # reuse a running session or start a new one
)

Note:

  • If you have trouble starting an interactive PySpark session because of a system limitation, make sure the Spark UI is disabled with .config("spark.ui.enabled", "false").

  • You can also submit your Spark job with the spark-submit command on the PKU HPC server (more details later):

    export JAVA_HOME=/nfs-share/software/anaconda/2020.02/envs/python3.12/bin
    export PYSPARK_PYTHON=/nfs-share/software/anaconda/2020.02/envs/python3.12/bin/python
    export PYSPARK_DRIVER_PYTHON=$PYSPARK_PYTHON
    /nfs-share/software/anaconda/2020.02/envs/python3.12/bin/spark-submit \
        --conf spark.ui.enabled=false     \
        your-pyspark-code.py         
    
In [7]:
sdf = spark.read.csv("../data/m5-forecasting-accuracy/calendar.csv", header=True, inferSchema=True) # read the CSV file
sdf.show() # print the first 20 rows of the DataFrame to stdout
+----------+--------+---------+----+-----+----+----+-------------+------------+------------+------------+-------+-------+-------+
|      date|wm_yr_wk|  weekday|wday|month|year|   d| event_name_1|event_type_1|event_name_2|event_type_2|snap_CA|snap_TX|snap_WI|
+----------+--------+---------+----+-----+----+----+-------------+------------+------------+------------+-------+-------+-------+
|2011-01-29|   11101| Saturday|   1|    1|2011| d_1|         NULL|        NULL|        NULL|        NULL|      0|      0|      0|
|2011-01-30|   11101|   Sunday|   2|    1|2011| d_2|         NULL|        NULL|        NULL|        NULL|      0|      0|      0|
|2011-01-31|   11101|   Monday|   3|    1|2011| d_3|         NULL|        NULL|        NULL|        NULL|      0|      0|      0|
|2011-02-01|   11101|  Tuesday|   4|    2|2011| d_4|         NULL|        NULL|        NULL|        NULL|      1|      1|      0|
|2011-02-02|   11101|Wednesday|   5|    2|2011| d_5|         NULL|        NULL|        NULL|        NULL|      1|      0|      1|
|2011-02-03|   11101| Thursday|   6|    2|2011| d_6|         NULL|        NULL|        NULL|        NULL|      1|      1|      1|
|2011-02-04|   11101|   Friday|   7|    2|2011| d_7|         NULL|        NULL|        NULL|        NULL|      1|      0|      0|
|2011-02-05|   11102| Saturday|   1|    2|2011| d_8|         NULL|        NULL|        NULL|        NULL|      1|      1|      1|
|2011-02-06|   11102|   Sunday|   2|    2|2011| d_9|    SuperBowl|    Sporting|        NULL|        NULL|      1|      1|      1|
|2011-02-07|   11102|   Monday|   3|    2|2011|d_10|         NULL|        NULL|        NULL|        NULL|      1|      1|      0|
|2011-02-08|   11102|  Tuesday|   4|    2|2011|d_11|         NULL|        NULL|        NULL|        NULL|      1|      0|      1|
|2011-02-09|   11102|Wednesday|   5|    2|2011|d_12|         NULL|        NULL|        NULL|        NULL|      1|      1|      1|
|2011-02-10|   11102| Thursday|   6|    2|2011|d_13|         NULL|        NULL|        NULL|        NULL|      1|      0|      0|
|2011-02-11|   11102|   Friday|   7|    2|2011|d_14|         NULL|        NULL|        NULL|        NULL|      0|      1|      1|
|2011-02-12|   11103| Saturday|   1|    2|2011|d_15|         NULL|        NULL|        NULL|        NULL|      0|      1|      1|
|2011-02-13|   11103|   Sunday|   2|    2|2011|d_16|         NULL|        NULL|        NULL|        NULL|      0|      1|      0|
|2011-02-14|   11103|   Monday|   3|    2|2011|d_17|ValentinesDay|    Cultural|        NULL|        NULL|      0|      0|      1|
|2011-02-15|   11103|  Tuesday|   4|    2|2011|d_18|         NULL|        NULL|        NULL|        NULL|      0|      1|      1|
|2011-02-16|   11103|Wednesday|   5|    2|2011|d_19|         NULL|        NULL|        NULL|        NULL|      0|      0|      0|
|2011-02-17|   11103| Thursday|   6|    2|2011|d_20|         NULL|        NULL|        NULL|        NULL|      0|      0|      0|
+----------+--------+---------+----+-----+----+----+-------------+------------+------------+------------+-------+-------+-------+
only showing top 20 rows

Descriptive Statistics¶

In [8]:
sdf.describe().show() # summary statistics; the wide text table is hard to read
+-------+----------+------------------+---------+------------------+-----------------+------------------+-----+------------+------------+--------------+------------+-------------------+-------------------+-------------------+
|summary|      date|          wm_yr_wk|  weekday|              wday|            month|              year|    d|event_name_1|event_type_1|  event_name_2|event_type_2|            snap_CA|            snap_TX|            snap_WI|
+-------+----------+------------------+---------+------------------+-----------------+------------------+-----+------------+------------+--------------+------------+-------------------+-------------------+-------------------+
|  count|      1969|              1969|     1969|              1969|             1969|              1969| 1969|         162|         162|             5|           5|               1969|               1969|               1969|
|   mean|      NULL|11347.086338242763|     NULL|3.9974606399187405|6.325545962417471| 2013.288471305231| NULL|        NULL|        NULL|          NULL|        NULL|0.33011681056373793|0.33011681056373793|0.33011681056373793|
| stddev|      NULL| 155.2770428028507|     NULL|2.0011413541040746|3.416864338775945|1.5801982706329631| NULL|        NULL|        NULL|          NULL|        NULL|0.47037439309734164|0.47037439309734164|0.47037439309734164|
|    min|2011-01-29|             11101|   Friday|                 1|                1|              2011|  d_1|Chanukah End|    Cultural| Cinco De Mayo|    Cultural|                  0|                  0|                  0|
|    max|2016-06-19|             11621|Wednesday|                 7|                9|              2016|d_999| VeteransDay|    Sporting|OrthodoxEaster|   Religious|                  1|                  1|                  1|
+-------+----------+------------------+---------+------------------+-----------------+------------------+-----+------------+------------+--------------+------------+-------------------+-------------------+-------------------+

In [9]:
sdf.describe().toPandas() # same statistics, rendered as a pandas DataFrame
Out[9]:
   summary        date            wm_yr_wk    weekday                wday              month                year      d  event_name_1  event_type_1    event_name_2  event_type_2              snap_CA              snap_TX              snap_WI
0    count        1969                1969       1969                1969               1969                1969   1969           162           162               5             5                 1969                 1969                 1969
1     mean        None  11347.086338242763       None  3.9974606399187405  6.325545962417471   2013.288471305231   None          None          None            None          None  0.33011681056373793  0.33011681056373793  0.33011681056373793
2   stddev        None   155.2770428028507       None  2.0011413541040746  3.416864338775945  1.5801982706329631   None          None          None            None          None  0.47037439309734164  0.47037439309734164  0.47037439309734164
3      min  2011-01-29               11101     Friday                   1                  1                2011    d_1  Chanukah End      Cultural   Cinco De Mayo      Cultural                    0                    0                    0
4      max  2016-06-19               11621  Wednesday                   7                  9                2016  d_999   VeteransDay      Sporting  OrthodoxEaster     Religious                    1                    1                    1
In [6]:
sdf.describe(['year']).show()
+-------+------------------+
|summary|              year|
+-------+------------------+
|  count|              1969|
|   mean| 2013.288471305231|
| stddev|1.5801982706329631|
|    min|              2011|
|    max|              2016|
+-------+------------------+

Print the schema in a tree format¶

In [10]:
sdf.printSchema()
root
 |-- date: string (nullable = true)
 |-- wm_yr_wk: string (nullable = true)
 |-- weekday: string (nullable = true)
 |-- wday: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: string (nullable = true)
 |-- d: string (nullable = true)
 |-- event_name_1: string (nullable = true)
 |-- event_type_1: string (nullable = true)
 |-- event_name_2: string (nullable = true)
 |-- event_type_2: string (nullable = true)
 |-- snap_CA: string (nullable = true)
 |-- snap_TX: string (nullable = true)
 |-- snap_WI: string (nullable = true)

Select columns¶

In [12]:
sdf.select(["date","year","snap_CA", "snap_TX", "snap_WI"]).show()
+----------+----+-------+-------+-------+
|      date|year|snap_CA|snap_TX|snap_WI|
+----------+----+-------+-------+-------+
|2011-01-29|2011|      0|      0|      0|
|2011-01-30|2011|      0|      0|      0|
|2011-01-31|2011|      0|      0|      0|
|2011-02-01|2011|      1|      1|      0|
|2011-02-02|2011|      1|      0|      1|
|2011-02-03|2011|      1|      1|      1|
|2011-02-04|2011|      1|      0|      0|
|2011-02-05|2011|      1|      1|      1|
|2011-02-06|2011|      1|      1|      1|
|2011-02-07|2011|      1|      1|      0|
|2011-02-08|2011|      1|      0|      1|
|2011-02-09|2011|      1|      1|      1|
|2011-02-10|2011|      1|      0|      0|
|2011-02-11|2011|      0|      1|      1|
|2011-02-12|2011|      0|      1|      1|
|2011-02-13|2011|      0|      1|      0|
|2011-02-14|2011|      0|      0|      1|
|2011-02-15|2011|      0|      1|      1|
|2011-02-16|2011|      0|      0|      0|
|2011-02-17|2011|      0|      0|      0|
+----------+----+-------+-------+-------+
only showing top 20 rows

In [13]:
sdf.select(sdf['year'], sdf['month']>11).show()
+----+------------+
|year|(month > 11)|
+----+------------+
|2011|       false|
|2011|       false|
|2011|       false|
|2011|       false|
|2011|       false|
|2011|       false|
|2011|       false|
|2011|       false|
|2011|       false|
|2011|       false|
|2011|       false|
|2011|       false|
|2011|       false|
|2011|       false|
|2011|       false|
|2011|       false|
|2011|       false|
|2011|       false|
|2011|       false|
|2011|       false|
+----+------------+
only showing top 20 rows

In [14]:
# Group the rows by month and weekday, then count the rows in each group
sdf.groupBy(["month","weekday"]).count().show()
+-----+---------+-----+
|month|  weekday|count|
+-----+---------+-----+
|    3|   Monday|   26|
|   11| Saturday|   22|
|   10| Saturday|   22|
|    4|Wednesday|   26|
|    1|   Monday|   22|
|    2|   Friday|   24|
|   11|   Sunday|   22|
|    4|   Sunday|   25|
|    8|   Sunday|   22|
|    6|   Monday|   24|
|    9|   Monday|   22|
|    4|   Friday|   26|
|    1|   Friday|   23|
|    7|   Monday|   22|
|    4|  Tuesday|   26|
|    6| Thursday|   24|
|    1|Wednesday|   22|
|    4|   Monday|   26|
|    3|  Tuesday|   27|
|    3|   Friday|   26|
+-----+---------+-----+
only showing top 20 rows

In [15]:
sdf.groupBy(["month","weekday"]).count().toPandas()
Out[15]:
    month    weekday  count
0       3     Monday     26
1      11   Saturday     22
2      10   Saturday     22
3       4  Wednesday     26
4       1     Monday     22
..    ...        ...    ...
79      5   Thursday     27
80      2  Wednesday     25
81      1   Thursday     23
82      2     Monday     25
83      7  Wednesday     23

84 rows × 3 columns

In [17]:
## Group and sort
event1count=sdf.groupBy("event_type_1").count()
event1count.sort("count", ascending=False).show()
+------------+-----+
|event_type_1|count|
+------------+-----+
|        NULL| 1807|
|   Religious|   55|
|    National|   52|
|    Cultural|   37|
|    Sporting|   18|
+------------+-----+

Data cleaning¶

In [18]:
## Returns a new DataFrame containing the distinct rows in this DataFrame.
## distinct() shuffles the data, so it can be slow on large DataFrames
sdf.distinct().count()
Out[18]:
1969
In [19]:
## Returns a new DataFrame omitting rows that contain a null in any column
sdf_without_na = sdf.na.drop()
sdf_without_na.show()
+----------+--------+-------+----+-----+----+------+--------------+------------+--------------+------------+-------+-------+-------+
|      date|wm_yr_wk|weekday|wday|month|year|     d|  event_name_1|event_type_1|  event_name_2|event_type_2|snap_CA|snap_TX|snap_WI|
+----------+--------+-------+----+-----+----+------+--------------+------------+--------------+------------+-------+-------+-------+
|2011-04-24|   11113| Sunday|   2|    4|2011|  d_86|OrthodoxEaster|   Religious|        Easter|    Cultural|      0|      0|      0|
|2013-05-05|   11315| Sunday|   2|    5|2013| d_828|OrthodoxEaster|   Religious| Cinco De Mayo|    Cultural|      1|      1|      1|
|2014-04-20|   11412| Sunday|   2|    4|2014|d_1178|        Easter|    Cultural|OrthodoxEaster|   Religious|      0|      0|      0|
|2014-06-15|   11420| Sunday|   2|    6|2014|d_1234|  NBAFinalsEnd|    Sporting|  Father's day|    Cultural|      0|      1|      1|
|2016-06-19|   11621| Sunday|   2|    6|2016|d_1969|  NBAFinalsEnd|    Sporting|  Father's day|    Cultural|      0|      0|      0|
+----------+--------+-------+----+-----+----+------+--------------+------------+--------------+------------+-------+-------+-------+

In [20]:
sdf.count() # number of rows in the original DataFrame
Out[20]:
1969
In [21]:
## Replace null values; a string fill value only affects string columns
sdf.na.fill("unknown").show()
+----------+--------+---------+----+-----+----+----+-------------+------------+------------+------------+-------+-------+-------+
|      date|wm_yr_wk|  weekday|wday|month|year|   d| event_name_1|event_type_1|event_name_2|event_type_2|snap_CA|snap_TX|snap_WI|
+----------+--------+---------+----+-----+----+----+-------------+------------+------------+------------+-------+-------+-------+
|2011-01-29|   11101| Saturday|   1|    1|2011| d_1|      unknown|     unknown|     unknown|     unknown|      0|      0|      0|
|2011-01-30|   11101|   Sunday|   2|    1|2011| d_2|      unknown|     unknown|     unknown|     unknown|      0|      0|      0|
|2011-01-31|   11101|   Monday|   3|    1|2011| d_3|      unknown|     unknown|     unknown|     unknown|      0|      0|      0|
|2011-02-01|   11101|  Tuesday|   4|    2|2011| d_4|      unknown|     unknown|     unknown|     unknown|      1|      1|      0|
|2011-02-02|   11101|Wednesday|   5|    2|2011| d_5|      unknown|     unknown|     unknown|     unknown|      1|      0|      1|
|2011-02-03|   11101| Thursday|   6|    2|2011| d_6|      unknown|     unknown|     unknown|     unknown|      1|      1|      1|
|2011-02-04|   11101|   Friday|   7|    2|2011| d_7|      unknown|     unknown|     unknown|     unknown|      1|      0|      0|
|2011-02-05|   11102| Saturday|   1|    2|2011| d_8|      unknown|     unknown|     unknown|     unknown|      1|      1|      1|
|2011-02-06|   11102|   Sunday|   2|    2|2011| d_9|    SuperBowl|    Sporting|     unknown|     unknown|      1|      1|      1|
|2011-02-07|   11102|   Monday|   3|    2|2011|d_10|      unknown|     unknown|     unknown|     unknown|      1|      1|      0|
|2011-02-08|   11102|  Tuesday|   4|    2|2011|d_11|      unknown|     unknown|     unknown|     unknown|      1|      0|      1|
|2011-02-09|   11102|Wednesday|   5|    2|2011|d_12|      unknown|     unknown|     unknown|     unknown|      1|      1|      1|
|2011-02-10|   11102| Thursday|   6|    2|2011|d_13|      unknown|     unknown|     unknown|     unknown|      1|      0|      0|
|2011-02-11|   11102|   Friday|   7|    2|2011|d_14|      unknown|     unknown|     unknown|     unknown|      0|      1|      1|
|2011-02-12|   11103| Saturday|   1|    2|2011|d_15|      unknown|     unknown|     unknown|     unknown|      0|      1|      1|
|2011-02-13|   11103|   Sunday|   2|    2|2011|d_16|      unknown|     unknown|     unknown|     unknown|      0|      1|      0|
|2011-02-14|   11103|   Monday|   3|    2|2011|d_17|ValentinesDay|    Cultural|     unknown|     unknown|      0|      0|      1|
|2011-02-15|   11103|  Tuesday|   4|    2|2011|d_18|      unknown|     unknown|     unknown|     unknown|      0|      1|      1|
|2011-02-16|   11103|Wednesday|   5|    2|2011|d_19|      unknown|     unknown|     unknown|     unknown|      0|      0|      0|
|2011-02-17|   11103| Thursday|   6|    2|2011|d_20|      unknown|     unknown|     unknown|     unknown|      0|      0|      0|
+----------+--------+---------+----+-----+----+----+-------------+------------+------------+------------+-------+-------+-------+
only showing top 20 rows

User-defined functions¶

In [31]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define a simple UDF to convert strings to uppercase
def to_uppercase(s):
    return s.upper() if s else None

# Wrap the Python function as a Spark UDF that returns a string
uppercase_udf = udf(to_uppercase, StringType())

# Apply UDF to the 'weekday' column
df_transformed = sdf.withColumn("weekday_upper", uppercase_udf(sdf["weekday"]))
In [30]:
df_transformed.select("weekday", "weekday_upper").show() # Show result
+---------+-------------+
|  weekday|weekday_upper|
+---------+-------------+
| Saturday|     SATURDAY|
|   Sunday|       SUNDAY|
|   Monday|       MONDAY|
|  Tuesday|      TUESDAY|
|Wednesday|    WEDNESDAY|
| Thursday|     THURSDAY|
|   Friday|       FRIDAY|
| Saturday|     SATURDAY|
|   Sunday|       SUNDAY|
|   Monday|       MONDAY|
|  Tuesday|      TUESDAY|
|Wednesday|    WEDNESDAY|
| Thursday|     THURSDAY|
|   Friday|       FRIDAY|
| Saturday|     SATURDAY|
|   Sunday|       SUNDAY|
|   Monday|       MONDAY|
|  Tuesday|      TUESDAY|
|Wednesday|    WEDNESDAY|
| Thursday|     THURSDAY|
+---------+-------------+
only showing top 20 rows