Feature Computation using Spark¶

Feng Li¶

Guanghua School of Management¶

Peking University¶

feng.li@gsm.pku.edu.cn¶

Course home page: https://feng.li/bdcf¶

In [1]:
import os, sys # Ensure All environment variables are properly set 
# os.environ["JAVA_HOME"] = os.path.dirname(sys.executable)
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

from pyspark.sql import SparkSession # build Spark Session
spark = SparkSession.builder\
        .config("spark.ui.enabled", "false")  \
        .config("spark.executor.memory", "4g")\
        .config("spark.sql.shuffle.partitions", 400)\
        .config("spark.driver.memory", "20g")\
        .config("spark.executor.cores", 8)\
        .appName("Spark DataFrame").getOrCreate() # using spark server
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/04 19:13:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
In [2]:
TARGET = 'sales'          # Our main target
END_TRAIN = 1913+28       # Last day in train set
MAIN_INDEX = ['id','d']   # We can identify item by these columns

# Load data into PySpark DataFrames
train_df = spark.read.csv("../data/m5-forecasting-accuracy/sales_train_evaluation.csv", header=True, inferSchema=True)
prices_df = spark.read.csv("../data/m5-forecasting-accuracy/sell_prices.csv", header=True, inferSchema=True)
calendar_df = spark.read.csv("../data/m5-forecasting-accuracy/calendar.csv", header=True, inferSchema=True)

# Show the schema (optional)
# train_df.printSchema()
# prices_df.printSchema()
                                                                                
In [3]:
from pyspark.sql.functions import col, lit, expr
from pyspark.sql.types import StringType
import numpy as np
# calendar_df.printSchema()

# Define index columns
index_columns = ['id', 'item_id', 'dept_id', 'cat_id', 'store_id', 'state_id']

# Melt train_df using explode
train_df_long = train_df.selectExpr(
    *index_columns, 
    "stack(" + str(len(train_df.columns) - len(index_columns)) + 
    "".join([f", '{col}', {col}" for col in train_df.columns if col not in index_columns]) + ") as (d, sales)"
)

# Convert "d" column format to match Pandas melt
train_df_long = train_df_long.withColumn("d", expr("substring(d, 3, length(d)-2)"))

# Count rows
print("Train rows:", train_df.count(), train_df_long.count())

# Create "test set" grid for future dates
from pyspark.sql.functions import monotonically_increasing_id

add_grid = train_df.select(*index_columns).dropDuplicates()
add_grid = add_grid.crossJoin(
    spark.createDataFrame([(f"d_{END_TRAIN+i}", np.nan) for i in range(1, 29)], ["d", TARGET])
)

# Combine train and test sets
grid_df = train_df_long.union(add_grid)

# Convert string columns to categorical (in PySpark, it means converting to StringType or factorizing via StringIndexer)
for col_name in index_columns:
    grid_df = grid_df.withColumn(col_name, col(col_name).cast(StringType()))

# Show memory usage estimate (PySpark does not have direct memory usage functions)
print(f"Total rows in grid_df: {grid_df.count()}")
25/03/04 19:17:11 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                
Train rows: 30490 59181090
[Stage 15:==========>                                          (27 + 104) / 134]
Total rows in grid_df: 60034810
                                                                                
In [4]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# Group by store_id and item_id to get the earliest (min) wm_yr_wk (release week)
release_df = prices_df.groupBy("store_id", "item_id").agg(F.min("wm_yr_wk").alias("release"))

# Merge release_df with grid_df
grid_df = grid_df.join(release_df, on=["store_id", "item_id"], how="left")

# Remove release_df to free memory
del release_df

# Merge with calendar_df to get wm_yr_wk column
grid_df = grid_df.join(calendar_df.select("wm_yr_wk", "d"), on="d", how="left")

# Remove rows where wm_yr_wk is earlier than release
grid_df = grid_df.filter(F.col("wm_yr_wk") >= F.col("release"))

# Reset index equivalent (not needed in PySpark, but ensuring ordering)
grid_df = grid_df.withColumn("id", F.monotonically_increasing_id())

# Minify the release values
min_release = grid_df.agg(F.min("release")).collect()[0][0]  # Get minimum release week
grid_df = grid_df.withColumn("release", (F.col("release") - min_release).cast(IntegerType()))

# Show the transformed grid_df schema and a few rows
grid_df.printSchema()
grid_df.show(5)
                                                                                
root
 |-- d: string (nullable = true)
 |-- store_id: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- id: long (nullable = false)
 |-- dept_id: string (nullable = true)
 |-- cat_id: string (nullable = true)
 |-- state_id: string (nullable = true)
 |-- sales: double (nullable = true)
 |-- release: integer (nullable = true)
 |-- wm_yr_wk: integer (nullable = true)

[Stage 44:==================================================>     (18 + 2) / 20]
+------+--------+---------------+------------+-----------+---------+--------+-----+-------+--------+
|     d|store_id|        item_id|          id|    dept_id|   cat_id|state_id|sales|release|wm_yr_wk|
+------+--------+---------------+------------+-----------+---------+--------+-----+-------+--------+
|d_1942|    CA_1|HOUSEHOLD_1_049|206158430208|HOUSEHOLD_1|HOUSEHOLD|      CA|  NaN|    304|   11617|
|d_1942|    CA_2|  HOBBIES_1_124|206158430209|  HOBBIES_1|  HOBBIES|      CA|  NaN|    150|   11617|
|d_1942|    CA_2|HOUSEHOLD_2_111|206158430210|HOUSEHOLD_2|HOUSEHOLD|      CA|  NaN|      0|   11617|
|d_1942|    CA_2|    FOODS_3_113|206158430211|    FOODS_3|    FOODS|      CA|  NaN|      0|   11617|
|d_1942|    CA_2|    FOODS_3_092|206158430212|    FOODS_3|    FOODS|      CA|  NaN|    118|   11617|
+------+--------+---------------+------------+-----------+---------+--------+-----+-------+--------+
only showing top 5 rows

                                                                                
In [6]:
# Save DataFrame as Parquet (preferred for efficiency)
# grid_df.write.mode("overwrite").parquet("grid_part_1.parquet")
grid_df1 = grid_df
In [5]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Define window partitioning with ORDER BY for sequential computations
store_item_window = Window.partitionBy("store_id", "item_id").orderBy("wm_yr_wk")
store_item_month_window = Window.partitionBy("store_id", "item_id", "month").orderBy("wm_yr_wk")
store_item_year_window = Window.partitionBy("store_id", "item_id", "year").orderBy("wm_yr_wk")

# Compute basic aggregations
prices_df = prices_df.withColumn("price_max", F.max("sell_price").over(Window.partitionBy("store_id", "item_id")))
prices_df = prices_df.withColumn("price_min", F.min("sell_price").over(Window.partitionBy("store_id", "item_id")))
prices_df = prices_df.withColumn("price_std", F.stddev("sell_price").over(Window.partitionBy("store_id", "item_id")))
prices_df = prices_df.withColumn("price_mean", F.mean("sell_price").over(Window.partitionBy("store_id", "item_id")))

# Normalize prices (min-max scaling)
prices_df = prices_df.withColumn("price_norm", F.col("sell_price") / F.col("price_max"))

# Compute distinct counts separately (fix for DISTINCT not allowed in window functions)
price_nunique_df = prices_df.groupBy("store_id", "item_id").agg(F.countDistinct("sell_price").alias("price_nunique"))
item_nunique_df = prices_df.groupBy("store_id", "sell_price").agg(F.countDistinct("item_id").alias("item_nunique"))

# Join distinct count results back to prices_df
prices_df = prices_df.join(price_nunique_df, on=["store_id", "item_id"], how="left")
prices_df = prices_df.join(item_nunique_df, on=["store_id", "sell_price"], how="left")

# Fix: Select only necessary columns from calendar_df to avoid ambiguity
calendar_prices = calendar_df.select(
    F.col("wm_yr_wk"),
    F.col("month").alias("calendar_month"),  # Renaming to avoid ambiguity
    F.col("year").alias("calendar_year")
).dropDuplicates(["wm_yr_wk"])

# Merge calendar information into prices_df
prices_df = prices_df.join(calendar_prices, on=["wm_yr_wk"], how="left")

# Compute price momentum
prices_df = prices_df.withColumn(
    "price_momentum",
    F.col("sell_price") / F.lag("sell_price", 1).over(store_item_window)
)
prices_df = prices_df.withColumn(
    "price_momentum_m",
    F.col("sell_price") / F.mean("sell_price").over(
        Window.partitionBy("store_id", "item_id", "calendar_month").orderBy("wm_yr_wk")
    )
)
prices_df = prices_df.withColumn(
    "price_momentum_y",
    F.col("sell_price") / F.mean("sell_price").over(
        Window.partitionBy("store_id", "item_id", "calendar_year").orderBy("wm_yr_wk")
    )
)

# Drop temporary columns
prices_df = prices_df.drop("calendar_month", "calendar_year")

# Show schema and verify results
prices_df.printSchema()
prices_df.show(5)
root
 |-- wm_yr_wk: integer (nullable = true)
 |-- store_id: string (nullable = true)
 |-- sell_price: double (nullable = true)
 |-- item_id: string (nullable = true)
 |-- price_max: double (nullable = true)
 |-- price_min: double (nullable = true)
 |-- price_std: double (nullable = true)
 |-- price_mean: double (nullable = true)
 |-- price_norm: double (nullable = true)
 |-- price_nunique: long (nullable = true)
 |-- item_nunique: long (nullable = true)
 |-- price_momentum: double (nullable = true)
 |-- price_momentum_m: double (nullable = true)
 |-- price_momentum_y: double (nullable = true)

[Stage 62:>                                                         (0 + 1) / 1]
+--------+--------+----------+-----------+---------+---------+---------+------------------+----------+-------------+------------+--------------+----------------+------------------+
|wm_yr_wk|store_id|sell_price|    item_id|price_max|price_min|price_std|        price_mean|price_norm|price_nunique|item_nunique|price_momentum|price_momentum_m|  price_momentum_y|
+--------+--------+----------+-----------+---------+---------+---------+------------------+----------+-------------+------------+--------------+----------------+------------------+
|   11323|    CA_1|      0.98|FOODS_1_097|     0.98|     0.98|      0.0|0.9799999999999994|       1.0|            1|          68|          NULL|             1.0|               1.0|
|   11324|    CA_1|      0.98|FOODS_1_097|     0.98|     0.98|      0.0|0.9799999999999994|       1.0|            1|          68|           1.0|             1.0|               1.0|
|   11325|    CA_1|      0.98|FOODS_1_097|     0.98|     0.98|      0.0|0.9799999999999994|       1.0|            1|          68|           1.0|             1.0|               1.0|
|   11326|    CA_1|      0.98|FOODS_1_097|     0.98|     0.98|      0.0|0.9799999999999994|       1.0|            1|          68|           1.0|             1.0|               1.0|
|   11327|    CA_1|      0.98|FOODS_1_097|     0.98|     0.98|      0.0|0.9799999999999994|       1.0|            1|          68|           1.0|             1.0|0.9999999999999999|
+--------+--------+----------+-----------+---------+---------+---------+------------------+----------+-------------+------------+--------------+----------------+------------------+
only showing top 5 rows

                                                                                
In [6]:
print(f"Size: {grid_df.count()} rows, {len(grid_df.columns)} columns")
[Stage 72:===================================================>  (119 + 7) / 126]
Size: 853720 rows, 10 columns
                                                                                

Lab 1¶

  • Make a feature matrix for the sales data on Walmart.
  • Visualize the features using Python.
  • Submit your Lab to https://course.pku.edu.cn