Working with Spark DataFrame¶
Feng Li¶
Guanghua School of Management¶
Peking University¶
feng.li@gsm.pku.edu.cn¶
Course home page: https://feng.li/bdcf¶
Start a Spark Session¶
In [6]:
import os, sys  # ensure all environment variables are properly set
# os.environ["JAVA_HOME"] = os.path.dirname(sys.executable)
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable
from pyspark.sql import SparkSession # build Spark Session
spark = SparkSession.builder \
    .config("spark.ui.enabled", "false") \
    .config("spark.executor.memory", "2g") \
    .config("spark.cores.max", "2") \
    .appName("Spark DataFrame").getOrCreate()  # using spark server
Note:
If you have trouble starting a PySpark interactive session due to a system limitation, make sure you have disabled the Spark UI:

.config("spark.ui.enabled", "false")

You could also submit your Spark job via the spark-submit command on the PKU HPC server (more details later):

export JAVA_HOME=/nfs-share/software/anaconda/2020.02/envs/python3.12/bin
export PYSPARK_PYTHON=/nfs-share/software/anaconda/2020.02/envs/python3.12/bin/python
export PYSPARK_DRIVER_PYTHON=$PYSPARK_PYTHON

/nfs-share/software/anaconda/2020.02/envs/python3.12/bin/spark-submit \
    --conf spark.ui.enabled=false \
    your-pyspark-code.py
In [7]:
sdf = spark.read.csv("../data/m5-forecasting-accuracy/calendar.csv", header=True, inferSchema=True) # read files
sdf.show() # Displays the content of the DataFrame to stdout
+----------+--------+---------+----+-----+----+----+-------------+------------+------------+------------+-------+-------+-------+
| date|wm_yr_wk| weekday|wday|month|year| d| event_name_1|event_type_1|event_name_2|event_type_2|snap_CA|snap_TX|snap_WI|
+----------+--------+---------+----+-----+----+----+-------------+------------+------------+------------+-------+-------+-------+
|2011-01-29| 11101| Saturday| 1| 1|2011| d_1| NULL| NULL| NULL| NULL| 0| 0| 0|
|2011-01-30| 11101| Sunday| 2| 1|2011| d_2| NULL| NULL| NULL| NULL| 0| 0| 0|
|2011-01-31| 11101| Monday| 3| 1|2011| d_3| NULL| NULL| NULL| NULL| 0| 0| 0|
|2011-02-01| 11101| Tuesday| 4| 2|2011| d_4| NULL| NULL| NULL| NULL| 1| 1| 0|
|2011-02-02| 11101|Wednesday| 5| 2|2011| d_5| NULL| NULL| NULL| NULL| 1| 0| 1|
|2011-02-03| 11101| Thursday| 6| 2|2011| d_6| NULL| NULL| NULL| NULL| 1| 1| 1|
|2011-02-04| 11101| Friday| 7| 2|2011| d_7| NULL| NULL| NULL| NULL| 1| 0| 0|
|2011-02-05| 11102| Saturday| 1| 2|2011| d_8| NULL| NULL| NULL| NULL| 1| 1| 1|
|2011-02-06| 11102| Sunday| 2| 2|2011| d_9| SuperBowl| Sporting| NULL| NULL| 1| 1| 1|
|2011-02-07| 11102| Monday| 3| 2|2011|d_10| NULL| NULL| NULL| NULL| 1| 1| 0|
|2011-02-08| 11102| Tuesday| 4| 2|2011|d_11| NULL| NULL| NULL| NULL| 1| 0| 1|
|2011-02-09| 11102|Wednesday| 5| 2|2011|d_12| NULL| NULL| NULL| NULL| 1| 1| 1|
|2011-02-10| 11102| Thursday| 6| 2|2011|d_13| NULL| NULL| NULL| NULL| 1| 0| 0|
|2011-02-11| 11102| Friday| 7| 2|2011|d_14| NULL| NULL| NULL| NULL| 0| 1| 1|
|2011-02-12| 11103| Saturday| 1| 2|2011|d_15| NULL| NULL| NULL| NULL| 0| 1| 1|
|2011-02-13| 11103| Sunday| 2| 2|2011|d_16| NULL| NULL| NULL| NULL| 0| 1| 0|
|2011-02-14| 11103| Monday| 3| 2|2011|d_17|ValentinesDay| Cultural| NULL| NULL| 0| 0| 1|
|2011-02-15| 11103| Tuesday| 4| 2|2011|d_18| NULL| NULL| NULL| NULL| 0| 1| 1|
|2011-02-16| 11103|Wednesday| 5| 2|2011|d_19| NULL| NULL| NULL| NULL| 0| 0| 0|
|2011-02-17| 11103| Thursday| 6| 2|2011|d_20| NULL| NULL| NULL| NULL| 0| 0| 0|
+----------+--------+---------+----+-----+----+----+-------------+------------+------------+------------+-------+-------+-------+
only showing top 20 rows
Descriptive Statistics¶
In [8]:
sdf.describe().show() # plain-text summary (wide and hard to read)
+-------+----------+------------------+---------+------------------+-----------------+------------------+-----+------------+------------+--------------+------------+-------------------+-------------------+-------------------+
|summary| date| wm_yr_wk| weekday| wday| month| year| d|event_name_1|event_type_1| event_name_2|event_type_2| snap_CA| snap_TX| snap_WI|
+-------+----------+------------------+---------+------------------+-----------------+------------------+-----+------------+------------+--------------+------------+-------------------+-------------------+-------------------+
| count| 1969| 1969| 1969| 1969| 1969| 1969| 1969| 162| 162| 5| 5| 1969| 1969| 1969|
| mean| NULL|11347.086338242763| NULL|3.9974606399187405|6.325545962417471| 2013.288471305231| NULL| NULL| NULL| NULL| NULL|0.33011681056373793|0.33011681056373793|0.33011681056373793|
| stddev| NULL| 155.2770428028507| NULL|2.0011413541040746|3.416864338775945|1.5801982706329631| NULL| NULL| NULL| NULL| NULL|0.47037439309734164|0.47037439309734164|0.47037439309734164|
| min|2011-01-29| 11101| Friday| 1| 1| 2011| d_1|Chanukah End| Cultural| Cinco De Mayo| Cultural| 0| 0| 0|
| max|2016-06-19| 11621|Wednesday| 7| 9| 2016|d_999| VeteransDay| Sporting|OrthodoxEaster| Religious| 1| 1| 1|
+-------+----------+------------------+---------+------------------+-----------------+------------------+-----+------------+------------+--------------+------------+-------------------+-------------------+-------------------+
In [9]:
sdf.describe().toPandas() # prettier: convert the summary to a pandas DataFrame
Out[9]:
  | summary | date | wm_yr_wk | weekday | wday | month | year | d | event_name_1 | event_type_1 | event_name_2 | event_type_2 | snap_CA | snap_TX | snap_WI |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | count | 1969 | 1969 | 1969 | 1969 | 1969 | 1969 | 1969 | 162 | 162 | 5 | 5 | 1969 | 1969 | 1969 |
1 | mean | None | 11347.086338242763 | None | 3.9974606399187405 | 6.325545962417471 | 2013.288471305231 | None | None | None | None | None | 0.33011681056373793 | 0.33011681056373793 | 0.33011681056373793 |
2 | stddev | None | 155.2770428028507 | None | 2.0011413541040746 | 3.416864338775945 | 1.5801982706329631 | None | None | None | None | None | 0.47037439309734164 | 0.47037439309734164 | 0.47037439309734164 |
3 | min | 2011-01-29 | 11101 | Friday | 1 | 1 | 2011 | d_1 | Chanukah End | Cultural | Cinco De Mayo | Cultural | 0 | 0 | 0 |
4 | max | 2016-06-19 | 11621 | Wednesday | 7 | 9 | 2016 | d_999 | VeteransDay | Sporting | OrthodoxEaster | Religious | 1 | 1 | 1 |
In [6]:
sdf.describe(['year']).show()
+-------+------------------+
|summary| year|
+-------+------------------+
| count| 1969|
| mean| 2013.288471305231|
| stddev|1.5801982706329631|
| min| 2011|
| max| 2016|
+-------+------------------+
Print the schema in a tree format¶
In [10]:
sdf.printSchema()
root
 |-- date: string (nullable = true)
 |-- wm_yr_wk: string (nullable = true)
 |-- weekday: string (nullable = true)
 |-- wday: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: string (nullable = true)
 |-- d: string (nullable = true)
 |-- event_name_1: string (nullable = true)
 |-- event_type_1: string (nullable = true)
 |-- event_name_2: string (nullable = true)
 |-- event_type_2: string (nullable = true)
 |-- snap_CA: string (nullable = true)
 |-- snap_TX: string (nullable = true)
 |-- snap_WI: string (nullable = true)
Select columns¶
In [12]:
sdf.select(["date","year","snap_CA", "snap_TX", "snap_WI"]).show()
+----------+----+-------+-------+-------+
| date|year|snap_CA|snap_TX|snap_WI|
+----------+----+-------+-------+-------+
|2011-01-29|2011| 0| 0| 0|
|2011-01-30|2011| 0| 0| 0|
|2011-01-31|2011| 0| 0| 0|
|2011-02-01|2011| 1| 1| 0|
|2011-02-02|2011| 1| 0| 1|
|2011-02-03|2011| 1| 1| 1|
|2011-02-04|2011| 1| 0| 0|
|2011-02-05|2011| 1| 1| 1|
|2011-02-06|2011| 1| 1| 1|
|2011-02-07|2011| 1| 1| 0|
|2011-02-08|2011| 1| 0| 1|
|2011-02-09|2011| 1| 1| 1|
|2011-02-10|2011| 1| 0| 0|
|2011-02-11|2011| 0| 1| 1|
|2011-02-12|2011| 0| 1| 1|
|2011-02-13|2011| 0| 1| 0|
|2011-02-14|2011| 0| 0| 1|
|2011-02-15|2011| 0| 1| 1|
|2011-02-16|2011| 0| 0| 0|
|2011-02-17|2011| 0| 0| 0|
+----------+----+-------+-------+-------+
only showing top 20 rows
In [13]:
sdf.select(sdf['year'], sdf['month']>11).show()
+----+------------+
|year|(month > 11)|
+----+------------+
|2011| false|
|2011| false|
|2011| false|
|2011| false|
|2011| false|
|2011| false|
|2011| false|
|2011| false|
|2011| false|
|2011| false|
|2011| false|
|2011| false|
|2011| false|
|2011| false|
|2011| false|
|2011| false|
|2011| false|
|2011| false|
|2011| false|
|2011| false|
+----+------------+
only showing top 20 rows
In [14]:
# group data with respect to some columns
sdf.groupBy(["month","weekday"]).count().show()
+-----+---------+-----+
|month| weekday|count|
+-----+---------+-----+
| 3| Monday| 26|
| 11| Saturday| 22|
| 10| Saturday| 22|
| 4|Wednesday| 26|
| 1| Monday| 22|
| 2| Friday| 24|
| 11| Sunday| 22|
| 4| Sunday| 25|
| 8| Sunday| 22|
| 6| Monday| 24|
| 9| Monday| 22|
| 4| Friday| 26|
| 1| Friday| 23|
| 7| Monday| 22|
| 4| Tuesday| 26|
| 6| Thursday| 24|
| 1|Wednesday| 22|
| 4| Monday| 26|
| 3| Tuesday| 27|
| 3| Friday| 26|
+-----+---------+-----+
only showing top 20 rows
In [15]:
sdf.groupBy(["month","weekday"]).count().toPandas()
Out[15]:
  | month | weekday | count |
---|---|---|---|
0 | 3 | Monday | 26 |
1 | 11 | Saturday | 22 |
2 | 10 | Saturday | 22 |
3 | 4 | Wednesday | 26 |
4 | 1 | Monday | 22 |
... | ... | ... | ... |
79 | 5 | Thursday | 27 |
80 | 2 | Wednesday | 25 |
81 | 1 | Thursday | 23 |
82 | 2 | Monday | 25 |
83 | 7 | Wednesday | 23 |
84 rows × 3 columns
In [17]:
## Group and sort
event1count = sdf.groupBy("event_type_1").count()
event1count.sort("count", ascending=False).show()
+------------+-----+
|event_type_1|count|
+------------+-----+
| NULL| 1807|
| Religious| 55|
| National| 52|
| Cultural| 37|
| Sporting| 18|
+------------+-----+
Data cleaning¶
In [18]:
## Returns a new DataFrame containing the distinct rows in this DataFrame.
## Takes a while to compute
sdf.distinct().count()
Out[18]:
1969
In [19]:
## Returns a new DataFrame omitting rows with null values
sdf_without_na = sdf.na.drop()
sdf_without_na.show()
+----------+--------+-------+----+-----+----+------+--------------+------------+--------------+------------+-------+-------+-------+
| date|wm_yr_wk|weekday|wday|month|year| d| event_name_1|event_type_1| event_name_2|event_type_2|snap_CA|snap_TX|snap_WI|
+----------+--------+-------+----+-----+----+------+--------------+------------+--------------+------------+-------+-------+-------+
|2011-04-24| 11113| Sunday| 2| 4|2011| d_86|OrthodoxEaster| Religious| Easter| Cultural| 0| 0| 0|
|2013-05-05| 11315| Sunday| 2| 5|2013| d_828|OrthodoxEaster| Religious| Cinco De Mayo| Cultural| 1| 1| 1|
|2014-04-20| 11412| Sunday| 2| 4|2014|d_1178| Easter| Cultural|OrthodoxEaster| Religious| 0| 0| 0|
|2014-06-15| 11420| Sunday| 2| 6|2014|d_1234| NBAFinalsEnd| Sporting| Father's day| Cultural| 0| 1| 1|
|2016-06-19| 11621| Sunday| 2| 6|2016|d_1969| NBAFinalsEnd| Sporting| Father's day| Cultural| 0| 0| 0|
+----------+--------+-------+----+-----+----+------+--------------+------------+--------------+------------+-------+-------+-------+
In [20]:
sdf.count() # original file size
Out[20]:
1969
In [21]:
## Replace null values
sdf.na.fill("unknown").show()
+----------+--------+---------+----+-----+----+----+-------------+------------+------------+------------+-------+-------+-------+
| date|wm_yr_wk| weekday|wday|month|year| d| event_name_1|event_type_1|event_name_2|event_type_2|snap_CA|snap_TX|snap_WI|
+----------+--------+---------+----+-----+----+----+-------------+------------+------------+------------+-------+-------+-------+
|2011-01-29| 11101| Saturday| 1| 1|2011| d_1| unknown| unknown| unknown| unknown| 0| 0| 0|
|2011-01-30| 11101| Sunday| 2| 1|2011| d_2| unknown| unknown| unknown| unknown| 0| 0| 0|
|2011-01-31| 11101| Monday| 3| 1|2011| d_3| unknown| unknown| unknown| unknown| 0| 0| 0|
|2011-02-01| 11101| Tuesday| 4| 2|2011| d_4| unknown| unknown| unknown| unknown| 1| 1| 0|
|2011-02-02| 11101|Wednesday| 5| 2|2011| d_5| unknown| unknown| unknown| unknown| 1| 0| 1|
|2011-02-03| 11101| Thursday| 6| 2|2011| d_6| unknown| unknown| unknown| unknown| 1| 1| 1|
|2011-02-04| 11101| Friday| 7| 2|2011| d_7| unknown| unknown| unknown| unknown| 1| 0| 0|
|2011-02-05| 11102| Saturday| 1| 2|2011| d_8| unknown| unknown| unknown| unknown| 1| 1| 1|
|2011-02-06| 11102| Sunday| 2| 2|2011| d_9| SuperBowl| Sporting| unknown| unknown| 1| 1| 1|
|2011-02-07| 11102| Monday| 3| 2|2011|d_10| unknown| unknown| unknown| unknown| 1| 1| 0|
|2011-02-08| 11102| Tuesday| 4| 2|2011|d_11| unknown| unknown| unknown| unknown| 1| 0| 1|
|2011-02-09| 11102|Wednesday| 5| 2|2011|d_12| unknown| unknown| unknown| unknown| 1| 1| 1|
|2011-02-10| 11102| Thursday| 6| 2|2011|d_13| unknown| unknown| unknown| unknown| 1| 0| 0|
|2011-02-11| 11102| Friday| 7| 2|2011|d_14| unknown| unknown| unknown| unknown| 0| 1| 1|
|2011-02-12| 11103| Saturday| 1| 2|2011|d_15| unknown| unknown| unknown| unknown| 0| 1| 1|
|2011-02-13| 11103| Sunday| 2| 2|2011|d_16| unknown| unknown| unknown| unknown| 0| 1| 0|
|2011-02-14| 11103| Monday| 3| 2|2011|d_17|ValentinesDay| Cultural| unknown| unknown| 0| 0| 1|
|2011-02-15| 11103| Tuesday| 4| 2|2011|d_18| unknown| unknown| unknown| unknown| 0| 1| 1|
|2011-02-16| 11103|Wednesday| 5| 2|2011|d_19| unknown| unknown| unknown| unknown| 0| 0| 0|
|2011-02-17| 11103| Thursday| 6| 2|2011|d_20| unknown| unknown| unknown| unknown| 0| 0| 0|
+----------+--------+---------+----+-----+----+----+-------------+------------+------------+------------+-------+-------+-------+
only showing top 20 rows
User-defined functions¶
In [31]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Define a simple UDF to convert strings to uppercase
def to_uppercase(s):
    return s.upper() if s else None
# Register UDF
uppercase_udf = udf(to_uppercase, StringType())
# Apply UDF to the 'weekday' column
df_transformed = sdf.withColumn("weekday_upper", uppercase_udf(sdf["weekday"]))
In [30]:
df_transformed.select("weekday", "weekday_upper").show() # Show result
+---------+-------------+
| weekday|weekday_upper|
+---------+-------------+
| Saturday| SATURDAY|
| Sunday| SUNDAY|
| Monday| MONDAY|
| Tuesday| TUESDAY|
|Wednesday| WEDNESDAY|
| Thursday| THURSDAY|
| Friday| FRIDAY|
| Saturday| SATURDAY|
| Sunday| SUNDAY|
| Monday| MONDAY|
| Tuesday| TUESDAY|
|Wednesday| WEDNESDAY|
| Thursday| THURSDAY|
| Friday| FRIDAY|
| Saturday| SATURDAY|
| Sunday| SUNDAY|
| Monday| MONDAY|
| Tuesday| TUESDAY|
|Wednesday| WEDNESDAY|
| Thursday| THURSDAY|
+---------+-------------+
only showing top 20 rows