Feature Computation using Spark¶
Feng Li¶
Guanghua School of Management¶
Peking University¶
feng.li@gsm.pku.edu.cn¶
Course home page: https://feng.li/bdcf¶
In [1]:
import os, sys # Ensure All environment variables are properly set
# os.environ["JAVA_HOME"] = os.path.dirname(sys.executable)
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable
from pyspark.sql import SparkSession # build Spark Session
spark = SparkSession.builder\
.config("spark.ui.enabled", "false") \
.config("spark.executor.memory", "4g")\
.config("spark.sql.shuffle.partitions", 400)\
.config("spark.driver.memory", "20g")\
.config("spark.executor.cores", 8)\
.appName("Spark DataFrame").getOrCreate() # using spark server
Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 25/03/04 19:13:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
In [2]:
TARGET = 'sales' # Our main target
END_TRAIN = 1913+28 # Last day in train set
MAIN_INDEX = ['id','d'] # We can identify item by these columns
# Load data into PySpark DataFrames
train_df = spark.read.csv("../data/m5-forecasting-accuracy/sales_train_evaluation.csv", header=True, inferSchema=True)
prices_df = spark.read.csv("../data/m5-forecasting-accuracy/sell_prices.csv", header=True, inferSchema=True)
calendar_df = spark.read.csv("../data/m5-forecasting-accuracy/calendar.csv", header=True, inferSchema=True)
# Show the schema (optional)
# train_df.printSchema()
# prices_df.printSchema()
In [3]:
from pyspark.sql.functions import col, lit, expr
from pyspark.sql.types import StringType
import numpy as np
# calendar_df.printSchema()
# Define index columns
index_columns = ['id', 'item_id', 'dept_id', 'cat_id', 'store_id', 'state_id']
# Melt train_df using explode
train_df_long = train_df.selectExpr(
*index_columns,
"stack(" + str(len(train_df.columns) - len(index_columns)) +
"".join([f", '{col}', {col}" for col in train_df.columns if col not in index_columns]) + ") as (d, sales)"
)
# Convert "d" column format to match Pandas melt
train_df_long = train_df_long.withColumn("d", expr("substring(d, 3, length(d)-2)"))
# Count rows
print("Train rows:", train_df.count(), train_df_long.count())
# Create "test set" grid for future dates
from pyspark.sql.functions import monotonically_increasing_id
add_grid = train_df.select(*index_columns).dropDuplicates()
add_grid = add_grid.crossJoin(
spark.createDataFrame([(f"d_{END_TRAIN+i}", np.nan) for i in range(1, 29)], ["d", TARGET])
)
# Combine train and test sets
grid_df = train_df_long.union(add_grid)
# Convert string columns to categorical (in PySpark, it means converting to StringType or factorizing via StringIndexer)
for col_name in index_columns:
grid_df = grid_df.withColumn(col_name, col(col_name).cast(StringType()))
# Show memory usage estimate (PySpark does not have direct memory usage functions)
print(f"Total rows in grid_df: {grid_df.count()}")
25/03/04 19:17:11 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
Train rows: 30490 59181090
[Stage 15:==========> (27 + 104) / 134]
Total rows in grid_df: 60034810
In [4]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
# Group by store_id and item_id to get the earliest (min) wm_yr_wk (release week)
release_df = prices_df.groupBy("store_id", "item_id").agg(F.min("wm_yr_wk").alias("release"))
# Merge release_df with grid_df
grid_df = grid_df.join(release_df, on=["store_id", "item_id"], how="left")
# Remove release_df to free memory
del release_df
# Merge with calendar_df to get wm_yr_wk column
grid_df = grid_df.join(calendar_df.select("wm_yr_wk", "d"), on="d", how="left")
# Remove rows where wm_yr_wk is earlier than release
grid_df = grid_df.filter(F.col("wm_yr_wk") >= F.col("release"))
# Reset index equivalent (not needed in PySpark, but ensuring ordering)
grid_df = grid_df.withColumn("id", F.monotonically_increasing_id())
# Minify the release values
min_release = grid_df.agg(F.min("release")).collect()[0][0] # Get minimum release week
grid_df = grid_df.withColumn("release", (F.col("release") - min_release).cast(IntegerType()))
# Show the transformed grid_df schema and a few rows
grid_df.printSchema()
grid_df.show(5)
root |-- d: string (nullable = true) |-- store_id: string (nullable = true) |-- item_id: string (nullable = true) |-- id: long (nullable = false) |-- dept_id: string (nullable = true) |-- cat_id: string (nullable = true) |-- state_id: string (nullable = true) |-- sales: double (nullable = true) |-- release: integer (nullable = true) |-- wm_yr_wk: integer (nullable = true)
[Stage 44:==================================================> (18 + 2) / 20]
+------+--------+---------------+------------+-----------+---------+--------+-----+-------+--------+ | d|store_id| item_id| id| dept_id| cat_id|state_id|sales|release|wm_yr_wk| +------+--------+---------------+------------+-----------+---------+--------+-----+-------+--------+ |d_1942| CA_1|HOUSEHOLD_1_049|206158430208|HOUSEHOLD_1|HOUSEHOLD| CA| NaN| 304| 11617| |d_1942| CA_2| HOBBIES_1_124|206158430209| HOBBIES_1| HOBBIES| CA| NaN| 150| 11617| |d_1942| CA_2|HOUSEHOLD_2_111|206158430210|HOUSEHOLD_2|HOUSEHOLD| CA| NaN| 0| 11617| |d_1942| CA_2| FOODS_3_113|206158430211| FOODS_3| FOODS| CA| NaN| 0| 11617| |d_1942| CA_2| FOODS_3_092|206158430212| FOODS_3| FOODS| CA| NaN| 118| 11617| +------+--------+---------------+------------+-----------+---------+--------+-----+-------+--------+ only showing top 5 rows
In [6]:
# Save DataFrame as Parquet (preferred for efficiency)
# grid_df.write.mode("overwrite").parquet("grid_part_1.parquet")
grid_df1 = grid_df
In [5]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Define window partitioning with ORDER BY for sequential computations
store_item_window = Window.partitionBy("store_id", "item_id").orderBy("wm_yr_wk")
store_item_month_window = Window.partitionBy("store_id", "item_id", "month").orderBy("wm_yr_wk")
store_item_year_window = Window.partitionBy("store_id", "item_id", "year").orderBy("wm_yr_wk")
# Compute basic aggregations
prices_df = prices_df.withColumn("price_max", F.max("sell_price").over(Window.partitionBy("store_id", "item_id")))
prices_df = prices_df.withColumn("price_min", F.min("sell_price").over(Window.partitionBy("store_id", "item_id")))
prices_df = prices_df.withColumn("price_std", F.stddev("sell_price").over(Window.partitionBy("store_id", "item_id")))
prices_df = prices_df.withColumn("price_mean", F.mean("sell_price").over(Window.partitionBy("store_id", "item_id")))
# Normalize prices (min-max scaling)
prices_df = prices_df.withColumn("price_norm", F.col("sell_price") / F.col("price_max"))
# Compute distinct counts separately (fix for DISTINCT not allowed in window functions)
price_nunique_df = prices_df.groupBy("store_id", "item_id").agg(F.countDistinct("sell_price").alias("price_nunique"))
item_nunique_df = prices_df.groupBy("store_id", "sell_price").agg(F.countDistinct("item_id").alias("item_nunique"))
# Join distinct count results back to prices_df
prices_df = prices_df.join(price_nunique_df, on=["store_id", "item_id"], how="left")
prices_df = prices_df.join(item_nunique_df, on=["store_id", "sell_price"], how="left")
# Fix: Select only necessary columns from calendar_df to avoid ambiguity
calendar_prices = calendar_df.select(
F.col("wm_yr_wk"),
F.col("month").alias("calendar_month"), # Renaming to avoid ambiguity
F.col("year").alias("calendar_year")
).dropDuplicates(["wm_yr_wk"])
# Merge calendar information into prices_df
prices_df = prices_df.join(calendar_prices, on=["wm_yr_wk"], how="left")
# Compute price momentum
prices_df = prices_df.withColumn(
"price_momentum",
F.col("sell_price") / F.lag("sell_price", 1).over(store_item_window)
)
prices_df = prices_df.withColumn(
"price_momentum_m",
F.col("sell_price") / F.mean("sell_price").over(
Window.partitionBy("store_id", "item_id", "calendar_month").orderBy("wm_yr_wk")
)
)
prices_df = prices_df.withColumn(
"price_momentum_y",
F.col("sell_price") / F.mean("sell_price").over(
Window.partitionBy("store_id", "item_id", "calendar_year").orderBy("wm_yr_wk")
)
)
# Drop temporary columns
prices_df = prices_df.drop("calendar_month", "calendar_year")
# Show schema and verify results
prices_df.printSchema()
prices_df.show(5)
root |-- wm_yr_wk: integer (nullable = true) |-- store_id: string (nullable = true) |-- sell_price: double (nullable = true) |-- item_id: string (nullable = true) |-- price_max: double (nullable = true) |-- price_min: double (nullable = true) |-- price_std: double (nullable = true) |-- price_mean: double (nullable = true) |-- price_norm: double (nullable = true) |-- price_nunique: long (nullable = true) |-- item_nunique: long (nullable = true) |-- price_momentum: double (nullable = true) |-- price_momentum_m: double (nullable = true) |-- price_momentum_y: double (nullable = true)
[Stage 62:> (0 + 1) / 1]
+--------+--------+----------+-----------+---------+---------+---------+------------------+----------+-------------+------------+--------------+----------------+------------------+ |wm_yr_wk|store_id|sell_price| item_id|price_max|price_min|price_std| price_mean|price_norm|price_nunique|item_nunique|price_momentum|price_momentum_m| price_momentum_y| +--------+--------+----------+-----------+---------+---------+---------+------------------+----------+-------------+------------+--------------+----------------+------------------+ | 11323| CA_1| 0.98|FOODS_1_097| 0.98| 0.98| 0.0|0.9799999999999994| 1.0| 1| 68| NULL| 1.0| 1.0| | 11324| CA_1| 0.98|FOODS_1_097| 0.98| 0.98| 0.0|0.9799999999999994| 1.0| 1| 68| 1.0| 1.0| 1.0| | 11325| CA_1| 0.98|FOODS_1_097| 0.98| 0.98| 0.0|0.9799999999999994| 1.0| 1| 68| 1.0| 1.0| 1.0| | 11326| CA_1| 0.98|FOODS_1_097| 0.98| 0.98| 0.0|0.9799999999999994| 1.0| 1| 68| 1.0| 1.0| 1.0| | 11327| CA_1| 0.98|FOODS_1_097| 0.98| 0.98| 0.0|0.9799999999999994| 1.0| 1| 68| 1.0| 1.0|0.9999999999999999| +--------+--------+----------+-----------+---------+---------+---------+------------------+----------+-------------+------------+--------------+----------------+------------------+ only showing top 5 rows
In [6]:
print(f"Size: {grid_df.count()} rows, {len(grid_df.columns)} columns")
[Stage 72:===================================================> (119 + 7) / 126]
Size: 853720 rows, 10 columns
Lab 1¶
- Make a feature matrix for the sales data on Walmart.
- Visualize the features using Python.
- Submit your Lab to https://course.pku.edu.cn