
Run TimeGPT in a Distributed Manner on Spark

Spark is an open-source distributed compute framework designed for large-scale data processing. With Spark, you can seamlessly scale your Python-based workflows for big data analytics and machine learning tasks. This tutorial demonstrates how to use TimeGPT with Spark to perform forecasting and cross-validation.
If you run this on a distributed Spark cluster, make sure the nixtla library (and its dependencies) is installed on every worker node so that execution is consistent across the cluster.

1. Installation

Fugue provides a convenient interface to distribute Python code across frameworks like Spark.
Install fugue with Spark support:
fugue installation
pip install fugue[spark]
To work with TimeGPT, make sure you have the nixtla library installed as well.
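The client library is available from PyPI:
install nixtla
pip install nixtla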

2. Load Data

Load the dataset into a pandas DataFrame. In this example, we use hourly electricity price data from different markets.
load electricity price data
import pandas as pd

df = pd.read_csv(
    'https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/electricity-short.csv',
    parse_dates=['ds'],
)
df.head()
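TimeGPT works with data in long format: one row per series and timestamp, with an identifier, a timestamp, and a target column. In this dataset those columns are named unique_id, ds, and y; the quick check below assumes those names.
inspect the data
# One row per (series, timestamp): unique_id identifies the market,
# ds is the hourly timestamp, and y is the electricity price.
print(df.columns.tolist())
print(df['unique_id'].nunique(), 'series,', len(df), 'rows')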

3. Initialize Spark

Create a Spark session and convert your pandas DataFrame to a Spark DataFrame:
spark session creation and conversion
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark_df = spark.createDataFrame(df)
spark_df.show(5)
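getOrCreate() returns the active session on a cluster or creates a local one otherwise. If you want to experiment locally with explicit settings, the snippet below is one possible local configuration, not a requirement of TimeGPT:
optional: explicit local Spark session
from pyspark.sql import SparkSession

# Local session that uses all available cores; adjust master/appName for your cluster.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('timegpt-spark-demo')
    .getOrCreate()
)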

4. Use TimeGPT on Spark

Key Concept
Using TimeGPT with Spark is very similar to using it locally. The main difference is that you work with Spark DataFrames instead of pandas DataFrames.
TimeGPT can handle large-scale data when distributed via Spark, allowing you to scale your time series forecasting tasks efficiently.
NixtlaClient initialization
from nixtla import NixtlaClient

nixtla_client = NixtlaClient(
    api_key='my_api_key_provided_by_nixtla'  # defaults to os.environ.get("NIXTLA_API_KEY")
)
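Before launching Spark jobs, it can help to confirm the client is configured correctly. Recent versions of the nixtla client expose a validate_api_key method for this; treat the call below as an assumption if your version differs.
optional: validate the API key
# Should print True when the API key is accepted by the Nixtla API.
print(nixtla_client.validate_api_key())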
If you need to use an Azure AI endpoint, set the base_url parameter:
NixtlaClient with Azure AI endpoint
nixtla_client = NixtlaClient(
    base_url="your azure ai endpoint",
    api_key="your api_key"
)
forecasting with NixtlaClient on Spark
fcst_df = nixtla_client.forecast(spark_df, h=12)
fcst_df.show(5)
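The returned fcst_df is a Spark DataFrame. If the forecasts are small enough to fit in driver memory, you can collect them back to pandas for local inspection or plotting:
collect forecasts to pandas
# Bring the distributed forecasts back to the driver as a pandas DataFrame.
fcst_pdf = fcst_df.toPandas()
fcst_pdf.head()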
When using Azure AI endpoints, specify model="azureai".
AzureAI model usage example
nixtla_client.forecast(
    spark_df,
    h=12,
    model="azureai"
)
The public API supports two models: timegpt-1 (the default) and timegpt-1-long-horizon. For long-horizon forecasting, see the dedicated long-horizon tutorial.
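Switching models only changes the model argument; the sketch below uses an illustrative horizon of 48 steps.
long-horizon model example
# Use the long-horizon variant when the forecast horizon is long relative to the data frequency.
fcst_long_df = nixtla_client.forecast(
    spark_df,
    h=48,
    model='timegpt-1-long-horizon',
)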
Perform cross-validation with Spark DataFrames:
cross-validation example
cv_df = nixtla_client.cross_validation(
    spark_df,
    h=12,
    n_windows=5,
    step_size=2
)
cv_df.show(5)
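cv_df holds the actual values and the TimeGPT predictions for every window, so you can collect it and compute error metrics. The column names used below (unique_id, y, TimeGPT) follow the client's usual cross-validation output and should be treated as assumptions if your version differs.
evaluate cross-validation results
# Collect the cross-validation results and compute the mean absolute error per series.
cv_pdf = cv_df.toPandas()
cv_pdf['abs_error'] = (cv_pdf['y'] - cv_pdf['TimeGPT']).abs()
print(cv_pdf.groupby('unique_id')['abs_error'].mean())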
To include exogenous variables with TimeGPT on Spark, pass them as Spark DataFrames instead of pandas DataFrames, as demonstrated in the Exogenous Variables tutorial.
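As a minimal sketch, the call mirrors the local API: historical exogenous columns travel alongside y in the input DataFrame, and their future values are passed through X_df. The DataFrame names below (df_exog, future_exog) are hypothetical placeholders.
exogenous variables sketch
# df_exog: Spark DataFrame with unique_id, ds, y plus exogenous feature columns.
# future_exog: Spark DataFrame with unique_id, ds and the same features over the horizon.
fcst_exog_df = nixtla_client.forecast(
    df=df_exog,
    X_df=future_exog,
    h=24,
)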

5. Stop Spark

After completing your tasks, stop the Spark session to free resources:
stop spark session
spark.stop()