How to on Spark: Forecasting

Run TimeGPT in a distributed fashion on top of Spark.
TimeGPT works on top of Spark, Dask, and Ray through Fugue. TimeGPT reads the input DataFrame and uses the corresponding engine. For example, if the input is a Spark DataFrame, TimeGPT will use the existing Spark session to run the forecast.
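To make the dispatch idea concrete, here is a minimal, hypothetical sketch of choosing an engine from the input's type. `pick_engine` is an illustrative name and is not part of nixtlats or Fugue. The real dispatch happens inside Fugue and may work differently. This version only looks at the module that defines the DataFrame's class.

```python
# Hypothetical illustration of "dispatch on the input DataFrame type".
# This is NOT Fugue's or nixtlats' actual code; it only inspects the
# module name of the object's class.

def pick_engine(df) -> str:
    """Return a label for the engine that would handle `df`."""
    module = type(df).__module__
    if module.startswith("pyspark"):
        return "spark"
    if module.startswith("dask"):
        return "dask"
    if module.startswith("ray"):
        return "ray"
    # Anything else (e.g. a pandas DataFrame) is handled locally.
    return "local"


class FakeSparkFrame:
    """Stand-in for a Spark DataFrame, used only for this example."""
    __module__ = "pyspark.sql.dataframe"


print(pick_engine(FakeSparkFrame()))   # spark
print(pick_engine({"y": [1.0, 2.0]}))  # local
```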

Installation

As long as Spark is installed and configured, TimeGPT will be able to use it. If executing on a distributed Spark cluster, make sure the nixtlats library is installed on all the workers.
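A quick way to confirm the required packages are importable is sketched below. It uses only the standard library. The helper name `missing_packages` is hypothetical, and the default package list (`pyspark`, `nixtlats`) is an assumption based on the imports used on this page.

```python
import importlib.util


def missing_packages(packages=("pyspark", "nixtlats")):
    """Return the names in `packages` that cannot be imported in this environment."""
    return [name for name in packages if importlib.util.find_spec(name) is None]


missing = missing_packages()
if missing:
    print("Missing packages:", ", ".join(missing))
else:
    print("All required packages are importable.")
```

This only inspects the Python environment it runs in, which is usually the driver. To probe executors, you could run the same function inside Spark tasks, for example `spark.sparkContext.parallelize(range(4), 4).map(lambda _: missing_packages()).collect()`. How those tasks are spread over workers depends on your cluster.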

Executing on Spark

To distribute the forecasts with Spark, pass a Spark DataFrame instead of a pandas DataFrame. First, instantiate the TimeGPT class.


from nixtlats import TimeGPT

timegpt = TimeGPT(
    # defaults to os.environ.get("TIMEGPT_TOKEN")
    token='my_token_provided_by_nixtla'
)
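The comment above says the token defaults to the `TIMEGPT_TOKEN` environment variable. If you prefer to keep the token out of notebooks, you can read it from the environment yourself and fail early with a clear message. `resolve_token` below is a hypothetical helper and is not part of nixtlats.

```python
import os


def resolve_token(token=None, env_var="TIMEGPT_TOKEN"):
    """Prefer an explicit token; otherwise fall back to the environment variable."""
    if token:
        return token
    value = os.environ.get(env_var)
    if not value:
        # Fail early instead of sending requests without credentials.
        raise RuntimeError(f"Set {env_var} or pass a token explicitly.")
    return value
```

Usage might look like `timegpt = TimeGPT(token=resolve_token())`. Omitting the argument entirely also uses the environment variable, but the explicit check surfaces a missing token before any forecast call.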

Use Spark as an engine.


from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

Forecast


import pandas as pd

url_df = 'https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/electricity-short.csv'
spark_df = spark.createDataFrame(pd.read_csv(url_df))
spark_df.show(5)

+---------+-------------------+-----+
|unique_id|                 ds|    y|
+---------+-------------------+-----+
|       BE|2016-12-01 00:00:00| 72.0|
|       BE|2016-12-01 01:00:00| 65.8|
|       BE|2016-12-01 02:00:00|59.99|
|       BE|2016-12-01 03:00:00|50.69|
|       BE|2016-12-01 04:00:00|52.58|
+---------+-------------------+-----+
only showing top 5 rows
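The data is in long format, with one row per (`unique_id`, `ds`) pair. The forecast logs further down report `Inferred freq: H`, meaning hourly. The sketch below illustrates roughly what frequency inference means by taking the most common gap between consecutive timestamps. It is not how nixtlats implements it, and `modal_step` is a hypothetical name. The timestamps are the five `BE` rows shown above. For pandas data, `pd.infer_freq` is the standard tool.

```python
from collections import Counter
from datetime import datetime


def modal_step(timestamps):
    """Return the most common gap between consecutive sorted timestamps."""
    ts = sorted(datetime.fromisoformat(t) for t in timestamps)
    gaps = Counter(b - a for a, b in zip(ts, ts[1:]))
    return gaps.most_common(1)[0][0]


ds = [
    "2016-12-01 00:00:00",
    "2016-12-01 01:00:00",
    "2016-12-01 02:00:00",
    "2016-12-01 03:00:00",
    "2016-12-01 04:00:00",
]
print(modal_step(ds))  # 1:00:00
```

In practice you would compute this per `unique_id`, because different series could in principle have different spacing.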

fcst_df = timegpt.forecast(spark_df, h=12)
fcst_df.show(5)

INFO:nixtlats.timegpt:Validating inputs...
INFO:nixtlats.timegpt:Preprocessing dataframes...
INFO:nixtlats.timegpt:Inferred freq: H
INFO:nixtlats.timegpt:Calling Forecast Endpoint...

+---------+-------------------+------------------+
|unique_id|                 ds|           TimeGPT|
+---------+-------------------+------------------+
|       FR|2016-12-31 00:00:00|62.130218505859375|
|       FR|2016-12-31 01:00:00|56.890830993652344|
|       FR|2016-12-31 02:00:00| 52.23155212402344|
|       FR|2016-12-31 03:00:00| 48.88866424560547|
|       FR|2016-12-31 04:00:00| 46.49836730957031|
+---------+-------------------+------------------+
only showing top 5 rows
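With `h=12` and an hourly frequency, each series should get 12 forecast rows. These rows continue one hour after that series' last observation. The sketch below lists the expected timestamps. It assumes the last observed timestamp is 2016-12-30 23:00:00, which is consistent with the forecasts starting at 2016-12-31 00:00:00. `future_timestamps` is a hypothetical helper.

```python
from datetime import datetime, timedelta


def future_timestamps(last_ds: datetime, h: int, step: timedelta) -> list:
    """Return the h timestamps that follow last_ds at a fixed step."""
    return [last_ds + step * i for i in range(1, h + 1)]


stamps = future_timestamps(datetime(2016, 12, 30, 23), h=12, step=timedelta(hours=1))
print(stamps[0], stamps[-1])  # 2016-12-31 00:00:00 2016-12-31 11:00:00
```

To check the Spark output against this, `fcst_df.groupBy("unique_id").count().show()` should report 12 rows per series.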

Forecast with exogenous variables

Exogenous variables or external factors are crucial in time series forecasting as they provide additional information that might influence the prediction. These variables could include holiday markers, marketing spending, weather data, or any other external data that correlate with the time series data you are forecasting. For example, if you’re forecasting ice cream sales, temperature data could serve as a useful exogenous variable. On hotter days, ice cream sales may increase. To incorporate exogenous variables in TimeGPT, you’ll need to pair each point in your time series data with the corresponding external data. Let’s see an example.
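Pairing each point with its external data amounts to a join on the series identifier and timestamp. Here is a small pandas sketch. The two frames are toy data whose values are copied from the first `BE` rows of the dataset shown next. In the real dataset the exogenous columns already arrive merged.

```python
import pandas as pd

# Toy frames for illustration only (values copied from the table below).
target = pd.DataFrame({
    "unique_id": ["BE", "BE"],
    "ds": pd.to_datetime(["2016-12-01 00:00", "2016-12-01 01:00"]),
    "y": [72.0, 65.8],
})
exog = pd.DataFrame({
    "unique_id": ["BE", "BE"],
    "ds": pd.to_datetime(["2016-12-01 00:00", "2016-12-01 01:00"]),
    "Exogenous1": [61507.0, 59528.0],
})

# Keep every target row; validate="one_to_one" raises if a (unique_id, ds) key repeats.
paired = target.merge(exog, on=["unique_id", "ds"], how="left", validate="one_to_one")

# A left join leaves NaN where exogenous data is missing, so check for gaps.
assert not paired["Exogenous1"].isna().any()
print(paired)
```

With Spark DataFrames the equivalent is `target_sdf.join(exog_sdf, on=["unique_id", "ds"], how="left")`. The names `target_sdf` and `exog_sdf` are placeholders.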


df = pd.read_csv('https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/electricity-short-with-ex-vars.csv')
spark_df = spark.createDataFrame(df)
spark_df.show(5)

+---------+-------------------+-----+----------+----------+-----+-----+-----+-----+-----+-----+-----+
|unique_id|                 ds|    y|Exogenous1|Exogenous2|day_0|day_1|day_2|day_3|day_4|day_5|day_6|
+---------+-------------------+-----+----------+----------+-----+-----+-----+-----+-----+-----+-----+
|       BE|2016-12-01 00:00:00| 72.0|   61507.0|   71066.0|  0.0|  0.0|  0.0|  1.0|  0.0|  0.0|  0.0|
|       BE|2016-12-01 01:00:00| 65.8|   59528.0|   67311.0|  0.0|  0.0|  0.0|  1.0|  0.0|  0.0|  0.0|
|       BE|2016-12-01 02:00:00|59.99|   58812.0|   67470.0|  0.0|  0.0|  0.0|  1.0|  0.0|  0.0|  0.0|
|       BE|2016-12-01 03:00:00|50.69|   57676.0|   64529.0|  0.0|  0.0|  0.0|  1.0|  0.0|  0.0|  0.0|
|       BE|2016-12-01 04:00:00|52.58|   56804.0|   62773.0|  0.0|  0.0|  0.0|  1.0|  0.0|  0.0|  0.0|
+---------+-------------------+-----+----------+----------+-----+-----+-----+-----+-----+-----+-----+
only showing top 5 rows
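The columns `day_0` to `day_6` appear to be one-hot day-of-week indicators. The rows for Thursday 2016-12-01 have `day_3 = 1.0`, and the future rows for Saturday 2016-12-31 later in this section have `day_5 = 1.0`. Both match Python's Monday = 0 weekday convention. Under that assumption, which is inferred from the rows shown rather than from the dataset's documentation, the sketch below rebuilds such columns from a timestamp. `day_dummies` is a hypothetical helper.

```python
from datetime import datetime


def day_dummies(ds: datetime) -> dict:
    """One-hot encode the weekday of ds as day_0 (Monday) to day_6 (Sunday)."""
    return {f"day_{k}": 1.0 if ds.weekday() == k else 0.0 for k in range(7)}


print(day_dummies(datetime(2016, 12, 1)))  # day_3 is 1.0 (Thursday)
```

Calendar features like these are known for any future date. That is why they are convenient exogenous variables: their future values never have to be forecast themselves.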

To produce forecasts, we also need the future values of the exogenous variables. Let’s read this dataset. In this case we want to predict 24 steps ahead, so each unique_id has 24 observations.


future_ex_vars_df = pd.read_csv('https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/electricity-short-future-ex-vars.csv')
spark_future_ex_vars_df = spark.createDataFrame(future_ex_vars_df)
spark_future_ex_vars_df.show(5)

+---------+-------------------+----------+----------+-----+-----+-----+-----+-----+-----+-----+
|unique_id|                 ds|Exogenous1|Exogenous2|day_0|day_1|day_2|day_3|day_4|day_5|day_6|
+---------+-------------------+----------+----------+-----+-----+-----+-----+-----+-----+-----+
|       BE|2016-12-31 00:00:00|   64108.0|   70318.0|  0.0|  0.0|  0.0|  0.0|  0.0|  1.0|  0.0|
|       BE|2016-12-31 01:00:00|   62492.0|   67898.0|  0.0|  0.0|  0.0|  0.0|  0.0|  1.0|  0.0|
|       BE|2016-12-31 02:00:00|   61571.0|   68379.0|  0.0|  0.0|  0.0|  0.0|  0.0|  1.0|  0.0|
|       BE|2016-12-31 03:00:00|   60381.0|   64972.0|  0.0|  0.0|  0.0|  0.0|  0.0|  1.0|  0.0|
|       BE|2016-12-31 04:00:00|   60298.0|   62900.0|  0.0|  0.0|  0.0|  0.0|  0.0|  1.0|  0.0|
+---------+-------------------+----------+----------+-----+-----+-----+-----+-----+-----+-----+
only showing top 5 rows
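Before forecasting, it can help to check the future frame against two requirements. It should carry the same exogenous columns as the history (minus `y`), and it should have exactly `h` rows per series. The sketch below is a hypothetical pandas helper, `check_future_exog`, and is not a nixtlats function. nixtlats runs its own input validation, whose exact rules are not shown here.

```python
import pandas as pd


def check_future_exog(hist: pd.DataFrame, future: pd.DataFrame, h: int) -> list:
    """Return a list of problems found in the future exogenous frame (empty if none)."""
    problems = []
    # Future rows should carry the historical columns except the target y.
    expected_cols = set(hist.columns) - {"y"}
    if set(future.columns) != expected_cols:
        problems.append(f"column mismatch: {sorted(set(future.columns) ^ expected_cols)}")
    # Each series needs exactly h future rows.
    counts = future.groupby("unique_id").size()
    for uid, n in counts.items():
        if n != h:
            problems.append(f"{uid}: {n} rows, expected {h}")
    # Every historical series needs future values.
    for uid in sorted(set(hist["unique_id"]) - set(counts.index)):
        problems.append(f"{uid}: no future rows")
    return problems
```

Because `df` and `future_ex_vars_df` above are still pandas DataFrames, you could run `check_future_exog(df, future_ex_vars_df, h=24)` before converting them to Spark.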

Let’s call the forecast method, adding this information:


timegpt_fcst_ex_vars_df = timegpt.forecast(df=spark_df, X_df=spark_future_ex_vars_df, h=24, level=[80, 90])
timegpt_fcst_ex_vars_df.show(5)

INFO:nixtlats.timegpt:Validating inputs...
INFO:nixtlats.timegpt:Preprocessing dataframes...
INFO:nixtlats.timegpt:Inferred freq: H
INFO:nixtlats.timegpt:Calling Forecast Endpoint...

+---------+-------------------+------------------+------------------+-----------------+-----------------+------------------+
|unique_id|                 ds|           TimeGPT|     TimeGPT-lo-90|    TimeGPT-lo-80|    TimeGPT-hi-80|     TimeGPT-hi-90|
+---------+-------------------+------------------+------------------+-----------------+-----------------+------------------+
|       FR|2016-12-31 00:00:00| 64.97691027939692|60.056473801735784|61.71575274765864|68.23806781113521| 69.89734675705805|
|       FR|2016-12-31 01:00:00| 60.14365519077404| 56.12626745731457|56.73784790927991|63.54946247226818| 64.16104292423351|
|       FR|2016-12-31 02:00:00| 59.42375860682185| 54.84932824030574|56.52975776758845|62.31775944605525| 63.99818897333796|
|       FR|2016-12-31 03:00:00| 55.11264928302748| 47.59671153125746|51.95117842731459|58.27412013874037|  62.6285870347975|
|       FR|2016-12-31 04:00:00|54.400922806813526|44.925772896840385|49.65213255412798|59.14971305949907|63.876072716786666|
+---------+-------------------+------------------+------------------+-----------------+-----------------+------------------+
only showing top 5 rows
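With `level=[80, 90]`, the output gains `TimeGPT-lo-<level>` and `TimeGPT-hi-<level>` columns. A simple sanity check is that the intervals are nested: lo-90 ≤ lo-80 ≤ TimeGPT ≤ hi-80 ≤ hi-90. The helpers below are hypothetical. The column naming is taken from the output above, and the sample row is its first line (FR, 2016-12-31 00:00:00).

```python
def ordered_columns(levels, model="TimeGPT"):
    """Columns from the widest lower bound up to the widest upper bound."""
    lows = [f"{model}-lo-{lvl}" for lvl in sorted(levels, reverse=True)]
    highs = [f"{model}-hi-{lvl}" for lvl in sorted(levels)]
    return lows + [model] + highs


def intervals_nested(row: dict, levels, model="TimeGPT") -> bool:
    """True if values never decrease from the widest lower bound to the widest upper bound."""
    values = [row[c] for c in ordered_columns(levels, model)]
    return all(a <= b for a, b in zip(values, values[1:]))


# First row of the forecast output above.
row = {
    "TimeGPT": 64.97691027939692,
    "TimeGPT-lo-90": 60.056473801735784,
    "TimeGPT-lo-80": 61.71575274765864,
    "TimeGPT-hi-80": 68.23806781113521,
    "TimeGPT-hi-90": 69.89734675705805,
}
print(intervals_nested(row, levels=[80, 90]))  # True
```

On the Spark result you could apply it to collected rows, e.g. `all(intervals_nested(r.asDict(), [80, 90]) for r in timegpt_fcst_ex_vars_df.collect())`. This is only sensible for small outputs like this one, since `collect()` brings every row to the driver.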

spark.stop()