# Working with Spark DataFrame

## Feng Li

### Central University of Finance and Economics

### [feng.li@cufe.edu.cn](mailto:feng.li@cufe.edu.cn)
### Course home page: [https://feng.li/distcomp](https://feng.li/distcomp)

## Start a Spark Session

In [11]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Python Spark with DataFrame").getOrCreate() # .master("local")
spark

**Note:**

If you have problems starting an interactive PySpark session because of system limitations, you can submit your Spark job with the `spark-submit` command instead:


```
PYSPARK_PYTHON=python3.7 spark-submit \
    --conf spark.ui.enabled=false \
    your-pyspark-code.py
```

## Read file and infer the schema from the header

In [12]:
## Load a local file because we are using Spark's local mode
air0 = spark.read.options(header='true', inferSchema='true').csv("/data/airdelay/airdelay_small.csv") 

In [13]:
air0 # the inferred schema is wrong for some columns: numeric fields containing "NA" (e.g. ArrDelay) are read as string

DataFrame[Year: int, Month: int, DayofMonth: int, DayOfWeek: int, DepTime: string, CRSDepTime: int, ArrTime: string, CRSArrTime: int, UniqueCarrier: string, FlightNum: int, TailNum: string, ActualElapsedTime: string, CRSElapsedTime: string, AirTime: string, ArrDelay: string, DepDelay: string, Origin: string, Dest: string, Distance: string, TaxiIn: string, TaxiOut: string, Cancelled: int, CancellationCode: string, Diverted: int, CarrierDelay: string, WeatherDelay: string, NASDelay: string, SecurityDelay: string, LateAircraftDelay: string]

In [14]:
# We specify the correct schema by hand
from pyspark.sql.types import *
schema_sdf = StructType([
        StructField('Year', IntegerType(), True),
        StructField('Month', IntegerType(), True),
        StructField('DayofMonth', IntegerType(), True),
        StructField('DayOfWeek', IntegerType(), True),
        StructField('DepTime', DoubleType(), True),
        StructField('CRSDepTime', DoubleType(), True),
        StructField('ArrTime', DoubleType(), True),
        StructField('CRSArrTime', DoubleType(), True),
        StructField('UniqueCarrier', StringType(), True),
        StructField('FlightNum', StringType(), True),
        StructField('TailNum', StringType(), True),
        StructField('ActualElapsedTime', DoubleType(), True),
        StructField('CRSElapsedTime',  DoubleType(), True),
        StructField('AirTime',  DoubleType(), True),
        StructField('ArrDelay',  DoubleType(), True),
        StructField('DepDelay',  DoubleType(), True),
        StructField('Origin', StringType(), True),
        StructField('Dest',  StringType(), True),
        StructField('Distance',  DoubleType(), True),
        StructField('TaxiIn',  DoubleType(), True),
        StructField('TaxiOut',  DoubleType(), True),
        StructField('Cancelled',  IntegerType(), True),
        StructField('CancellationCode',  StringType(), True),
        StructField('Diverted',  IntegerType(), True),
        StructField('CarrierDelay', DoubleType(), True),
        StructField('WeatherDelay',  DoubleType(), True),
        StructField('NASDelay',  DoubleType(), True),
        StructField('SecurityDelay',  DoubleType(), True),
        StructField('LateAircraftDelay',  DoubleType(), True)
    ]) 

In [None]:
air = spark.read.options(header='true').schema(schema_sdf).csv("/data/airdelay/airdelay_small.csv")
air

In [15]:
air

DataFrame[Year: int, Month: int, DayofMonth: int, DayOfWeek: int, DepTime: double, CRSDepTime: double, ArrTime: double, CRSArrTime: double, UniqueCarrier: string, FlightNum: string, TailNum: string, ActualElapsedTime: double, CRSElapsedTime: double, AirTime: double, ArrDelay: double, DepDelay: double, Origin: string, Dest: string, Distance: double, TaxiIn: double, TaxiOut: double, Cancelled: int, CancellationCode: string, Diverted: int, CarrierDelay: double, WeatherDelay: double, NASDelay: double, SecurityDelay: double, LateAircraftDelay: double]

## Descriptive Statistics

In [7]:
air.describe().show()

+-------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+-------------+------------------+-------+------------------+------------------+------------------+-----------------+------------------+-------+-------+-----------------+------------------+------------------+---------+----------------+--------+------------------+------------------+------------------+--------------------+------------------+
|summary|              Year|             Month|        DayofMonth|         DayOfWeek|           DepTime|       CRSDepTime|           ArrTime|        CRSArrTime|UniqueCarrier|         FlightNum|TailNum| ActualElapsedTime|    CRSElapsedTime|           AirTime|         ArrDelay|          DepDelay| Origin|   Dest|         Distance|            TaxiIn|           TaxiOut|Cancelled|CancellationCode|Diverted|      CarrierDelay|      WeatherDelay|          NASDelay|       SecurityDelay| LateAircraftDelay

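In [14]:
# Note: this output comes from the string-typed ArrDelay of the inferred schema (air0),
# so min and max compare text: "-1" and "NA" are the lexicographic extremes.
air.describe(['ArrDelay']).show()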

+-------+------------------+
|summary|          ArrDelay|
+-------+------------------+
|  count|           5548754|
|   mean|  6.97897995898367|
| stddev|30.191156753519472|
|    min|                -1|
|    max|                NA|
+-------+------------------+



### Print the schema in a tree format

In [13]:
air.printSchema() # the output below shows the inferred schema of air0, not the hand-specified one

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- Carr

### Select columns

In [18]:
air.select(["ArrDelay","AirTime","Distance"]).show()

+--------+-------+--------+
|ArrDelay|AirTime|Distance|
+--------+-------+--------+
|       2|     25|     127|
|      29|    248|    1623|
|       8|     NA|     622|
|      -2|     70|     451|
|      11|    133|    1009|
|      13|    177|    1562|
|     -12|    181|    1589|
|      11|    364|    2611|
|      13|     53|     304|
|       9|     NA|     888|
|      -8|    293|    2537|
|      15|     NA|    1723|
|     -14|     NA|    1736|
|      55|    285|    1927|
|      23|    149|     991|
|      64|     35|     193|
|      29|     25|      77|
|      -8|     NA|     447|
|      -6|     91|     678|
|      35|    127|     998|
+--------+-------+--------+
only showing top 20 rows



In [19]:
air.select(air['UniqueCarrier'], air['ArrDelay']>0).show()

+-------------+--------------+
|UniqueCarrier|(ArrDelay > 0)|
+-------------+--------------+
|           XE|          true|
|           CO|          true|
|           AA|          true|
|           WN|         false|
|           CO|          true|
|           AA|          true|
|           DL|         false|
|           AA|          true|
|           US|          true|
|           AA|          true|
|           AS|         false|
|           UA|          true|
|           TW|         false|
|           NW|          true|
|           NW|          true|
|           AA|          true|
|           DH|          true|
|           WN|         false|
|           AA|         false|
|           CO|          true|
+-------------+--------------+
only showing top 20 rows



In [32]:
# count the rows in each (UniqueCarrier, DayOfWeek) group
air.groupBy(["UniqueCarrier","DayOfWeek"]).count().show()

+-------------+---------+------+
|UniqueCarrier|DayOfWeek| count|
+-------------+---------+------+
|           PS|        6|   406|
|           CO|        4| 55764|
|       ML (1)|        7|   442|
|           XE|        4| 14896|
|           TZ|        4|  1455|
|           OO|        3| 17310|
|           EA|        7|  6197|
|           OO|        4| 17666|
|           F9|        2|  1679|
|           EA|        5|  6295|
|           HA|        5|  1519|
|           UA|        4| 89272|
|           EV|        4|  9729|
|           DL|        6|106031|
|           FL|        5|  6962|
|           YV|        3|  4165|
|           AQ|        2|  1035|
|       ML (1)|        2|   502|
|           DL|        3|110827|
|           YV|        6|  3828|
+-------------+---------+------+
only showing top 20 rows



In [31]:
## Group and sort
aircount=air.groupBy("UniqueCarrier").count()
aircount.sort("count", ascending=False).show()

+-------------+------+
|UniqueCarrier| count|
+-------------+------+
|           DL|765388|
|           WN|703368|
|           AA|684522|
|           US|649056|
|           UA|611957|
|           NW|473820|
|           CO|373858|
|           TW|179081|
|           HP|173509|
|           MQ|164790|
|           AS|129863|
|           OO|120223|
|           XE| 94311|
|           EV| 67148|
|           OH| 60630|
|           FL| 47540|
|           EA| 43723|
|           PI| 41489|
|           DH| 32900|
|           B6| 29111|
+-------------+------+
only showing top 20 rows



### Data cleaning

In [None]:
## Returns a new DataFrame containing the distinct rows in this DataFrame.
## Takes a while to compute

## air.distinct().count()

In [6]:
## Returns a new DataFrame omitting rows with null values
air_without_na = air.na.drop()
air_without_na.show()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|1997|    3|         2|        7|   2019|      2015|   2314|      2245|           CO|      143

In [8]:
air_without_na.count()

4018338

In [9]:
air.count() # number of rows in the original data

5548754

In [12]:
## Replace null values; a string fill value only affects string columns
air.na.fill("unknown").show()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2006|    7|         6|        4|   2055|      2055|   2150|      2148|           XE|     2619

In [13]:
air.na.replace('NA', "unknown").show() # replace the literal string "NA" in string columns

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+-------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance| TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+-------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2006|    7|         6|        4|   2055|      2055|   2150|      2148|           XE|     2

In [8]:
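## DataFrames are immutable: na.fill() and na.replace() returned new DataFrames and left air unchanged.
## Here we simply reload the local file with the inferred schema.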
air = spark.read.options(header='true', inferSchema='true').csv("/data/airdelay/airdelay_small.csv") 

In [22]:
air.groupBy().max('DayOfWeek').collect() # must be applied to a numeric column

[Row(max(DayOfWeek)=7)]

## Statistics

In [57]:
air.corr("Distance","ArrDelay")

0.008481756987561134

In [58]:
air.cov("Distance","ArrDelay")

140.5795326021564

In [60]:
air.filter(air.ArrDelay > 60).show() # filter with certain conditions 

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2007|    5|        17|        4| 1944.0|    1830.0| 2034.0|    1930.0|           AA|     1011

### User-defined functions

In [17]:
## air2 = air.select(["DayOfWeek","ArrDelay","AirTime","Distance"])
air2_pdf = air.select(["DayOfWeek", "ArrDelay","AirTime","Distance"]).toPandas()

In [101]:
air2_pdf

   DayOfWeek  ArrDelay  AirTime  Distance
0        4.0       2.0     25.0     127.0
1        7.0      29.0    248.0    1623.0
2        NaN       NaN      NaN       NaN
3        5.0      -2.0     70.0     451.0
4        7.0      11.0    133.0    1009.0
5        7.0      13.0    177.0    1562.0
6        1.0     -12.0    181.0    1589.0
7        3.0      11.0    364.0    2611.0
8        5.0      13.0     53.0     304.0
9        NaN       NaN      NaN       NaN
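A pandas-only reference for the grouped means that the UDF further down is meant to compute: the same per-`DayOfWeek` means obtained locally with `pandas.DataFrame.groupby`. The values are made up; `mean()` skips `NaN` here, just as it does inside the UDF.

```python
import numpy as np
import pandas as pd

# Made-up rows in the shape of air2_pdf
pdf = pd.DataFrame({
    "DayOfWeek": [4, 4, 7],
    "ArrDelay": [2.0, np.nan, 29.0],
    "AirTime": [25.0, 35.0, 248.0],
    "Distance": [127.0, 193.0, 1623.0],
})

# One row of means per DayOfWeek; NaN values are skipped by mean()
expected = (pdf.groupby("DayOfWeek")[["ArrDelay", "AirTime", "Distance"]]
               .mean()
               .reset_index())
print(expected)
```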


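In [16]:
import pandas as pd

def myfun(pdf):
    """Return the means of ArrDelay, AirTime and Distance as a one-row pandas DataFrame."""
    out = dict()
    out["ArrDelay"] = pdf.ArrDelay.mean()
    out["AirTime"]  = pdf.AirTime.mean()
    out["Distance"] = pdf.Distance.mean()
    return pd.DataFrame(out, index=[0])

# air2_pdf is created by the toPandas() cell above; run that cell first
myfun(air2_pdf)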

In [119]:
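import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

# The schema must describe every returned column with a matching type;
# the means are floating point, so they are declared as double.
@pandas_udf("DayOfWeek long, ArrDelay double, AirTime double, Distance double", PandasUDFType.GROUPED_MAP)
def myfun(pdf):
    out = dict()
    out["DayOfWeek"] = pdf.DayOfWeek.iloc[0]  # every row in the group shares this key
    out["ArrDelay"] = pdf.ArrDelay.mean()
    out["AirTime"]  = pdf.AirTime.mean()
    out["Distance"] = pdf.Distance.mean()
    return pd.DataFrame(out, index=[0])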

In [108]:
air2 = air.select(["DayOfWeek","ArrDelay","AirTime","Distance"])

In [None]:
# air3 = air2.na.drop()
# air3.groupby("DayOfWeek").apply(myfun).show()