How to on Spark: Forecasting
Run TimeGPT in a distributed fashion on top of Spark.
TimeGPT works on top of Spark, Dask, and Ray through Fugue. TimeGPT will read the input DataFrame and use the corresponding engine. For example, if the input is a Spark DataFrame, TimeGPT will use the existing Spark session to run the forecast.
Installation
As long as Spark is installed and configured, TimeGPT will be able to use it. If executing on a distributed Spark cluster, make sure the nixtlats library is installed across all the workers.
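One way to confirm that a given Python environment can see the library is to look it up without importing it. The sketch below uses only the standard library; the helper name `is_installed` is hypothetical, and running it on the driver says nothing about the workers, where the same check would have to be executed separately.

```python
from importlib.util import find_spec


def is_installed(package: str) -> bool:
    """Return True if the top-level `package` can be located by this interpreter."""
    return find_spec(package) is not None


# Report on the two packages this guide relies on.
for name in ("nixtlats", "pyspark"):
    status = "available" if is_installed(name) else "MISSING"
    print(f"{name}: {status}")
```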
Executing on Spark
To run the forecasts distributed on Spark, pass in a Spark DataFrame instead of a pandas DataFrame. First, instantiate the TimeGPT class.
from nixtlats import TimeGPT
timegpt = TimeGPT(
    # defaults to os.environ.get("TIMEGPT_TOKEN")
    token='my_token_provided_by_nixtla'
)
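Rather than hard-coding the token, it can be read from the environment, which the comment above indicates is the default lookup (`TIMEGPT_TOKEN`). The following is a minimal sketch; `resolve_token` is a hypothetical helper, not part of `nixtlats`, and it simply fails early with a clear message when no token is found.

```python
import os
from typing import Mapping, Optional


def resolve_token(explicit: Optional[str] = None,
                  env: Mapping[str, str] = os.environ) -> str:
    """Prefer an explicitly passed token, else fall back to TIMEGPT_TOKEN."""
    token = explicit or env.get("TIMEGPT_TOKEN")
    if not token:
        raise RuntimeError(
            "No TimeGPT token found; set TIMEGPT_TOKEN or pass one explicitly."
        )
    return token
```

The result could then be passed along as `TimeGPT(token=resolve_token())`.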
Use Spark as an engine.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
Forecast
import pandas as pd
url_df = 'https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/electricity-short.csv'
# Read the CSV with pandas, then convert it to a Spark DataFrame
spark_df = spark.createDataFrame(pd.read_csv(url_df))
spark_df.show(5)
+---------+-------------------+-----+
|unique_id|                 ds|    y|
+---------+-------------------+-----+
|       BE|2016-12-01 00:00:00| 72.0|
|       BE|2016-12-01 01:00:00| 65.8|
|       BE|2016-12-01 02:00:00|59.99|
|       BE|2016-12-01 03:00:00|50.69|
|       BE|2016-12-01 04:00:00|52.58|
+---------+-------------------+-----+
only showing top 5 rows
fcst_df = timegpt.forecast(spark_df, h=12)
fcst_df.show(5)
INFO:nixtlats.timegpt:Validating inputs...
INFO:nixtlats.timegpt:Preprocessing dataframes...
INFO:nixtlats.timegpt:Inferred freq: H
INFO:nixtlats.timegpt:Calling Forecast Endpoint...
+---------+-------------------+------------------+
|unique_id|                 ds|           TimeGPT|
+---------+-------------------+------------------+
|       FR|2016-12-31 00:00:00|62.130218505859375|
|       FR|2016-12-31 01:00:00|56.890830993652344|
|       FR|2016-12-31 02:00:00| 52.23155212402344|
|       FR|2016-12-31 03:00:00| 48.88866424560547|
|       FR|2016-12-31 04:00:00| 46.49836730957031|
+---------+-------------------+------------------+
only showing top 5 rows
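The forecast rows start right after the last observed timestamp and advance by the inferred frequency (`H`, hourly) for `h` steps. As an illustration of that indexing only, the sketch below builds the expected timestamps with the standard library. The last observed timestamp of `2016-12-30 23:00:00` is an assumption inferred from the first forecast row shown above, and `future_timestamps` is a hypothetical helper, not part of `nixtlats`.

```python
from datetime import datetime, timedelta
from typing import List


def future_timestamps(last_observed: datetime, h: int,
                      step: timedelta = timedelta(hours=1)) -> List[datetime]:
    """Return the h timestamps that follow `last_observed`, spaced by `step`."""
    return [last_observed + step * i for i in range(1, h + 1)]


last = datetime(2016, 12, 30, 23)  # assumed last observation, see note above
horizon = future_timestamps(last, h=12)
print(horizon[0], "to", horizon[-1])
```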
Forecast with exogenous variables
Exogenous variables or external factors are crucial in time series forecasting as they provide additional information that might influence the prediction. These variables could include holiday markers, marketing spending, weather data, or any other external data that correlate with the time series data you are forecasting. For example, if you’re forecasting ice cream sales, temperature data could serve as a useful exogenous variable. On hotter days, ice cream sales may increase. To incorporate exogenous variables in TimeGPT, you’ll need to pair each point in your time series data with the corresponding external data. Let’s see an example.
df = pd.read_csv('https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/electricity-short-with-ex-vars.csv')
spark_df = spark.createDataFrame(df)
spark_df.show(5)
+---------+-------------------+-----+----------+----------+-----+-----+-----+-----+-----+-----+-----+
|unique_id|                 ds|    y|Exogenous1|Exogenous2|day_0|day_1|day_2|day_3|day_4|day_5|day_6|
+---------+-------------------+-----+----------+----------+-----+-----+-----+-----+-----+-----+-----+
|       BE|2016-12-01 00:00:00| 72.0|   61507.0|   71066.0|  0.0|  0.0|  0.0|  1.0|  0.0|  0.0|  0.0|
|       BE|2016-12-01 01:00:00| 65.8|   59528.0|   67311.0|  0.0|  0.0|  0.0|  1.0|  0.0|  0.0|  0.0|
|       BE|2016-12-01 02:00:00|59.99|   58812.0|   67470.0|  0.0|  0.0|  0.0|  1.0|  0.0|  0.0|  0.0|
|       BE|2016-12-01 03:00:00|50.69|   57676.0|   64529.0|  0.0|  0.0|  0.0|  1.0|  0.0|  0.0|  0.0|
|       BE|2016-12-01 04:00:00|52.58|   56804.0|   62773.0|  0.0|  0.0|  0.0|  1.0|  0.0|  0.0|  0.0|
+---------+-------------------+-----+----------+----------+-----+-----+-----+-----+-----+-----+-----+
only showing top 5 rows
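The `day_0` to `day_6` columns look like one-hot day-of-week indicators: the rows shown are for 2016-12-01, a Thursday, and only `day_3` is set. Assuming they follow Python's convention where Monday is 0 (an assumption consistent with the sample, not something documented here), such columns could be derived as in this sketch. `day_of_week_flags` is a hypothetical helper.

```python
from datetime import datetime
from typing import Dict


def day_of_week_flags(ts: datetime) -> Dict[str, float]:
    """One-hot encode the weekday of `ts` as day_0..day_6 (Monday = 0, assumed)."""
    return {f"day_{d}": float(ts.weekday() == d) for d in range(7)}


flags = day_of_week_flags(datetime(2016, 12, 1, 0))
print(flags)
```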
To produce forecasts we have to add the future values of the exogenous variables. Let’s read this dataset. In this case we want to predict 24 steps ahead, therefore each unique id will have 24 observations.
future_ex_vars_df = pd.read_csv('https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/electricity-short-future-ex-vars.csv')
spark_future_ex_vars_df = spark.createDataFrame(future_ex_vars_df)
spark_future_ex_vars_df.show(5)
+---------+-------------------+----------+----------+-----+-----+-----+-----+-----+-----+-----+
|unique_id|                 ds|Exogenous1|Exogenous2|day_0|day_1|day_2|day_3|day_4|day_5|day_6|
+---------+-------------------+----------+----------+-----+-----+-----+-----+-----+-----+-----+
|       BE|2016-12-31 00:00:00|   64108.0|   70318.0|  0.0|  0.0|  0.0|  0.0|  0.0|  1.0|  0.0|
|       BE|2016-12-31 01:00:00|   62492.0|   67898.0|  0.0|  0.0|  0.0|  0.0|  0.0|  1.0|  0.0|
|       BE|2016-12-31 02:00:00|   61571.0|   68379.0|  0.0|  0.0|  0.0|  0.0|  0.0|  1.0|  0.0|
|       BE|2016-12-31 03:00:00|   60381.0|   64972.0|  0.0|  0.0|  0.0|  0.0|  0.0|  1.0|  0.0|
|       BE|2016-12-31 04:00:00|   60298.0|   62900.0|  0.0|  0.0|  0.0|  0.0|  0.0|  1.0|  0.0|
+---------+-------------------+----------+----------+-----+-----+-----+-----+-----+-----+-----+
only showing top 5 rows
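Before calling the API, it can be worth checking that the future frame really has `h` rows per series and carries the same exogenous columns as the history. Below is a minimal sketch of that check, written against plain Python iterables so it does not depend on Spark; `check_future_exog` is a hypothetical helper and not part of `nixtlats`, and the target column name `y` is taken from the tables above.

```python
from collections import Counter
from typing import Iterable, List


def check_future_exog(history_columns: Iterable[str],
                      future_columns: Iterable[str],
                      future_ids: Iterable[str],
                      h: int,
                      target_col: str = "y") -> List[str]:
    """Return a list of human-readable problems; an empty list means all good."""
    problems = []
    expected = set(history_columns) - {target_col}
    actual = set(future_columns)
    if expected - actual:
        problems.append(f"missing columns: {sorted(expected - actual)}")
    if actual - expected:
        problems.append(f"unexpected columns: {sorted(actual - expected)}")
    # Each series needs exactly h future rows.
    for uid, n in sorted(Counter(future_ids).items()):
        if n != h:
            problems.append(f"{uid}: {n} future rows, expected {h}")
    return problems
```

With the pandas frames loaded above, a call might look like `check_future_exog(df.columns, future_ex_vars_df.columns, future_ex_vars_df["unique_id"], h=24)`.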
Let’s call the forecast method, adding this information:
timegpt_fcst_ex_vars_df = timegpt.forecast(df=spark_df, X_df=spark_future_ex_vars_df, h=24, level=[80, 90])
timegpt_fcst_ex_vars_df.show(5)
INFO:nixtlats.timegpt:Validating inputs...
INFO:nixtlats.timegpt:Preprocessing dataframes...
INFO:nixtlats.timegpt:Inferred freq: H
INFO:nixtlats.timegpt:Calling Forecast Endpoint...
+---------+-------------------+------------------+------------------+-----------------+-----------------+------------------+
|unique_id|                 ds|           TimeGPT|     TimeGPT-lo-90|    TimeGPT-lo-80|    TimeGPT-hi-80|     TimeGPT-hi-90|
+---------+-------------------+------------------+------------------+-----------------+-----------------+------------------+
|       FR|2016-12-31 00:00:00| 64.97691027939692|60.056473801735784|61.71575274765864|68.23806781113521| 69.89734675705805|
|       FR|2016-12-31 01:00:00| 60.14365519077404| 56.12626745731457|56.73784790927991|63.54946247226818| 64.16104292423351|
|       FR|2016-12-31 02:00:00| 59.42375860682185| 54.84932824030574|56.52975776758845|62.31775944605525| 63.99818897333796|
|       FR|2016-12-31 03:00:00| 55.11264928302748| 47.59671153125746|51.95117842731459|58.27412013874037|  62.6285870347975|
|       FR|2016-12-31 04:00:00|54.400922806813526|44.925772896840385|49.65213255412798|59.14971305949907|63.876072716786666|
+---------+-------------------+------------------+------------------+-----------------+-----------------+------------------+
only showing top 5 rows
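With `level=[80, 90]` the output gains a lower and an upper bound per level, named `TimeGPT-lo-<level>` and `TimeGPT-hi-<level>` as seen in the header above. A wider level should enclose a narrower one, which gives a cheap sanity check. The sketch below applies it to the first row of the output shown (values rounded); `intervals_are_nested` is a hypothetical helper, not part of `nixtlats`.

```python
from typing import Mapping, Sequence


def intervals_are_nested(row: Mapping[str, float], levels: Sequence[int],
                         model: str = "TimeGPT") -> bool:
    """Check lo-90 <= lo-80 <= point <= hi-80 <= hi-90 (for any set of levels)."""
    widest_first = sorted(levels, reverse=True)
    lows = [row[f"{model}-lo-{lv}"] for lv in widest_first]
    highs = [row[f"{model}-hi-{lv}"] for lv in reversed(widest_first)]
    chain = lows + [row[model]] + highs
    return all(a <= b for a, b in zip(chain, chain[1:]))


first_row = {
    "TimeGPT": 64.9769,
    "TimeGPT-lo-90": 60.0565, "TimeGPT-lo-80": 61.7158,
    "TimeGPT-hi-80": 68.2381, "TimeGPT-hi-90": 69.8973,
}
print(intervals_are_nested(first_row, levels=[80, 90]))
```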
spark.stop()