Weighted Reconciliation with Spark

Feng Li

Guanghua School of Management

Peking University

feng.li@gsm.pku.edu.cn

Course home page: https://feng.li/bdcf

MinT-WLS

  • MinT-WLS assumes a diagonal forecast error covariance matrix $ W $, whose diagonal elements are the forecast error variances of the individual series (e.g., one per Region_Category). The reconciled forecasts are then:

$$ \tilde{y} = S (S^\top W^{-1} S)^{-1} S^\top W^{-1} \hat{y} $$

  • $ \tilde{y} $: reconciled forecasts
  • $ \hat{y} $: base forecasts (from ETS, etc.)
  • $ S $: summing matrix
  • $ W $: diagonal matrix of forecast error variances from the training residuals
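
To make the formula concrete, here is a minimal NumPy sketch on a toy hierarchy. The hierarchy (Total = A + B), the base forecasts, and the variances are all made-up values for illustration; they are not taken from the tourism data.

```python
import numpy as np

# Toy hierarchy (assumed for illustration): Total = A + B.
# Rows of S are ordered [Total, A, B]; columns are the bottom series [A, B].
S = np.array([[1.0, 1.0],
              [1.0, 0.0],
              [0.0, 1.0]])

# Hypothetical base forecasts that are incoherent: 100 != 60 + 30.
y_hat = np.array([100.0, 60.0, 30.0])

# Hypothetical forecast error variances (the diagonal of W).
variances = np.array([4.0, 1.0, 1.0])
W_inv = np.diag(1.0 / variances)  # a diagonal W is inverted elementwise

# MinT-WLS: y_tilde = S (S' W^-1 S)^-1 S' W^-1 y_hat
lhs = S.T @ W_inv @ S
rhs = S.T @ W_inv @ y_hat
beta = np.linalg.solve(lhs, rhs)  # reconciled bottom-level forecasts
y_tilde = S @ beta                # reconciled forecasts for every series

print(y_tilde)
```

In this toy case the gap of 10 between the total and the sum of its children is split in proportion to the variances: the total drops to about 93.33, while A and B each rise by about 1.67, so the reconciled forecasts add up.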

How to Approximate MinT-WLS in Spark?

Since Spark is not optimized for full matrix operations, you can:

  • Obtain in-sample forecasts, i.e., forecasts over the training period and not just for the future 12 months. These are often called the fitted values of the model.
  • Estimate the forecast error variance per Region_Category from the training residuals (actual values minus fitted values).
  • Use the inverse variances as weights.
  • Perform a weighted projection manually (just like MinT-OLS, but with weights).
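
The idea behind these steps can be sketched on a small scale with NumPy. The residuals, base forecasts, and three-series hierarchy (Total = A + B) below are hypothetical. The sketch shows that the weighted projection is the same as a weighted least-squares fit of the base forecasts on the columns of the summing matrix, with no intercept.

```python
import numpy as np

# Hypothetical in-sample residuals (actual minus fitted), one array per series.
residuals = {
    "Total": np.array([3.0, -3.0, 3.0, -3.0]),
    "A": np.array([1.0, -1.0, 1.0, -1.0]),
    "B": np.array([2.0, -2.0, 2.0, -2.0]),
}

# Error variance per series as the mean squared residual, then inverse-variance weights.
variances = np.array([np.mean(r ** 2) for r in residuals.values()])
weights = 1.0 / variances

# Summing matrix for Total = A + B and hypothetical incoherent base forecasts.
S = np.array([[1.0, 1.0], [1.0, 0.0], [0.0, 1.0]])
y_hat = np.array([50.0, 30.0, 25.0])

# Weighted least squares without intercept: scale rows by sqrt(weight),
# then solve an ordinary least-squares problem.
sqrt_w = np.sqrt(weights)
beta, *_ = np.linalg.lstsq(S * sqrt_w[:, None], y_hat * sqrt_w, rcond=None)

# Fitted values of the regression are the reconciled forecasts.
y_tilde = S @ beta
print(y_tilde)
```

Series with a larger error variance receive a smaller weight, so their base forecasts are adjusted more during reconciliation.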
In [1]:
import os, sys # Ensure All environment variables are properly set 
# os.environ["JAVA_HOME"] = os.path.dirname(sys.executable)
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

from pyspark.sql import SparkSession # build Spark Session
spark = SparkSession.builder \
    .config("spark.ui.enabled", "false") \
    .config("spark.executor.memory", "16g") \
    .config("spark.executor.cores", "4") \
    .config("spark.cores.max", "32") \
    .config("spark.driver.memory", "30g") \
    .config("spark.sql.shuffle.partitions", "96") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.memory.storageFraction", "0.5") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "4") \
    .config("spark.dynamicAllocation.maxExecutors", "8") \
    .appName("Spark Forecasting").getOrCreate()
spark
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/26 21:51:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Out[1]:

SparkSession - in-memory

SparkContext

Spark UI

Version
v3.5.3
Master
local[*]
AppName
Spark Forecasting
In [2]:
train_sdf = spark.read.csv("../data/tourism/tourism_train.csv", header=True, inferSchema=True)
test_sdf = spark.read.csv("../data/tourism/tourism_test.csv", header=True, inferSchema=True)

Spark approach

  • You can modify the ets_forecast function to return both the fitted values (in-sample) and the forecast values (out-of-sample) in a single column, tagging each row with a type column ("fitted" or "forecast").
  • The two parts can later be separated with a simple filter on the type column.
In [3]:
# Modified ets_forecast Function: Fitted + Forecast in One UDF
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType

from pandas.tseries.offsets import MonthBegin
from statsmodels.tsa.holtwinters import ExponentialSmoothing
import pandas as pd

# Schema with an extra 'type' column to label fitted vs forecast
schema = StructType([
    StructField("date", DateType(), False),
    StructField("Region_Category", StringType(), False),
    # Nullable: the fallback branches below return None when no model can be fitted
    StructField("Forecast", DoubleType(), True),
    StructField("type", StringType(), False)
])

def ets_fitted_and_forecast(pdf):
    region = pdf["Region_Category"].iloc[0]
    pdf = pdf.sort_values("date")

    try:
        ts = pdf["Visitors"].dropna()
        dates = pdf["date"]

        if len(ts) >= 24:
            model = ExponentialSmoothing(ts, trend="add", seasonal="add", seasonal_periods=12)
            fitted_model = model.fit()

            # Fitted values (same length as training data)
            fitted_values = fitted_model.fittedvalues
            fitted_df = pd.DataFrame({
                "date": dates[-len(fitted_values):].values,
                "Region_Category": region,
                "Forecast": fitted_values.values,
                "type": "fitted"
            })

            # Forecast for next 12 months
            forecast_values = fitted_model.forecast(steps=12)
            last_date = pdf["date"].max()
            forecast_dates = pd.date_range(start=last_date, periods=12, freq="ME") + MonthBegin(1)

            forecast_df = pd.DataFrame({
                "date": forecast_dates,
                "Region_Category": region,
                "Forecast": forecast_values.values,
                "type": "forecast"
            })

            result = pd.concat([fitted_df, forecast_df], ignore_index=True)

        else:
            result = pd.DataFrame({
                "date": pdf["date"],
                "Region_Category": region,
                "Forecast": [None] * len(pdf),
                "type": "fitted"
            })

    except Exception:  # fall back to missing forecasts if the ETS fit fails
        result = pd.DataFrame({
            "date": pdf["date"],
            "Region_Category": region,
            "Forecast": [None] * len(pdf),
            "type": "fitted"
        })

    return result
In [4]:
forecast_all_sdf = train_sdf.groupBy("Region_Category").applyInPandas(
    ets_fitted_and_forecast,
    schema=schema
)

forecast_all_sdf.show()
+----------+---------------+------------------+------+
|      date|Region_Category|          Forecast|  type|
+----------+---------------+------------------+------+
|1998-01-01|         AAAAll|3163.2697233948543|fitted|
|1998-02-01|         AAAAll|1778.6336508067377|fitted|
|1998-03-01|         AAAAll|1964.5793105987796|fitted|
|1998-04-01|         AAAAll| 2368.289474343738|fitted|
|1998-05-01|         AAAAll|1939.6931272936972|fitted|
|1998-06-01|         AAAAll| 1873.297280626614|fitted|
|1998-07-01|         AAAAll| 2276.910878509122|fitted|
|1998-08-01|         AAAAll|2030.1825468670795|fitted|
|1998-09-01|         AAAAll|2249.6390692360214|fitted|
|1998-10-01|         AAAAll| 2468.892447267113|fitted|
|1998-11-01|         AAAAll|2137.1171938803254|fitted|
|1998-12-01|         AAAAll| 2156.903889882829|fitted|
|1999-01-01|         AAAAll|3128.9051350700565|fitted|
|1999-02-01|         AAAAll|1565.6742174446586|fitted|
|1999-03-01|         AAAAll|1739.9329015784124|fitted|
|1999-04-01|         AAAAll|  2179.73856154665|fitted|
|1999-05-01|         AAAAll| 1865.842031962462|fitted|
|1999-06-01|         AAAAll|1701.8465731355816|fitted|
|1999-07-01|         AAAAll| 2104.648972200794|fitted|
|1999-08-01|         AAAAll|1820.0248238648078|fitted|
+----------+---------------+------------------+------+
only showing top 20 rows

                                                                                
In [5]:
from pyspark.sql.functions import col

train_forecast_sdf = forecast_all_sdf.filter(col("type") == "fitted")
forecast_sdf = forecast_all_sdf.filter(col("type") == "forecast")

train_forecast_sdf.show()
forecast_sdf.show()
/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals.
  warnings.warn(
                                                                                
+----------+---------------+------------------+------+
|      date|Region_Category|          Forecast|  type|
+----------+---------------+------------------+------+
|1998-01-01|         AAAAll|3163.2697233948543|fitted|
|1998-02-01|         AAAAll|1778.6336508067377|fitted|
|1998-03-01|         AAAAll|1964.5793105987796|fitted|
|1998-04-01|         AAAAll| 2368.289474343738|fitted|
|1998-05-01|         AAAAll|1939.6931272936972|fitted|
|1998-06-01|         AAAAll| 1873.297280626614|fitted|
|1998-07-01|         AAAAll| 2276.910878509122|fitted|
|1998-08-01|         AAAAll|2030.1825468670795|fitted|
|1998-09-01|         AAAAll|2249.6390692360214|fitted|
|1998-10-01|         AAAAll| 2468.892447267113|fitted|
|1998-11-01|         AAAAll|2137.1171938803254|fitted|
|1998-12-01|         AAAAll| 2156.903889882829|fitted|
|1999-01-01|         AAAAll|3128.9051350700565|fitted|
|1999-02-01|         AAAAll|1565.6742174446586|fitted|
|1999-03-01|         AAAAll|1739.9329015784124|fitted|
|1999-04-01|         AAAAll|  2179.73856154665|fitted|
|1999-05-01|         AAAAll| 1865.842031962462|fitted|
|1999-06-01|         AAAAll|1701.8465731355816|fitted|
|1999-07-01|         AAAAll| 2104.648972200794|fitted|
|1999-08-01|         AAAAll|1820.0248238648078|fitted|
+----------+---------------+------------------+------+
only showing top 20 rows

+----------+---------------+------------------+--------+
|      date|Region_Category|          Forecast|    type|
+----------+---------------+------------------+--------+
|2016-01-01|         AAAAll| 3102.656423754467|forecast|
|2016-02-01|         AAAAll|1680.2657901376683|forecast|
|2016-03-01|         AAAAll|1990.2751366602693|forecast|
|2016-04-01|         AAAAll|1998.0264213694707|forecast|
|2016-05-01|         AAAAll|1901.6160001070255|forecast|
|2016-06-01|         AAAAll| 1774.600837587313|forecast|
|2016-07-01|         AAAAll| 2080.923016745227|forecast|
|2016-08-01|         AAAAll|1801.0122793015398|forecast|
|2016-09-01|         AAAAll|1943.9770740225126|forecast|
|2016-10-01|         AAAAll|2227.8982222051372|forecast|
|2016-11-01|         AAAAll|2002.3244224259395|forecast|
|2016-12-01|         AAAAll| 2034.390487099784|forecast|
|2016-01-01|         AAABus| 297.5805840966566|forecast|
|2016-02-01|         AAABus|458.72259824713456|forecast|
|2016-03-01|         AAABus| 528.3710688119537|forecast|
|2016-04-01|         AAABus|  463.137319957957|forecast|
|2016-05-01|         AAABus| 558.7822468756337|forecast|
|2016-06-01|         AAABus|499.18248190461196|forecast|
|2016-07-01|         AAABus| 608.6718553424128|forecast|
|2016-08-01|         AAABus| 582.5088141373288|forecast|
+----------+---------------+------------------+--------+
only showing top 20 rows

                                                                                
In [6]:
from pyspark.sql.functions import col, pow, avg

# Extract In-Sample Residuals and Estimate Variance

# Step 1: Get fitted values only
fitted_sdf = forecast_all_sdf.filter(col("type") == "fitted")

# Step 2: Join with actual visitors from train_sdf to compute residuals
residuals_sdf = fitted_sdf.join(train_sdf, on=["date", "Region_Category"], how="inner") \
    .withColumn("squared_error", pow(col("Forecast") - col("Visitors"), 2))

# Step 3: Compute variance per Region_Category (mean squared error)
error_variance_sdf = residuals_sdf.groupBy("Region_Category").agg(
    avg("squared_error").alias("Forecast_Variance")
)
In [7]:
error_variance_sdf.show()
+---------------+------------------+
|Region_Category| Forecast_Variance|
+---------------+------------------+
|         BCBOth|272.85436597042093|
|          BCHol| 9105.598569708574|
|         BDEAll| 775.5776169835501|
|         CBDAll|14331.084040952792|
|         CCBAll|18003.738283002403|
|          CCOth| 7712.312952780992|
|         DCCAll| 437.5673214249484|
|         DDBHol|1275.5532108535858|
|         EABVis|12122.219354684257|
|         FBAVis|488.41810972817876|
|         ADBAll| 4273.105859782842|
|         BDFAll| 659.4183940165184|
|         CBCHol| 5490.905212321565|
|         FAAHol| 5213.459407634496|
|         GABVis| 164.2504662157065|
|         GBCAll|3205.0012333762947|
|          AEHol| 10451.84784359348|
|         BDBAll| 2612.127028640519|
|         BDCBus| 705.4305537828409|
|         BEGAll|1563.7787233669223|
+---------------+------------------+
only showing top 20 rows

                                                                                
In [8]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql.functions import row_number, lit, col, first
from pyspark.sql.window import Window

# STEP 1: Prepare wide-format forecast matrix
window = Window.partitionBy("Region_Category").orderBy("date")
forecast_sdf = forecast_all_sdf.filter(col("type") == "forecast")
forecast_sdf = forecast_sdf.withColumn("step", row_number().over(window))

forecast_wide_sdf = forecast_sdf.groupBy("Region_Category").pivot("step").agg(first("Forecast"))
In [9]:
from pyspark.sql.functions import col, sum as spark_sum

# Path to the summing (aggregation) matrix file
summing_matrix_path = "../data/tourism/agg_mat.csv"  # Update with actual path

# Load the summing matrix: one row per Parent_Group, one column per bottom-level series
summing_sdf = spark.read.csv(summing_matrix_path, header=True, inferSchema=True)

# Convert from wide format to long format (Parent_Group, Region_Category, Weight).
# The loop variable is named c so that it does not shadow pyspark's col function.
series_cols = [c for c in summing_sdf.columns if c != "Parent_Group"]
summing_sdf_long = summing_sdf.selectExpr(
    "Parent_Group",
    "stack(" + str(len(series_cols)) + ", " +
    ", ".join([f"'{c}', `{c}`" for c in series_cols]) +
    ") as (Region_Category, Weight)"
)

# Show the reshaped summing matrix
summing_sdf_long.show()

# STEP 2: Transpose S matrix
s_matrix_T = summing_sdf_long.groupBy("Region_Category").pivot("Parent_Group").agg(first("Weight")).fillna(0)
25/03/26 21:53:20 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+------------+---------------+------+
|Parent_Group|Region_Category|Weight|
+------------+---------------+------+
|    TotalAll|         AAAHol|   1.0|
|    TotalAll|         AAAVis|   1.0|
|    TotalAll|         AAABus|   1.0|
|    TotalAll|         AAAOth|   1.0|
|    TotalAll|         AABHol|   1.0|
|    TotalAll|         AABVis|   1.0|
|    TotalAll|         AABBus|   1.0|
|    TotalAll|         AABOth|   1.0|
|    TotalAll|         ABAHol|   1.0|
|    TotalAll|         ABAVis|   1.0|
|    TotalAll|         ABABus|   1.0|
|    TotalAll|         ABAOth|   1.0|
|    TotalAll|         ABBHol|   1.0|
|    TotalAll|         ABBVis|   1.0|
|    TotalAll|         ABBBus|   1.0|
|    TotalAll|         ABBOth|   1.0|
|    TotalAll|         ACAHol|   1.0|
|    TotalAll|         ACAVis|   1.0|
|    TotalAll|         ACABus|   1.0|
|    TotalAll|         ACAOth|   1.0|
+------------+---------------+------+
only showing top 20 rows

In [ ]:
# STEP 3: Join with variance and fit one regression per step using weights
from functools import reduce

reconciled_dfs = []
for step in range(1, 13):
    step_col = str(step)

    # Join forecasts with the design matrix and the inverse-variance weights.
    # Only the weight column is kept, so Forecast_Variance is not picked up as a feature.
    df = forecast_wide_sdf.select("Region_Category", step_col).join(
        s_matrix_T, on="Region_Category", how="inner"
    ).join(
        error_variance_sdf.select(
            "Region_Category", (1 / col("Forecast_Variance")).alias("weight")
        ),
        on="Region_Category",
        how="inner"
    )

    # Assemble features
    feature_cols = [c for c in df.columns if c not in ["Region_Category", step_col, "weight"]]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    assembled_df = assembler.transform(df).select("Region_Category", "features", col(step_col).alias("label"), "weight")

    # WLS Regression
    lr = LinearRegression(featuresCol="features", labelCol="label", weightCol="weight")
    model = lr.fit(assembled_df)

    # Predict reconciled values
    pred_df = model.transform(assembled_df).select("Region_Category", col("prediction").alias("Forecast")) \
        .withColumn("step", lit(step))

    reconciled_dfs.append(pred_df)
25/03/26 21:55:42 WARN Instrumentation: [97d3b04d] regParam is zero, which might cause numerical instability and overfitting.
25/03/26 21:55:44 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/03/26 21:55:44 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
25/03/26 21:55:44 WARN Instrumentation: [97d3b04d] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
25/03/26 21:55:45 ERROR LBFGS: Failure! Resetting history: breeze.optimize.FirstOrderException: Line search zoom failed
25/03/26 21:55:45 ERROR LBFGS: Failure again! Giving up and returning. Maybe the objective is just poorly behaved?
In [ ]:
from functools import reduce
from pyspark.sql.functions import expr

reconciled_long_df = reduce(lambda df1, df2: df1.unionByName(df2), reconciled_dfs)
test_start_date = test_sdf.selectExpr("min(date)").collect()[0][0]

reconciled_long_df = reconciled_long_df.withColumn(
    "date", expr(f"add_months(to_date('{test_start_date}'), step - 1)")
)
In [ ]:
reconciled_with_hierarchy_df = reconciled_long_df.join(
    summing_sdf_long, on="Region_Category", how="inner"
)

reconciled_agg_df = reconciled_with_hierarchy_df.withColumn(
    "Weighted_Forecast", col("Forecast") * col("Weight")
).groupBy("Parent_Group", "date").agg(
    spark_sum("Weighted_Forecast").alias("Reconciled_Forecast")
)
In [ ]:
test_with_hierarchy_df = test_sdf.join(summing_sdf_long, on="Region_Category", how="inner")

test_agg_df = test_with_hierarchy_df.withColumn(
    "Weighted_Actual", col("Visitors") * col("Weight")
).groupBy("Parent_Group", "date").agg(
    spark_sum("Weighted_Actual").alias("Actual_Visitors")
)
In [ ]:
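from pyspark.sql.functions import abs as spark_abs, avg, col

# Align reconciled forecasts with actuals per group and date
evaluation_df = reconciled_agg_df.join(test_agg_df, on=["Parent_Group", "date"], how="inner")

# Absolute percentage error for every (Parent_Group, date) pair
evaluation_df = evaluation_df.withColumn(
    "APE", spark_abs((col("Reconciled_Forecast") - col("Actual_Visitors")) / col("Actual_Visitors"))
)

# MAPE per group, then the average MAPE across groups
mape_df = evaluation_df.groupBy("Parent_Group").agg(avg("APE").alias("MAPE"))
overall_mape_df = mape_df.agg(avg("MAPE").alias("Overall_MAPE"))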
In [ ]:
mape_df.show()
overall_mape_df.show()

Summary

  • The Spark version of MinT-WLS shown here is approximate: it fits a weighted regression instead of inverting a matrix explicitly.

  • Full matrix-based MinT with off-diagonal covariance terms requires distributed matrix inversion techniques.

Matrix inversion in Spark

  • Apache Spark’s MLlib library includes some linear algebra tools under pyspark.ml.linalg. You can create matrices, multiply them, and do some decompositions.

  • However, PySpark exposes no direct .inverse() method.

  • In Scala, a local (non-distributed) matrix can be inverted through the Breeze library, for example with breeze.linalg.inv.

  • Distributed inversion is a very difficult problem.
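
When the matrices are small enough to fit in driver memory, one workaround is to do the linear algebra locally with NumPy and to solve linear systems rather than form an inverse. The sketch below assumes a toy hierarchy (Total = A + B) and a made-up full covariance matrix W with off-diagonal terms. In a Spark workflow the arrays would first have to be collected to the driver, a step that is omitted here.

```python
import numpy as np

# Assumed toy inputs: Total = A + B, rows ordered [Total, A, B].
S = np.array([[1.0, 1.0],
              [1.0, 0.0],
              [0.0, 1.0]])

# Hypothetical full (non-diagonal) forecast error covariance matrix.
W = np.array([[4.0, 1.0, 1.0],
              [1.0, 2.0, 0.0],
              [1.0, 0.0, 2.0]])

# Hypothetical incoherent base forecasts.
y_hat = np.array([100.0, 60.0, 30.0])

# Solve linear systems instead of computing W^-1 explicitly.
W_inv_S = np.linalg.solve(W, S)      # equals W^-1 S
W_inv_y = np.linalg.solve(W, y_hat)  # equals W^-1 y_hat

# beta = (S' W^-1 S)^-1 S' W^-1 y_hat, again via a solve.
beta = np.linalg.solve(S.T @ W_inv_S, S.T @ W_inv_y)
y_tilde = S @ beta  # reconciled forecasts for all series

print(y_tilde)
```

The only system that grows with the hierarchy is S' W^-1 S, whose dimension equals the number of bottom-level series, so this local approach stops being practical once that matrix no longer fits on one machine.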