Weighted Reconciliation with Spark¶
Feng Li¶
Guanghua School of Management¶
Peking University¶
feng.li@gsm.pku.edu.cn¶
Course home page: https://feng.li/bdcf¶
MinT-WLS¶
- MinT-WLS assumes a diagonal forecast error covariance matrix $ W $, where each diagonal element is the variance of forecast errors for each series (e.g., Region_Category). The formula becomes:
$$ \tilde{y} = S (S^\top W^{-1} S)^{-1} S^\top W^{-1} \hat{y} $$
- $ \hat{y} $: base forecasts (from ETS, etc.)
- $ S $: summing matrix
- $ W $: diagonal matrix of forecast error variances from the training residuals
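To make the formula concrete, here is a minimal NumPy sketch on a toy hierarchy. The hierarchy (Total = A + B), the base forecasts, and the variances are all hypothetical values chosen for illustration; they are not taken from the tourism data.

```python
import numpy as np

# Hypothetical toy hierarchy: Total = A + B, rows ordered [Total, A, B].
S = np.array([[1.0, 1.0],
              [1.0, 0.0],
              [0.0, 1.0]])

# Hypothetical base forecasts; they are incoherent because 10 + 6 != 18.
y_hat = np.array([18.0, 10.0, 6.0])

# Hypothetical forecast error variances (the diagonal of W).
w_diag = np.array([4.0, 1.0, 1.0])
W_inv = np.diag(1.0 / w_diag)

# Bottom-level estimates: (S' W^-1 S)^-1 S' W^-1 y_hat,
# computed with a linear solve instead of an explicit inverse.
beta = np.linalg.solve(S.T @ W_inv @ S, S.T @ W_inv @ y_hat)

# Reconciled forecasts for every series in the hierarchy.
y_tilde = S @ beta
print(y_tilde)
```

The series with the largest error variance (Total here) absorbs most of the adjustment, and the reconciled Total equals the sum of the reconciled A and B.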
How to Approximate MinT-WLS in Spark?¶
Since Spark is not optimized for full matrix ops, you can:
- Estimate error variance per Region_Category using training residuals.
- To compute forecast error variances for use in MinT-WLS, you need in-sample forecasts (i.e., forecasts over the training period, not just for the future 12 months). These are often called fitted values from the model.
- Use the inverse variance as weights.
- Perform a weighted projection manually (just like MinT-OLS, but with weights).
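The steps above can be sketched on a single machine as follows. This is an illustrative NumPy version under stated assumptions: the residual matrix, the base forecasts, and the toy hierarchy (Total = A + B) are hypothetical. The weighted projection is written as an ordinary least squares fit on rows scaled by the square root of the weights, which is one standard way to express weighted least squares.

```python
import numpy as np

# Hypothetical toy hierarchy: rows ordered [Total, A, B].
S = np.array([[1.0, 1.0],
              [1.0, 0.0],
              [0.0, 1.0]])

# Hypothetical in-sample residuals (actual minus fitted):
# rows are time points, columns are [Total, A, B].
residuals = np.array([[ 2.0, -1.0,  0.5],
                      [-2.0,  1.0, -0.5],
                      [ 2.0, -1.0,  0.5],
                      [-2.0,  1.0, -0.5]])

# Step 1: error variance per series as the mean squared residual.
variance = (residuals ** 2).mean(axis=0)

# Step 2: inverse variances as weights.
weights = 1.0 / variance

# Step 3: weighted projection of the base forecasts onto the columns of S.
y_hat = np.array([18.0, 10.0, 6.0])  # hypothetical base forecasts
sw = np.sqrt(weights)
beta, *_ = np.linalg.lstsq(S * sw[:, None], y_hat * sw, rcond=None)
y_tilde = S @ beta

# Closed-form MinT-WLS for comparison.
W_inv = np.diag(weights)
closed_form = S @ np.linalg.solve(S.T @ W_inv @ S, S.T @ W_inv @ y_hat)
print(y_tilde, closed_form)
```

Both routes give the same reconciled forecasts, which is why a weighted regression of the base forecasts on the rows of $ S $ can stand in for the matrix formula.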
In [1]:
import os, sys # Ensure All environment variables are properly set
# os.environ["JAVA_HOME"] = os.path.dirname(sys.executable)
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable
from pyspark.sql import SparkSession # build Spark Session
spark = SparkSession.builder \
.config("spark.ui.enabled", "false") \
.config("spark.executor.memory", "16g") \
.config("spark.executor.cores", "4") \
.config("spark.cores.max", "32") \
.config("spark.driver.memory", "30g") \
.config("spark.sql.shuffle.partitions", "96") \
.config("spark.memory.fraction", "0.8") \
.config("spark.memory.storageFraction", "0.5") \
.config("spark.dynamicAllocation.enabled", "true") \
.config("spark.dynamicAllocation.minExecutors", "4") \
.config("spark.dynamicAllocation.maxExecutors", "8") \
.appName("Spark Forecasting").getOrCreate()
spark
Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 25/03/26 21:51:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Out[1]:
SparkSession - in-memory
In [2]:
train_sdf = spark.read.csv("../data/tourism/tourism_train.csv", header=True, inferSchema=True)
test_sdf = spark.read.csv("../data/tourism/tourism_test.csv", header=True, inferSchema=True)
Spark approach¶
- You can modify the ets_forecast function to return both the fitted values (in-sample) and forecast values (out-of-sample) in a single column, while tagging them with a type column ("fitted" or "forecast").
- Later, you can easily split or filter.
In [3]:
# Modified ets_forecast Function: Fitted + Forecast in One UDF
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType
from pandas.tseries.offsets import MonthBegin
from statsmodels.tsa.holtwinters import ExponentialSmoothing
import pandas as pd

# Schema with an extra 'type' column to label fitted vs forecast
schema = StructType([
    StructField("date", DateType(), False),
    StructField("Region_Category", StringType(), False),
    StructField("Forecast", DoubleType(), True),  # nullable: fallback rows carry no forecast
    StructField("type", StringType(), False)
])

def ets_fitted_and_forecast(pdf):
    region = pdf["Region_Category"].iloc[0]
    pdf = pdf.sort_values("date")
    try:
        ts = pdf["Visitors"].dropna()
        dates = pdf["date"]
        if len(ts) >= 24:
            model = ExponentialSmoothing(ts, trend="add", seasonal="add", seasonal_periods=12)
            fitted_model = model.fit()
            # Fitted values (same length as training data)
            fitted_values = fitted_model.fittedvalues
            fitted_df = pd.DataFrame({
                "date": dates[-len(fitted_values):].values,
                "Region_Category": region,
                "Forecast": fitted_values.values,
                "type": "fitted"
            })
            # Forecast for next 12 months
            forecast_values = fitted_model.forecast(steps=12)
            last_date = pdf["date"].max()
            # Month ends starting at last_date, shifted to the first day of the following month
            forecast_dates = pd.date_range(start=last_date, periods=12, freq="ME") + MonthBegin(1)
            forecast_df = pd.DataFrame({
                "date": forecast_dates,
                "Region_Category": region,
                "Forecast": forecast_values.values,
                "type": "forecast"
            })
            result = pd.concat([fitted_df, forecast_df], ignore_index=True)
        else:
            # Too few observations for a seasonal model: return empty fitted values
            result = pd.DataFrame({
                "date": pdf["date"],
                "Region_Category": region,
                "Forecast": [None] * len(pdf),
                "type": "fitted"
            })
    except Exception:
        # Model fitting failed: return empty fitted values for this series
        result = pd.DataFrame({
            "date": pdf["date"],
            "Region_Category": region,
            "Forecast": [None] * len(pdf),
            "type": "fitted"
        })
    return result
In [4]:
forecast_all_sdf = train_sdf.groupBy("Region_Category").applyInPandas(
ets_fitted_and_forecast,
schema=schema
)
forecast_all_sdf.show()
+----------+---------------+------------------+------+
|      date|Region_Category|          Forecast|  type|
+----------+---------------+------------------+------+
|1998-01-01|         AAAAll|3163.2697233948543|fitted|
|1998-02-01|         AAAAll|1778.6336508067377|fitted|
|1998-03-01|         AAAAll|1964.5793105987796|fitted|
|1998-04-01|         AAAAll| 2368.289474343738|fitted|
|1998-05-01|         AAAAll|1939.6931272936972|fitted|
|1998-06-01|         AAAAll| 1873.297280626614|fitted|
|1998-07-01|         AAAAll| 2276.910878509122|fitted|
|1998-08-01|         AAAAll|2030.1825468670795|fitted|
|1998-09-01|         AAAAll|2249.6390692360214|fitted|
|1998-10-01|         AAAAll| 2468.892447267113|fitted|
|1998-11-01|         AAAAll|2137.1171938803254|fitted|
|1998-12-01|         AAAAll| 2156.903889882829|fitted|
|1999-01-01|         AAAAll|3128.9051350700565|fitted|
|1999-02-01|         AAAAll|1565.6742174446586|fitted|
|1999-03-01|         AAAAll|1739.9329015784124|fitted|
|1999-04-01|         AAAAll|  2179.73856154665|fitted|
|1999-05-01|         AAAAll| 1865.842031962462|fitted|
|1999-06-01|         AAAAll|1701.8465731355816|fitted|
|1999-07-01|         AAAAll| 2104.648972200794|fitted|
|1999-08-01|         AAAAll|1820.0248238648078|fitted|
+----------+---------------+------------------+------+
only showing top 20 rows
In [5]:
from pyspark.sql.functions import explode, col
train_forecast_sdf = forecast_all_sdf.filter(col("type") == "fitted")
forecast_sdf = forecast_all_sdf.filter(col("type") == "forecast")
train_forecast_sdf.show()
forecast_sdf.show()
/nfs-share/home/2406184221/.local/lib/python3.12/site-packages/statsmodels/tsa/holtwinters/model.py:918: ConvergenceWarning: Optimization failed to converge. Check mle_retvals. warnings.warn(
+----------+---------------+------------------+------+
|      date|Region_Category|          Forecast|  type|
+----------+---------------+------------------+------+
|1998-01-01|         AAAAll|3163.2697233948543|fitted|
|1998-02-01|         AAAAll|1778.6336508067377|fitted|
|1998-03-01|         AAAAll|1964.5793105987796|fitted|
|1998-04-01|         AAAAll| 2368.289474343738|fitted|
|1998-05-01|         AAAAll|1939.6931272936972|fitted|
|1998-06-01|         AAAAll| 1873.297280626614|fitted|
|1998-07-01|         AAAAll| 2276.910878509122|fitted|
|1998-08-01|         AAAAll|2030.1825468670795|fitted|
|1998-09-01|         AAAAll|2249.6390692360214|fitted|
|1998-10-01|         AAAAll| 2468.892447267113|fitted|
|1998-11-01|         AAAAll|2137.1171938803254|fitted|
|1998-12-01|         AAAAll| 2156.903889882829|fitted|
|1999-01-01|         AAAAll|3128.9051350700565|fitted|
|1999-02-01|         AAAAll|1565.6742174446586|fitted|
|1999-03-01|         AAAAll|1739.9329015784124|fitted|
|1999-04-01|         AAAAll|  2179.73856154665|fitted|
|1999-05-01|         AAAAll| 1865.842031962462|fitted|
|1999-06-01|         AAAAll|1701.8465731355816|fitted|
|1999-07-01|         AAAAll| 2104.648972200794|fitted|
|1999-08-01|         AAAAll|1820.0248238648078|fitted|
+----------+---------------+------------------+------+
only showing top 20 rows
+----------+---------------+------------------+--------+
|      date|Region_Category|          Forecast|    type|
+----------+---------------+------------------+--------+
|2016-01-01|         AAAAll| 3102.656423754467|forecast|
|2016-02-01|         AAAAll|1680.2657901376683|forecast|
|2016-03-01|         AAAAll|1990.2751366602693|forecast|
|2016-04-01|         AAAAll|1998.0264213694707|forecast|
|2016-05-01|         AAAAll|1901.6160001070255|forecast|
|2016-06-01|         AAAAll| 1774.600837587313|forecast|
|2016-07-01|         AAAAll| 2080.923016745227|forecast|
|2016-08-01|         AAAAll|1801.0122793015398|forecast|
|2016-09-01|         AAAAll|1943.9770740225126|forecast|
|2016-10-01|         AAAAll|2227.8982222051372|forecast|
|2016-11-01|         AAAAll|2002.3244224259395|forecast|
|2016-12-01|         AAAAll| 2034.390487099784|forecast|
|2016-01-01|         AAABus| 297.5805840966566|forecast|
|2016-02-01|         AAABus|458.72259824713456|forecast|
|2016-03-01|         AAABus| 528.3710688119537|forecast|
|2016-04-01|         AAABus|  463.137319957957|forecast|
|2016-05-01|         AAABus| 558.7822468756337|forecast|
|2016-06-01|         AAABus|499.18248190461196|forecast|
|2016-07-01|         AAABus| 608.6718553424128|forecast|
|2016-08-01|         AAABus| 582.5088141373288|forecast|
+----------+---------------+------------------+--------+
only showing top 20 rows
In [6]:
from pyspark.sql.functions import col, pow, avg
# Extract In-Sample Residuals and Estimate Variance
# Step 1: Get fitted values only
fitted_sdf = forecast_all_sdf.filter(col("type") == "fitted")
# Step 2: Join with actual visitors from train_sdf to compute residuals
residuals_sdf = fitted_sdf.join(train_sdf, on=["date", "Region_Category"], how="inner") \
.withColumn("squared_error", pow(col("Forecast") - col("Visitors"), 2))
# Step 3: Compute variance per Region_Category (mean squared error)
error_variance_sdf = residuals_sdf.groupBy("Region_Category").agg(
avg("squared_error").alias("Forecast_Variance")
)
In [7]:
error_variance_sdf.show()
+---------------+------------------+
|Region_Category| Forecast_Variance|
+---------------+------------------+
|         BCBOth|272.85436597042093|
|          BCHol| 9105.598569708574|
|         BDEAll| 775.5776169835501|
|         CBDAll|14331.084040952792|
|         CCBAll|18003.738283002403|
|          CCOth| 7712.312952780992|
|         DCCAll| 437.5673214249484|
|         DDBHol|1275.5532108535858|
|         EABVis|12122.219354684257|
|         FBAVis|488.41810972817876|
|         ADBAll| 4273.105859782842|
|         BDFAll| 659.4183940165184|
|         CBCHol| 5490.905212321565|
|         FAAHol| 5213.459407634496|
|         GABVis| 164.2504662157065|
|         GBCAll|3205.0012333762947|
|          AEHol| 10451.84784359348|
|         BDBAll| 2612.127028640519|
|         BDCBus| 705.4305537828409|
|         BEGAll|1563.7787233669223|
+---------------+------------------+
only showing top 20 rows
In [8]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql.functions import row_number, lit, col, first
from pyspark.sql.window import Window
# STEP 1: Prepare wide-format forecast matrix
window = Window.partitionBy("Region_Category").orderBy("date")
forecast_sdf = forecast_all_sdf.filter(col("type") == "forecast")
forecast_sdf = forecast_sdf.withColumn("step", row_number().over(window))
forecast_wide_sdf = forecast_sdf.groupBy("Region_Category").pivot("step").agg(first("Forecast"))
In [9]:
from pyspark.sql.functions import col, sum as spark_sum
# Load the summing matrix file
summing_matrix_path = "../data/tourism/agg_mat.csv" # Update with actual path
summing_sdf = spark.read.csv(summing_matrix_path, header=True, inferSchema=True)
# Convert from wide format to long format (Parent_Group, Region_Category, Weight)
value_cols = [c for c in summing_sdf.columns if c != "Parent_Group"]
summing_sdf_long = summing_sdf.selectExpr(
    "Parent_Group",
    "stack(" + str(len(value_cols)) + ", " +
    ", ".join([f"'{c}', `{c}`" for c in value_cols]) +
    ") as (Region_Category, Weight)"
)
# Show the reshaped summing matrix
summing_sdf_long.show()
# STEP 2: Transpose S matrix
s_matrix_T = summing_sdf_long.groupBy("Region_Category").pivot("Parent_Group").agg(first("Weight")).fillna(0)
25/03/26 21:53:20 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+------------+---------------+------+
|Parent_Group|Region_Category|Weight|
+------------+---------------+------+
|    TotalAll|         AAAHol|   1.0|
|    TotalAll|         AAAVis|   1.0|
|    TotalAll|         AAABus|   1.0|
|    TotalAll|         AAAOth|   1.0|
|    TotalAll|         AABHol|   1.0|
|    TotalAll|         AABVis|   1.0|
|    TotalAll|         AABBus|   1.0|
|    TotalAll|         AABOth|   1.0|
|    TotalAll|         ABAHol|   1.0|
|    TotalAll|         ABAVis|   1.0|
|    TotalAll|         ABABus|   1.0|
|    TotalAll|         ABAOth|   1.0|
|    TotalAll|         ABBHol|   1.0|
|    TotalAll|         ABBVis|   1.0|
|    TotalAll|         ABBBus|   1.0|
|    TotalAll|         ABBOth|   1.0|
|    TotalAll|         ACAHol|   1.0|
|    TotalAll|         ACAVis|   1.0|
|    TotalAll|         ACABus|   1.0|
|    TotalAll|         ACAOth|   1.0|
+------------+---------------+------+
only showing top 20 rows
In [ ]:
# STEP 3: Join with variance and fit one regression per step using weights
from functools import reduce
reconciled_dfs = []
for step in range(1, 13):
    step_col = str(step)
    # Join forecasts with design matrix and forecast variances
    df = forecast_wide_sdf.select("Region_Category", step_col).join(
        s_matrix_T, on="Region_Category", how="inner"
    ).join(
        error_variance_sdf.withColumn("weight", 1 / col("Forecast_Variance")),
        on="Region_Category",
        how="inner"
    )
    # Assemble features: keep only the summing-matrix columns.
    # Forecast_Variance is excluded so that it acts as a weight, not as a regressor.
    non_feature_cols = ["Region_Category", step_col, "weight", "Forecast_Variance"]
    feature_cols = [c for c in df.columns if c not in non_feature_cols]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    assembled_df = assembler.transform(df).select("Region_Category", "features", col(step_col).alias("label"), "weight")
    # WLS Regression
    lr = LinearRegression(featuresCol="features", labelCol="label", weightCol="weight")
    model = lr.fit(assembled_df)
    # Predict reconciled values
    pred_df = model.transform(assembled_df).select("Region_Category", col("prediction").alias("Forecast")) \
        .withColumn("step", lit(step))
    reconciled_dfs.append(pred_df)
25/03/26 21:55:42 WARN Instrumentation: [97d3b04d] regParam is zero, which might cause numerical instability and overfitting.
25/03/26 21:55:44 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/03/26 21:55:44 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
25/03/26 21:55:44 WARN Instrumentation: [97d3b04d] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
25/03/26 21:55:45 ERROR LBFGS: Failure! Resetting history: breeze.optimize.FirstOrderException: Line search zoom failed
25/03/26 21:55:45 ERROR LBFGS: Failure again! Giving up and returning. Maybe the objective is just poorly behaved?
In [ ]:
from functools import reduce
from pyspark.sql.functions import expr
reconciled_long_df = reduce(lambda df1, df2: df1.unionByName(df2), reconciled_dfs)
test_start_date = test_sdf.selectExpr("min(date)").collect()[0][0]
reconciled_long_df = reconciled_long_df.withColumn(
"date", expr(f"add_months(to_date('{test_start_date}'), step - 1)")
)
In [ ]:
reconciled_with_hierarchy_df = reconciled_long_df.join(
summing_sdf_long, on="Region_Category", how="inner"
)
reconciled_agg_df = reconciled_with_hierarchy_df.withColumn(
"Weighted_Forecast", col("Forecast") * col("Weight")
).groupBy("Parent_Group", "date").agg(
spark_sum("Weighted_Forecast").alias("Reconciled_Forecast")
)
In [ ]:
test_with_hierarchy_df = test_sdf.join(summing_sdf_long, on="Region_Category", how="inner")
test_agg_df = test_with_hierarchy_df.withColumn(
"Weighted_Actual", col("Visitors") * col("Weight")
).groupBy("Parent_Group", "date").agg(
spark_sum("Weighted_Actual").alias("Actual_Visitors")
)
In [ ]:
from pyspark.sql.functions import abs as spark_abs, avg, col
# Absolute percentage error per aggregate series and date
evaluation_df = reconciled_agg_df.join(test_agg_df, on=["Parent_Group", "date"], how="inner")
evaluation_df = evaluation_df.withColumn(
    "APE", spark_abs((col("Reconciled_Forecast") - col("Actual_Visitors")) / col("Actual_Visitors"))
)
# MAPE per aggregate series, then averaged across series
mape_df = evaluation_df.groupBy("Parent_Group").agg(avg("APE").alias("MAPE"))
overall_mape_df = mape_df.agg(avg("MAPE").alias("Overall_MAPE"))
In [ ]:
mape_df.show()
overall_mape_df.show()
Summary¶
MinT-WLS in Spark is approximate: no explicit matrix inversion is used.
Full matrix-based MinT with off-diagonal covariance requires distributed matrix inversion techniques.
Matrix inversion in Spark¶
Apache Spark’s MLlib library includes some linear algebra tools under pyspark.ml.linalg. You can create matrices, multiply them, and do some decompositions.
But:
- No direct .inverse() method is exposed in PySpark.
- In Scala, a non-distributed version Matrices.dense(...).inverse() exists via Breeze module.
- Distributed inversion is a very difficult problem.
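For the diagonal (WLS) case, one way to sidestep distributed inversion is to note that $ S^\top W^{-1} S $ and $ S^\top W^{-1} \hat{y} $ are both sums over the rows of $ S $, so each partition can contribute a partial sum and only a small square system (one row and column per bottom-level series) has to be solved locally. The sketch below imitates this with plain Python lists standing in for partitions; the helper name `partial_normal_equations` and all numbers are hypothetical, and it is an illustration of the idea rather than a Spark implementation.

```python
import numpy as np

def partial_normal_equations(rows):
    """Accumulate sum_i s_i s_i' / var_i and sum_i s_i y_i / var_i for one chunk.

    Each row is a tuple (row of S, base forecast, error variance).
    """
    m = len(rows[0][0])
    A = np.zeros((m, m))
    b = np.zeros(m)
    for s_i, y_i, var_i in rows:
        s_i = np.asarray(s_i, dtype=float)
        A += np.outer(s_i, s_i) / var_i
        b += s_i * y_i / var_i
    return A, b

# Hypothetical data for the hierarchy Total = A + B, split into two chunks
# the way a cluster might split rows across partitions.
chunk_1 = [([1.0, 1.0], 18.0, 4.0)]                          # Total
chunk_2 = [([1.0, 0.0], 10.0, 1.0), ([0.0, 1.0], 6.0, 1.0)]  # A and B

# "Map" step: one partial result per chunk. "Reduce" step: add them up.
parts = [partial_normal_equations(chunk) for chunk in (chunk_1, chunk_2)]
A = sum(p[0] for p in parts)
b = sum(p[1] for p in parts)

# Local solve of the small system; no explicit inverse is formed.
beta = np.linalg.solve(A, b)
print(beta)
```

In this sketch only the two partial sums travel between chunks, so the cost of the final solve depends on the number of bottom-level series rather than on the total number of series in the hierarchy.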