{ "cells": [ { "cell_type": "markdown", "metadata": { "editable": true, "slideshow": { "slide_type": "slide" }, "tags": [] }, "source": [ "# Weighted Reconciliation with Spark\n", "\n", "## Feng Li\n", "\n", "### Guanghua School of Management\n", "### Peking University\n", "\n", "\n", "### [feng.li@gsm.pku.edu.cn](feng.li@gsm.pku.edu.cn)\n", "### Course home page: [https://feng.li/bdcf](https://feng.li/bdcf)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## MinT-WLS\n", "\n", "\n", "- MinT-WLS assumes a **diagonal forecast error covariance matrix** $ W $, where each diagonal element is the **variance of forecast errors** for each series (e.g., Region_Category). The formula becomes:\n", "\n", "$$\n", "\\tilde{y} = S (S^\\top W^{-1} S)^{-1} S^\\top W^{-1} \\hat{y}\n", "$$\n", "\n", "- $ \\hat{y} $: base forecasts (from ETS, etc.)\n", "- $ S $: summing matrix\n", "- $ W $: diagonal matrix of **forecast error variances** from the training residuals\n" ] }, { "cell_type": "markdown", "metadata": { "editable": true, "slideshow": { "slide_type": "slide" }, "tags": [] }, "source": [ "## How to Approximate MinT-WLS in Spark?\n", "\n", "Since Spark is not optimized for full matrix ops, you can:\n", "\n", "- Estimate **error variance per Region_Category** using training residuals.\n", "- To compute forecast error variances for use in MinT-WLS, you need **in-sample forecasts** (i.e., forecasts over the training period, not just for the future 12 months). These are often called fitted values from the model.\n", "- Use the **inverse variance** as weights.\n", "- Perform a **weighted projection** manually (just like MinT-OLS, but with weights)." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "Setting default log level to \"WARN\".\n", "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n", "25/03/26 21:51:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n" ] }, { "data": { "text/html": [ "\n", "
\n", "

SparkSession - in-memory

\n", " \n", "
\n", "

SparkContext

\n", "\n", "

Spark UI

\n", "\n", "
\n", "
Version
\n", "
v3.5.3
\n", "
Master
\n", "
local[*]
\n", "
AppName
\n", "
Spark Forecasting
\n", "
\n", "
\n", " \n", "
\n", " " ], "text/plain": [ "" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import os, sys # Ensure All environment variables are properly set \n", "# os.environ[\"JAVA_HOME\"] = os.path.dirname(sys.executable)\n", "os.environ[\"PYSPARK_PYTHON\"] = sys.executable\n", "os.environ[\"PYSPARK_DRIVER_PYTHON\"] = sys.executable\n", "\n", "from pyspark.sql import SparkSession # build Spark Session\n", "spark = SparkSession.builder \\\n", " .config(\"spark.ui.enabled\", \"false\") \\\n", " .config(\"spark.executor.memory\", \"16g\") \\\n", " .config(\"spark.executor.cores\", \"4\") \\\n", " .config(\"spark.cores.max\", \"32\") \\\n", " .config(\"spark.driver.memory\", \"30g\") \\\n", " .config(\"spark.sql.shuffle.partitions\", \"96\") \\\n", " .config(\"spark.memory.fraction\", \"0.8\") \\\n", " .config(\"spark.memory.storageFraction\", \"0.5\") \\\n", " .config(\"spark.dynamicAllocation.enabled\", \"true\") \\\n", " .config(\"spark.dynamicAllocation.minExecutors\", \"4\") \\\n", " .config(\"spark.dynamicAllocation.maxExecutors\", \"8\") \\\n", " .appName(\"Spark Forecasting\").getOrCreate()\n", "spark" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "train_sdf = spark.read.csv(\"../data/tourism/tourism_train.csv\", header=True, inferSchema=True)\n", "test_sdf = spark.read.csv(\"../data/tourism/tourism_test.csv\", header=True, inferSchema=True)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Spark approach\n", "\n", "- You can modify the `ets_forecast` function to return both the **fitted values** (in-sample) and **forecast values** (out-of-sample) in a single column, while tagging them with a type column (\"fitted\" or \"forecast\"). \n", "- Later, you can easily split or filter." ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "# Modified ets_forecast Function: Fitted + Forecast in One UDF\n", "from pyspark.sql.functions import pandas_udf\n", "from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType\n", "\n", "from pandas.tseries.offsets import MonthBegin\n", "from statsmodels.tsa.holtwinters import ExponentialSmoothing\n", "import pandas as pd\n", "\n", "# Schema with an extra 'type' column to label fitted vs forecast\n", "schema = StructType([\n", " StructField(\"date\", DateType(), False),\n", " StructField(\"Region_Category\", StringType(), False),\n", " StructField(\"Forecast\", DoubleType(), False),\n", " StructField(\"type\", StringType(), False)\n", "])\n", "\n", "def ets_fitted_and_forecast(pdf):\n", " region = pdf[\"Region_Category\"].iloc[0]\n", " pdf = pdf.sort_values(\"date\")\n", "\n", " try:\n", " ts = pdf[\"Visitors\"].dropna()\n", " dates = pdf[\"date\"]\n", "\n", " if len(ts) >= 24:\n", " model = ExponentialSmoothing(ts, trend=\"add\", seasonal=\"add\", seasonal_periods=12)\n", " fitted_model = model.fit()\n", "\n", " # Fitted values (same length as training data)\n", " fitted_values = fitted_model.fittedvalues\n", " fitted_df = pd.DataFrame({\n", " \"date\": dates[-len(fitted_values):].values,\n", " \"Region_Category\": region,\n", " \"Forecast\": fitted_values.values,\n", " \"type\": \"fitted\"\n", " })\n", "\n", " # Forecast for next 12 months\n", " forecast_values = fitted_model.forecast(steps=12)\n", " last_date = pdf[\"date\"].max()\n", " forecast_dates = pd.date_range(start=last_date, periods=12, freq=\"ME\") + MonthBegin(1)\n", "\n", " forecast_df = pd.DataFrame({\n", " \"date\": forecast_dates,\n", " \"Region_Category\": region,\n", " \"Forecast\": forecast_values.values,\n", " \"type\": \"forecast\"\n", " })\n", "\n", " result = pd.concat([fitted_df, forecast_df], ignore_index=True)\n", "\n", " else:\n", " result = pd.DataFrame({\n", " \"date\": pdf[\"date\"],\n", " \"Region_Category\": region,\n", " \"Forecast\": [None] * len(pdf),\n", " \"type\": \"fitted\"\n", " })\n", "\n", " except:\n", " result = pd.DataFrame({\n", " \"date\": pdf[\"date\"],\n", " \"Region_Category\": region,\n", " \"Forecast\": [None] * len(pdf),\n", " \"type\": \"fitted\"\n", " })\n", "\n", " return result" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "[Stage 6:> (0 + 1) / 1]\r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "+----------+---------------+------------------+------+\n", "| date|Region_Category| Forecast| type|\n", "+----------+---------------+------------------+------+\n", "|1998-01-01| AAAAll|3163.2697233948543|fitted|\n", "|1998-02-01| AAAAll|1778.6336508067377|fitted|\n", "|1998-03-01| AAAAll|1964.5793105987796|fitted|\n", "|1998-04-01| AAAAll| 2368.289474343738|fitted|\n", "|1998-05-01| AAAAll|1939.6931272936972|fitted|\n", "|1998-06-01| AAAAll| 1873.297280626614|fitted|\n", "|1998-07-01| AAAAll| 2276.910878509122|fitted|\n", "|1998-08-01| AAAAll|2030.1825468670795|fitted|\n", "|1998-09-01| AAAAll|2249.6390692360214|fitted|\n", "|1998-10-01| AAAAll| 2468.892447267113|fitted|\n", "|1998-11-01| AAAAll|2137.1171938803254|fitted|\n", "|1998-12-01| AAAAll| 2156.903889882829|fitted|\n", "|1999-01-01| AAAAll|3128.9051350700565|fitted|\n", "|1999-02-01| AAAAll|1565.6742174446586|fitted|\n", "|1999-03-01| AAAAll|1739.9329015784124|fitted|\n", "|1999-04-01| AAAAll| 2179.73856154665|fitted|\n", "|1999-05-01| AAAAll| 1865.842031962462|fitted|\n", "|1999-06-01| AAAAll|1701.8465731355816|fitted|\n", "|1999-07-01| AAAAll| 2104.648972200794|fitted|\n", "|1999-08-01| AAAAll|1820.0248238648078|fitted|\n", "+----------+---------------+------------------+------+\n", "only showing top 20 rows\n", "\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\r", " \r" ] } ], "source": [ "forecast_all_sdf = train_sdf.groupBy(\"Region_Category\").applyInPandas(\n", " ets_fitted_and_forecast,\n", " schema=schema\n", ")\n", "\n", "forecast_all_sdf.show()" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", " \r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "+----------+---------------+------------------+------+\n", "| date|Region_Category| Forecast| type|\n", "+----------+---------------+------------------+------+\n", "|1998-01-01| AAAAll|3163.2697233948543|fitted|\n", "|1998-02-01| AAAAll|1778.6336508067377|fitted|\n", "|1998-03-01| AAAAll|1964.5793105987796|fitted|\n", "|1998-04-01| AAAAll| 2368.289474343738|fitted|\n", "|1998-05-01| AAAAll|1939.6931272936972|fitted|\n", "|1998-06-01| AAAAll| 1873.297280626614|fitted|\n", "|1998-07-01| AAAAll| 2276.910878509122|fitted|\n", "|1998-08-01| AAAAll|2030.1825468670795|fitted|\n", "|1998-09-01| AAAAll|2249.6390692360214|fitted|\n", "|1998-10-01| AAAAll| 2468.892447267113|fitted|\n", "|1998-11-01| AAAAll|2137.1171938803254|fitted|\n", "|1998-12-01| AAAAll| 2156.903889882829|fitted|\n", "|1999-01-01| AAAAll|3128.9051350700565|fitted|\n", "|1999-02-01| AAAAll|1565.6742174446586|fitted|\n", "|1999-03-01| AAAAll|1739.9329015784124|fitted|\n", "|1999-04-01| AAAAll| 2179.73856154665|fitted|\n", "|1999-05-01| AAAAll| 1865.842031962462|fitted|\n", "|1999-06-01| AAAAll|1701.8465731355816|fitted|\n", "|1999-07-01| AAAAll| 2104.648972200794|fitted|\n", "|1999-08-01| AAAAll|1820.0248238648078|fitted|\n", "+----------+---------------+------------------+------+\n", "only showing top 20 rows\n", "\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "[Stage 12:> (0 + 1) / 1]\r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "+----------+---------------+------------------+--------+\n", "| date|Region_Category| Forecast| type|\n", "+----------+---------------+------------------+--------+\n", "|2016-01-01| AAAAll| 3102.656423754467|forecast|\n", "|2016-02-01| AAAAll|1680.2657901376683|forecast|\n", "|2016-03-01| AAAAll|1990.2751366602693|forecast|\n", "|2016-04-01| AAAAll|1998.0264213694707|forecast|\n", "|2016-05-01| AAAAll|1901.6160001070255|forecast|\n", "|2016-06-01| AAAAll| 1774.600837587313|forecast|\n", "|2016-07-01| AAAAll| 2080.923016745227|forecast|\n", "|2016-08-01| AAAAll|1801.0122793015398|forecast|\n", "|2016-09-01| AAAAll|1943.9770740225126|forecast|\n", "|2016-10-01| AAAAll|2227.8982222051372|forecast|\n", "|2016-11-01| AAAAll|2002.3244224259395|forecast|\n", "|2016-12-01| AAAAll| 2034.390487099784|forecast|\n", "|2016-01-01| AAABus| 297.5805840966566|forecast|\n", "|2016-02-01| AAABus|458.72259824713456|forecast|\n", "|2016-03-01| AAABus| 528.3710688119537|forecast|\n", "|2016-04-01| AAABus| 463.137319957957|forecast|\n", "|2016-05-01| AAABus| 558.7822468756337|forecast|\n", "|2016-06-01| AAABus|499.18248190461196|forecast|\n", "|2016-07-01| AAABus| 608.6718553424128|forecast|\n", "|2016-08-01| AAABus| 582.5088141373288|forecast|\n", "+----------+---------------+------------------+--------+\n", "only showing top 20 rows\n", "\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\r", " \r" ] } ], "source": [ "from pyspark.sql.functions import explode, col\n", "\n", "train_forecast_sdf = forecast_all_sdf.filter(col(\"type\") == \"fitted\")\n", "forecast_sdf = forecast_all_sdf.filter(col(\"type\") == \"forecast\")\n", "\n", "train_forecast_sdf.show()\n", "forecast_sdf.show()" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "from pyspark.sql.functions import col, pow, avg\n", "\n", "# Extract In-Sample Residuals and Estimate Variance\n", "\n", "# Step 1: Get fitted values only\n", "fitted_sdf = forecast_all_sdf.filter(col(\"type\") == \"fitted\")\n", "\n", "# Step 2: Join with actual visitors from train_sdf to compute residuals\n", "residuals_sdf = fitted_sdf.join(train_sdf, on=[\"date\", \"Region_Category\"], how=\"inner\") \\\n", " .withColumn(\"squared_error\", pow(col(\"Forecast\") - col(\"Visitors\"), 2))\n", "\n", "# Step 3: Compute variance per Region_Category (mean squared error)\n", "error_variance_sdf = residuals_sdf.groupBy(\"Region_Category\").agg(\n", " avg(\"squared_error\").alias(\"Forecast_Variance\")\n", ")\n" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "+---------------+------------------+\n", "|Region_Category| Forecast_Variance|\n", "+---------------+------------------+\n", "| BCBOth|272.85436597042093|\n", "| BCHol| 9105.598569708574|\n", "| BDEAll| 775.5776169835501|\n", "| CBDAll|14331.084040952792|\n", "| CCBAll|18003.738283002403|\n", "| CCOth| 7712.312952780992|\n", "| DCCAll| 437.5673214249484|\n", "| DDBHol|1275.5532108535858|\n", "| EABVis|12122.219354684257|\n", "| FBAVis|488.41810972817876|\n", "| ADBAll| 4273.105859782842|\n", "| BDFAll| 659.4183940165184|\n", "| CBCHol| 5490.905212321565|\n", "| FAAHol| 5213.459407634496|\n", "| GABVis| 164.2504662157065|\n", "| GBCAll|3205.0012333762947|\n", "| AEHol| 10451.84784359348|\n", "| BDBAll| 2612.127028640519|\n", "| BDCBus| 705.4305537828409|\n", "| BEGAll|1563.7787233669223|\n", "+---------------+------------------+\n", "only showing top 20 rows\n", "\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\r", " \r" ] } ], "source": [ "error_variance_sdf.show()" ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "scrolled": true, "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", " \r" ] } ], "source": [ "from pyspark.ml.feature import VectorAssembler\n", "from pyspark.ml.regression import LinearRegression\n", "from pyspark.sql.functions import row_number, lit, col, first\n", "from pyspark.sql.window import Window\n", "\n", "# STEP 1: Prepare wide-format forecast matrix\n", "window = Window.partitionBy(\"Region_Category\").orderBy(\"date\")\n", "forecast_sdf = forecast_all_sdf.filter(col(\"type\") == \"forecast\")\n", "forecast_sdf = forecast_sdf.withColumn(\"step\", row_number().over(window))\n", "\n", "forecast_wide_sdf = forecast_sdf.groupBy(\"Region_Category\").pivot(\"step\").agg(first(\"Forecast\"))\n" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "25/03/26 21:53:20 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "+------------+---------------+------+\n", "|Parent_Group|Region_Category|Weight|\n", "+------------+---------------+------+\n", "| TotalAll| AAAHol| 1.0|\n", "| TotalAll| AAAVis| 1.0|\n", "| TotalAll| AAABus| 1.0|\n", "| TotalAll| AAAOth| 1.0|\n", "| TotalAll| AABHol| 1.0|\n", "| TotalAll| AABVis| 1.0|\n", "| TotalAll| AABBus| 1.0|\n", "| TotalAll| AABOth| 1.0|\n", "| TotalAll| ABAHol| 1.0|\n", "| TotalAll| ABAVis| 1.0|\n", "| TotalAll| ABABus| 1.0|\n", "| TotalAll| ABAOth| 1.0|\n", "| TotalAll| ABBHol| 1.0|\n", "| TotalAll| ABBVis| 1.0|\n", "| TotalAll| ABBBus| 1.0|\n", "| TotalAll| ABBOth| 1.0|\n", "| TotalAll| ACAHol| 1.0|\n", "| TotalAll| ACAVis| 1.0|\n", "| TotalAll| ACABus| 1.0|\n", "| TotalAll| ACAOth| 1.0|\n", "+------------+---------------+------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "from pyspark.sql.functions import col, sum as spark_sum\n", "\n", "# Load the summing matrix file\n", "summing_matrix_path = \"../data/tourism/agg_mat.csv\" # Update with actual path\n", "\n", "# Load the summing matrix file (skip the first column)\n", "summing_sdf = spark.read.csv(summing_matrix_path, header=True, inferSchema=True)\n", "\n", "# Convert from wide format to long format (Region_Category, Parent_Group, Weight)\n", "summing_sdf_long = summing_sdf.selectExpr(\n", " \"Parent_Group\",\n", " \"stack(\" + str(len(summing_sdf.columns) - 1) + \", \" +\n", " \", \".join([f\"'{col}', {col}\" for col in summing_sdf.columns if col != \"Parent_Group\"]) +\n", " \") as (Region_Category, Weight)\"\n", ")\n", "\n", "# Show the reshaped summing matrix\n", "summing_sdf_long.show()\n", "\n", "# STEP 2: Transpose S matrix\n", "s_matrix_T = summing_sdf_long.groupBy(\"Region_Category\").pivot(\"Parent_Group\").agg(first(\"Weight\")).fillna(0)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "25/03/26 21:55:42 WARN Instrumentation: [97d3b04d] regParam is zero, which might cause numerical instability and overfitting.\n", "25/03/26 21:55:44 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS\n", "25/03/26 21:55:44 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK\n", "25/03/26 21:55:44 WARN Instrumentation: [97d3b04d] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.\n", "25/03/26 21:55:45 ERROR LBFGS: Failure! Resetting history: breeze.optimize.FirstOrderException: Line search zoom failed\n", "25/03/26 21:55:45 ERROR LBFGS: Failure again! Giving up and returning. Maybe the objective is just poorly behaved?\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n", "/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.\n", " warnings.warn(\n" ] } ], "source": [ "# STEP 3: Join with variance and fit one regression per step using weights\n", "from functools import reduce\n", "\n", "reconciled_dfs = []\n", "for step in range(1, 13):\n", " step_col = str(step)\n", "\n", " # Join forecasts with design matrix and forecast variances\n", " df = forecast_wide_sdf.select(\"Region_Category\", step_col).join(\n", " s_matrix_T, on=\"Region_Category\", how=\"inner\"\n", " ).join(\n", " error_variance_sdf.withColumn(\"weight\", 1 / col(\"Forecast_Variance\")),\n", " on=\"Region_Category\",\n", " how=\"inner\"\n", " )\n", "\n", " # Assemble features\n", " feature_cols = [c for c in df.columns if c not in [\"Region_Category\", step_col, \"weight\"]]\n", " assembler = VectorAssembler(inputCols=feature_cols, outputCol=\"features\")\n", " assembled_df = assembler.transform(df).select(\"Region_Category\", \"features\", col(step_col).alias(\"label\"), \"weight\")\n", "\n", " # WLS Regression\n", " lr = LinearRegression(featuresCol=\"features\", labelCol=\"label\", weightCol=\"weight\")\n", " model = lr.fit(assembled_df)\n", "\n", " # Predict reconciled values\n", " pred_df = model.transform(assembled_df).select(\"Region_Category\", col(\"prediction\").alias(\"Forecast\")) \\\n", " .withColumn(\"step\", lit(step))\n", "\n", " reconciled_dfs.append(pred_df)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "from functools import reduce\n", "from pyspark.sql.functions import expr\n", "\n", "reconciled_long_df = reduce(lambda df1, df2: df1.unionByName(df2), reconciled_dfs)\n", "test_start_date = test_sdf.selectExpr(\"min(date)\").collect()[0][0]\n", "\n", "reconciled_long_df = reconciled_long_df.withColumn(\n", " \"date\", expr(f\"add_months(to_date('{test_start_date}'), step - 1)\")\n", ")\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "reconciled_with_hierarchy_df = reconciled_long_df.join(\n", " summing_sdf_long, on=\"Region_Category\", how=\"inner\"\n", ")\n", "\n", "reconciled_agg_df = reconciled_with_hierarchy_df.withColumn(\n", " \"Weighted_Forecast\", col(\"Forecast\") * col(\"Weight\")\n", ").groupBy(\"Parent_Group\", \"date\").agg(\n", " spark_sum(\"Weighted_Forecast\").alias(\"Reconciled_Forecast\")\n", ")\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "test_with_hierarchy_df = test_sdf.join(summing_sdf_long, on=\"Region_Category\", how=\"inner\")\n", "\n", "test_agg_df = test_with_hierarchy_df.withColumn(\n", " \"Weighted_Actual\", col(\"Visitors\") * col(\"Weight\")\n", ").groupBy(\"Parent_Group\", \"date\").agg(\n", " spark_sum(\"Weighted_Actual\").alias(\"Actual_Visitors\")\n", ")\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "evaluation_df = reconciled_agg_df.join(test_agg_df, on=[\"Parent_Group\", \"date\"], how=\"inner\")\n", "\n", "evaluation_df = evaluation_df.withColumn(\n", " \"APE\", spark_abs((col(\"Reconciled_Forecast\") - col(\"Actual_Visitors\")) / col(\"Actual_Visitors\"))\n", ")\n", "\n", "mape_df = evaluation_df.groupBy(\"Parent_Group\").agg(avg(\"APE\").alias(\"MAPE\"))\n", "overall_mape_df = mape_df.agg(avg(\"MAPE\").alias(\"Overall_MAPE\"))\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "mape_df.show()\n", "overall_mape_df()" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Summary\n", "\n", "\n", "- **MinT-WLS** in Spark is approximate — no matrix inversion is used. \n", "\n", "- For full matrix-based MinT with off-diagonal covariance, it requires distributed matrix inversion techniques " ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Matrix inversion in Spark\n", "\n", "- Apache Spark’s MLlib library includes some linear algebra tools under pyspark.ml.linalg. You can create matrices, multiply them, and do some decompositions.\n", "\n", "- But: no direct `.inverse()` method is exposed in PySpark.\n", "\n", "\n", "- But in **Scala**, a **non-distributed version** `Matrices.dense(...).inverse()` exists via Breeze module.\n", "\n", "- Distributed inversion is a very difficult problem." ] } ], "metadata": { "celltoolbar": "Slideshow", "kernelspec": { "display_name": "/home/fli/.virtualenvs/python3.12", "language": "python", "name": "python3.12" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.3" } }, "nbformat": 4, "nbformat_minor": 4 }