{ "cells": [ { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" }, "tags": [] }, "source": [ "# Feature Computation using Spark\n", "\n", "\n", "## Feng Li\n", "\n", "### Guanghua School of Management\n", "### Peking University\n", "\n", "\n", "### [feng.li@gsm.pku.edu.cn](feng.li@gsm.pku.edu.cn)\n", "### Course home page: [https://feng.li/bdcf](https://feng.li/bdcf)" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "slideshow": { "slide_type": "slide" }, "tags": [] }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "Setting default log level to \"WARN\".\n", "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n", "25/03/04 19:13:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n" ] } ], "source": [ "import os, sys # Ensure All environment variables are properly set \n", "# os.environ[\"JAVA_HOME\"] = os.path.dirname(sys.executable)\n", "os.environ[\"PYSPARK_PYTHON\"] = sys.executable\n", "os.environ[\"PYSPARK_DRIVER_PYTHON\"] = sys.executable\n", "\n", "from pyspark.sql import SparkSession # build Spark Session\n", "spark = SparkSession.builder\\\n", " .config(\"spark.ui.enabled\", \"false\") \\\n", " .config(\"spark.executor.memory\", \"4g\")\\\n", " .config(\"spark.sql.shuffle.partitions\", 400)\\\n", " .config(\"spark.driver.memory\", \"20g\")\\\n", " .config(\"spark.executor.cores\", 8)\\\n", " .appName(\"Spark DataFrame\").getOrCreate() # using spark server" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "slideshow": { "slide_type": "slide" }, "tags": [] }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " " ] } ], "source": [ "TARGET = 'sales' # Our main target\n", "END_TRAIN = 1913+28 # Last day in train set\n", "MAIN_INDEX = ['id','d'] # We can identify item by these columns\n", "\n", "# Load data into PySpark DataFrames\n", "train_df = spark.read.csv(\"../data/m5-forecasting-accuracy/sales_train_evaluation.csv\", header=True, inferSchema=True)\n", "prices_df = spark.read.csv(\"../data/m5-forecasting-accuracy/sell_prices.csv\", header=True, inferSchema=True)\n", "calendar_df = spark.read.csv(\"../data/m5-forecasting-accuracy/calendar.csv\", header=True, inferSchema=True)\n", "\n", "# Show the schema (optional)\n", "# train_df.printSchema()\n", "# prices_df.printSchema()\n" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "slideshow": { "slide_type": "slide" }, "tags": [] }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "25/03/04 19:17:11 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. 
{ "cell_type": "code", "execution_count": 3, "metadata": { "slideshow": { "slide_type": "slide" }, "tags": [] }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "25/03/04 19:17:11 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.\n", " " ] }, { "name": "stdout", "output_type": "stream", "text": [ "Train rows: 30490 59181090\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "[Stage 15:==========> (27 + 104) / 134]" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Total rows in grid_df: 60034810\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " " ] } ], "source": [ "from pyspark.sql.functions import col\n", "from pyspark.sql.types import StringType\n", "import numpy as np\n", "# calendar_df.printSchema()\n", "\n", "# Define index columns\n", "index_columns = ['id', 'item_id', 'dept_id', 'cat_id', 'store_id', 'state_id']\n", "\n", "# Melt train_df from wide to long format with Spark SQL's stack()\n", "day_columns = [c for c in train_df.columns if c not in index_columns]\n", "train_df_long = train_df.selectExpr(\n", "    *index_columns,\n", "    \"stack(\" + str(len(day_columns)) +\n", "    \"\".join([f\", '{c}', {c}\" for c in day_columns]) + \") as (d, sales)\"\n", ")\n", "\n", "# Keep \"d\" in its original 'd_1', 'd_2', ... form: it must match calendar_df's\n", "# \"d\" column and the future grid built below, so the prefix is not stripped\n", "\n", "# Count rows\n", "print(\"Train rows:\", train_df.count(), train_df_long.count())\n", "\n", "# Create a \"test set\" grid for the 28 days after END_TRAIN;\n", "# np.nan marks the still-unknown future sales\n", "add_grid = train_df.select(*index_columns).dropDuplicates()\n", "add_grid = add_grid.crossJoin(\n", "    spark.createDataFrame([(f\"d_{END_TRAIN+i}\", np.nan) for i in range(1, 29)], [\"d\", TARGET])\n", ")\n", "\n", "# Combine train and test sets\n", "grid_df = train_df_long.union(add_grid)\n", "\n", "# Cast all index columns to strings (factorize later via StringIndexer if needed)\n", "for col_name in index_columns:\n", "    grid_df = grid_df.withColumn(col_name, col(col_name).cast(StringType()))\n", "\n", "# PySpark has no direct memory-usage report; a row count is a rough size check\n", "print(f\"Total rows in grid_df: {grid_df.count()}\")\n" ] },
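{ "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" }, "tags": [] }, "source": [ "`stack(n, k1, v1, ..., kn, vn)` emits `n` rows per input row, which is what melts the day columns above. A minimal, self-contained sketch of the same pattern on toy data (not the M5 files):" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "slide" }, "tags": [] }, "outputs": [], "source": [ "# Toy wide table: one row per id, one column per day\n", "toy = spark.createDataFrame([(\"A\", 3, 5), (\"B\", 0, 7)], [\"id\", \"d_1\", \"d_2\"])\n", "\n", "# stack(2, 'd_1', d_1, 'd_2', d_2) yields two (d, sales) rows per input row:\n", "# (A, d_1, 3), (A, d_2, 5), (B, d_1, 0), (B, d_2, 7)\n", "toy_long = toy.selectExpr(\"id\", \"stack(2, 'd_1', d_1, 'd_2', d_2) as (d, sales)\")\n", "toy_long.show()" ] },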
{ "cell_type": "code", "execution_count": 4, "metadata": { "slideshow": { "slide_type": "slide" }, "tags": [] }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " " ] }, { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- d: string (nullable = true)\n", " |-- store_id: string (nullable = true)\n", " |-- item_id: string (nullable = true)\n", " |-- id: long (nullable = false)\n", " |-- dept_id: string (nullable = true)\n", " |-- cat_id: string (nullable = true)\n", " |-- state_id: string (nullable = true)\n", " |-- sales: double (nullable = true)\n", " |-- release: integer (nullable = true)\n", " |-- wm_yr_wk: integer (nullable = true)\n", "\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "[Stage 44:==================================================> (18 + 2) / 20]" ] }, { "name": "stdout", "output_type": "stream", "text": [ "+------+--------+---------------+------------+-----------+---------+--------+-----+-------+--------+\n", "| d|store_id| item_id| id| dept_id| cat_id|state_id|sales|release|wm_yr_wk|\n", "+------+--------+---------------+------------+-----------+---------+--------+-----+-------+--------+\n", "|d_1942| CA_1|HOUSEHOLD_1_049|206158430208|HOUSEHOLD_1|HOUSEHOLD| CA| NaN| 304| 11617|\n", "|d_1942| CA_2| HOBBIES_1_124|206158430209| HOBBIES_1| HOBBIES| CA| NaN| 150| 11617|\n", "|d_1942| CA_2|HOUSEHOLD_2_111|206158430210|HOUSEHOLD_2|HOUSEHOLD| CA| NaN| 0| 11617|\n", "|d_1942| CA_2| FOODS_3_113|206158430211| FOODS_3| FOODS| CA| NaN| 0| 11617|\n", "|d_1942| CA_2| FOODS_3_092|206158430212| FOODS_3| FOODS| CA| NaN| 118| 11617|\n", "+------+--------+---------------+------------+-----------+---------+--------+-----+-------+--------+\n", "only showing top 5 rows\n", "\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " " ] } ], "source": [ "from pyspark.sql import functions as F\n", "from pyspark.sql.types import IntegerType\n", "\n", "# Group by store_id and item_id to get the earliest (min) wm_yr_wk, i.e. the release week\n", "release_df = prices_df.groupBy(\"store_id\", \"item_id\").agg(F.min(\"wm_yr_wk\").alias(\"release\"))\n", "\n", "# Merge release_df with grid_df\n", "grid_df = grid_df.join(release_df, on=[\"store_id\", \"item_id\"], how=\"left\")\n", "\n", "# Drop the Python reference (Spark plans are lazy, so this frees no cluster memory)\n", "del release_df\n", "\n", "# Merge with calendar_df to get the wm_yr_wk column\n", "grid_df = grid_df.join(calendar_df.select(\"wm_yr_wk\", \"d\"), on=\"d\", how=\"left\")\n", "\n", "# Remove rows from before each item's release week\n", "grid_df = grid_df.filter(F.col(\"wm_yr_wk\") >= F.col(\"release\"))\n", "\n", "# No reset_index equivalent is needed in Spark; note that overwriting \"id\" with\n", "# monotonically_increasing_id() would destroy the item identifier in MAIN_INDEX\n", "\n", "# Shrink the release values: weeks elapsed since the earliest release\n", "min_release = grid_df.agg(F.min(\"release\")).collect()[0][0] # minimum release week\n", "grid_df = grid_df.withColumn(\"release\", (F.col(\"release\") - min_release).cast(IntegerType()))\n", "\n", "# Show the transformed grid_df schema and a few rows\n", "grid_df.printSchema()\n", "grid_df.show(5)\n" ] },
{ "cell_type": "code", "execution_count": 5, "metadata": { "slideshow": { "slide_type": "slide" }, "tags": [] }, "outputs": [], "source": [ "# Save this stage as Parquet (preferred for efficiency), or keep a named reference\n", "# grid_df.write.mode(\"overwrite\").parquet(\"grid_part_1.parquet\")\n", "grid_df1 = grid_df" ] },
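{ "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" }, "tags": [] }, "source": [ "Each `count()` or `show()` above replays the whole lineage (CSV scan, stack, joins). When a stage such as `grid_df1` is reused several times, persisting it usually pays off. A minimal sketch with the standard cache API:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "slide" }, "tags": [] }, "outputs": [], "source": [ "# Persist the assembled grid so later actions reuse it instead of recomputing\n", "grid_df1 = grid_df1.cache()\n", "grid_df1.count() # the first action materializes the cached blocks\n", "\n", "# ... downstream feature steps ...\n", "\n", "# Release the cache once this stage is no longer needed\n", "# grid_df1.unpersist()" ] },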
{ "cell_type": "code", "execution_count": 6, "metadata": { "slideshow": { "slide_type": "slide" }, "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- wm_yr_wk: integer (nullable = true)\n", " |-- store_id: string (nullable = true)\n", " |-- sell_price: double (nullable = true)\n", " |-- item_id: string (nullable = true)\n", " |-- price_max: double (nullable = true)\n", " |-- price_min: double (nullable = true)\n", " |-- price_std: double (nullable = true)\n", " |-- price_mean: double (nullable = true)\n", " |-- price_norm: double (nullable = true)\n", " |-- price_nunique: long (nullable = true)\n", " |-- item_nunique: long (nullable = true)\n", " |-- price_momentum: double (nullable = true)\n", " |-- price_momentum_m: double (nullable = true)\n", " |-- price_momentum_y: double (nullable = true)\n", "\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "[Stage 62:> (0 + 1) / 1]" ] }, { "name": "stdout", "output_type": "stream", "text": [ "+--------+--------+----------+-----------+---------+---------+---------+------------------+----------+-------------+------------+--------------+----------------+------------------+\n", "|wm_yr_wk|store_id|sell_price| item_id|price_max|price_min|price_std| price_mean|price_norm|price_nunique|item_nunique|price_momentum|price_momentum_m| price_momentum_y|\n", "+--------+--------+----------+-----------+---------+---------+---------+------------------+----------+-------------+------------+--------------+----------------+------------------+\n", "| 11323| CA_1| 0.98|FOODS_1_097| 0.98| 0.98| 0.0|0.9799999999999994| 1.0| 1| 68| NULL| 1.0| 1.0|\n", "| 11324| CA_1| 0.98|FOODS_1_097| 0.98| 0.98| 0.0|0.9799999999999994| 1.0| 1| 68| 1.0| 1.0| 1.0|\n", "| 11325| CA_1| 0.98|FOODS_1_097| 0.98| 0.98| 0.0|0.9799999999999994| 1.0| 1| 68| 1.0| 1.0| 1.0|\n", "| 11326| CA_1| 0.98|FOODS_1_097| 0.98| 0.98| 0.0|0.9799999999999994| 1.0| 1| 68| 1.0| 1.0| 1.0|\n", "| 11327| CA_1| 0.98|FOODS_1_097| 0.98| 0.98| 0.0|0.9799999999999994| 1.0| 1| 68| 1.0| 1.0|0.9999999999999999|\n", "+--------+--------+----------+-----------+---------+---------+---------+------------------+----------+-------------+------------+--------------+----------------+------------------+\n", "only showing top 5 rows\n", "\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " " ] } ], "source": [ "from pyspark.sql import functions as F\n", "from pyspark.sql.window import Window\n", "\n", "# Whole-partition window for per-store/item aggregates, plus an ordered\n", "# window for sequential computations such as lag()\n", "store_item_group = Window.partitionBy(\"store_id\", \"item_id\")\n", "store_item_window = store_item_group.orderBy(\"wm_yr_wk\")\n", "\n", "# Compute basic aggregations\n", "prices_df = prices_df.withColumn(\"price_max\", F.max(\"sell_price\").over(store_item_group))\n", "prices_df = prices_df.withColumn(\"price_min\", F.min(\"sell_price\").over(store_item_group))\n", "prices_df = prices_df.withColumn(\"price_std\", F.stddev(\"sell_price\").over(store_item_group))\n", "prices_df = prices_df.withColumn(\"price_mean\", F.mean(\"sell_price\").over(store_item_group))\n", "\n", "# Normalize prices (scale by the maximum price)\n", "prices_df = prices_df.withColumn(\"price_norm\", F.col(\"sell_price\") / F.col(\"price_max\"))\n", "\n", "# Compute distinct counts separately (DISTINCT is not allowed in window functions)\n", "price_nunique_df = prices_df.groupBy(\"store_id\", \"item_id\").agg(F.countDistinct(\"sell_price\").alias(\"price_nunique\"))\n", "item_nunique_df = prices_df.groupBy(\"store_id\", \"sell_price\").agg(F.countDistinct(\"item_id\").alias(\"item_nunique\"))\n", "\n", "# Join distinct count results back to prices_df\n", "prices_df = prices_df.join(price_nunique_df, on=[\"store_id\", \"item_id\"], how=\"left\")\n", "prices_df = prices_df.join(item_nunique_df, on=[\"store_id\", \"sell_price\"], how=\"left\")\n", "\n", "# Select only the needed calendar columns, renamed to avoid ambiguity\n", "calendar_prices = calendar_df.select(\n", "    F.col(\"wm_yr_wk\"),\n", "    F.col(\"month\").alias(\"calendar_month\"),\n", "    F.col(\"year\").alias(\"calendar_year\")\n", ").dropDuplicates([\"wm_yr_wk\"])\n", "\n", "# Merge calendar information into prices_df\n", "prices_df = prices_df.join(calendar_prices, on=[\"wm_yr_wk\"], how=\"left\")\n", "\n", "# Price momentum: this week's price relative to last week's price and to the\n", "# month/year average. The month/year windows carry no orderBy: an ordered\n", "# window's default frame would turn the mean into a running mean\n", "prices_df = prices_df.withColumn(\n", "    \"price_momentum\",\n", "    F.col(\"sell_price\") / F.lag(\"sell_price\", 1).over(store_item_window)\n", ")\n", "prices_df = prices_df.withColumn(\n", "    \"price_momentum_m\",\n", "    F.col(\"sell_price\") / F.mean(\"sell_price\").over(\n", "        Window.partitionBy(\"store_id\", \"item_id\", \"calendar_month\")\n", "    )\n", ")\n", "prices_df = prices_df.withColumn(\n", "    \"price_momentum_y\",\n", "    F.col(\"sell_price\") / F.mean(\"sell_price\").over(\n", "        Window.partitionBy(\"store_id\", \"item_id\", \"calendar_year\")\n", "    )\n", ")\n", "\n", "# Drop temporary columns\n", "prices_df = prices_df.drop(\"calendar_month\", \"calendar_year\")\n", "\n", "# Show schema and verify results\n", "prices_df.printSchema()\n", "prices_df.show(5)" ] },
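{ "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" }, "tags": [] }, "source": [ "One window subtlety used above: adding `orderBy` to a window changes the default frame to *unbounded preceding through the current row*, so `F.mean(...).over(...)` becomes a running mean rather than a whole-group mean. A toy sketch of the difference (hypothetical data):" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "slide" }, "tags": [] }, "outputs": [], "source": [ "from pyspark.sql import functions as F\n", "from pyspark.sql.window import Window\n", "\n", "demo = spark.createDataFrame([(\"x\", 1, 2.0), (\"x\", 2, 4.0), (\"x\", 3, 6.0)],\n", "                             [\"grp\", \"wk\", \"price\"])\n", "\n", "group_win = Window.partitionBy(\"grp\") # whole-partition frame\n", "running_win = Window.partitionBy(\"grp\").orderBy(\"wk\") # frame grows row by row\n", "\n", "demo.withColumn(\"group_mean\", F.mean(\"price\").over(group_win)) \\\n", "    .withColumn(\"running_mean\", F.mean(\"price\").over(running_win)) \\\n", "    .show()\n", "# group_mean is 4.0 on every row; running_mean is 2.0, 3.0, 4.0" ] },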
F.col(\"sell_price\") / F.mean(\"sell_price\").over(\n", " Window.partitionBy(\"store_id\", \"item_id\", \"calendar_year\").orderBy(\"wm_yr_wk\")\n", " )\n", ")\n", "\n", "# Drop temporary columns\n", "prices_df = prices_df.drop(\"calendar_month\", \"calendar_year\")\n", "\n", "# Show schema and verify results\n", "prices_df.printSchema()\n", "prices_df.show(5)" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "slideshow": { "slide_type": "slide" }, "tags": [] }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "[Stage 72:===================================================> (119 + 7) / 126]" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Size: 853720 rows, 10 columns\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " " ] } ], "source": [ "print(f\"Size: {grid_df.count()} rows, {len(grid_df.columns)} columns\")\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" }, "tags": [] }, "source": [ "## Lab 1\n", "\n", "- Make a feature matrix for the sales data on Walmart.\n", "- Visualize the features using Python.\n", "- Submit your Lab to https://course.pku.edu.cn " ] } ], "metadata": { "kernelspec": { "display_name": "python3.12", "language": "python", "name": "python3.12" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.9" } }, "nbformat": 4, "nbformat_minor": 5 }