# Gradio Space: BTC daily market-value forecasting with a fine-tuned Autoformer.
# Standard library imports
from functools import partial
from typing import Iterable, Optional

# Third-party library imports
import gradio as gr
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import spaces
import torch
from datasets import Dataset, Features, Sequence, Value, load_dataset
from plotly.subplots import make_subplots
from transformers import AutoformerForPrediction, PretrainedConfig

# GluonTS imports
from gluonts.dataset.field_names import FieldName
from gluonts.dataset.loader import as_stacked_batches
from gluonts.time_feature import time_features_from_frequency_str
from gluonts.transform import (
    AddAgeFeature,
    AddObservedValuesIndicator,
    AddTimeFeatures,
    AsNumpyArray,
    Chain,
    ExpectedNumInstanceSampler,
    InstanceSplitter,
    RemoveFields,
    RenameFields,
    TestSplitSampler,
    Transformation,
    ValidationSplitSampler,
    VstackFeatures,
)
from gluonts.transform.sampler import InstanceSampler
def convert_to_pandas_period(date, freq):
    """Wrap a raw timestamp in a pandas ``Period`` with the given frequency."""
    return pd.Period(date, freq=freq)
def transform_start_field(batch, freq):
    """Convert every entry of ``batch["start"]`` into a pandas Period (in place)."""
    starts = batch["start"]
    batch["start"] = [pd.Period(ts, freq) for ts in starts]
    return batch
def create_transformation(freq: str, config: PretrainedConfig, prediction_length: int) -> Transformation:
    """Assemble the GluonTS preprocessing chain dictated by the model config.

    Parameters
    ----------
    freq : pandas frequency string of the series (e.g. ``"1D"``).
    config : HuggingFace model config; its feature counts decide which
        fields are removed and which transformations are applied.
    prediction_length : forecast horizon in time steps.

    Returns
    -------
    A ``Chain`` of transformations — a bit like torchvision.transforms.Compose.
    """
    # Step 1: drop any static/dynamic fields the model was not configured for.
    unused_fields = []
    if config.num_static_real_features == 0:
        unused_fields.append(FieldName.FEAT_STATIC_REAL)
    if config.num_dynamic_real_features == 0:
        unused_fields.append(FieldName.FEAT_DYNAMIC_REAL)
    if config.num_static_categorical_features == 0:
        unused_fields.append(FieldName.FEAT_STATIC_CAT)
    steps = [RemoveFields(field_names=unused_fields)]

    # Step 2: make sure the static features arrive as NumPy arrays
    # (potentially not needed).
    if config.num_static_categorical_features > 0:
        steps.append(
            AsNumpyArray(
                field=FieldName.FEAT_STATIC_CAT,
                expected_ndim=1,
                dtype=int,
            )
        )
    if config.num_static_real_features > 0:
        steps.append(
            AsNumpyArray(
                field=FieldName.FEAT_STATIC_REAL,
                expected_ndim=1,
            )
        )
    # The target carries an extra dimension in the multivariate case.
    steps.append(
        AsNumpyArray(
            field=FieldName.TARGET,
            expected_ndim=1 if config.input_size == 1 else 2,
        )
    )

    # Step 3: zero-fill NaNs in the target and emit an observed-values mask
    # (True for observed, False for NaN). The decoder uses this mask so no
    # loss is incurred for unobserved values — see loss_weights inside the
    # xxxForPrediction models.
    steps.append(
        AddObservedValuesIndicator(
            target_field=FieldName.TARGET,
            output_field=FieldName.OBSERVED_VALUES,
        )
    )

    # Step 4: calendar features derived from the dataset frequency,
    # extended over the desired prediction horizon.
    steps.append(
        AddTimeFeatures(
            start_field=FieldName.START,
            target_field=FieldName.TARGET,
            output_field=FieldName.FEAT_TIME,
            time_features=time_features_from_frequency_str(freq),
            pred_length=prediction_length,
        )
    )

    # Step 5: a log-scaled "age" feature (a single running counter) telling
    # the model where each value sits within the series' lifetime.
    steps.append(
        AddAgeFeature(
            target_field=FieldName.TARGET,
            output_field=FieldName.FEAT_AGE,
            pred_length=prediction_length,
            log_scale=True,
        )
    )

    # Step 6: vertically stack all temporal features under the key FEAT_TIME.
    time_feature_inputs = [FieldName.FEAT_TIME, FieldName.FEAT_AGE]
    if config.num_dynamic_real_features > 0:
        time_feature_inputs.append(FieldName.FEAT_DYNAMIC_REAL)
    steps.append(
        VstackFeatures(
            output_field=FieldName.FEAT_TIME,
            input_fields=time_feature_inputs,
        )
    )

    # Step 7: rename GluonTS fields to the names HuggingFace models expect.
    steps.append(
        RenameFields(
            mapping={
                FieldName.FEAT_STATIC_CAT: "static_categorical_features",
                FieldName.FEAT_STATIC_REAL: "static_real_features",
                FieldName.FEAT_TIME: "time_features",
                FieldName.TARGET: "values",
                FieldName.OBSERVED_VALUES: "observed_mask",
            }
        )
    )

    return Chain(steps)
def create_instance_splitter(
    config: PretrainedConfig,
    mode: str,
    prediction_length: int,
    train_sampler: Optional[InstanceSampler] = None,
    validation_sampler: Optional[InstanceSampler] = None,
) -> Transformation:
    """Build an ``InstanceSplitter`` whose sampling strategy depends on *mode*.

    ``"train"`` samples random windows, ``"validation"`` keeps the last
    window with enough future, and ``"test"`` splits at the very end of
    each series.
    """
    assert mode in ["train", "validation", "test"]

    if mode == "train":
        sampler = train_sampler or ExpectedNumInstanceSampler(
            num_instances=1.0, min_future=prediction_length
        )
    elif mode == "validation":
        sampler = validation_sampler or ValidationSplitSampler(
            min_future=prediction_length
        )
    else:  # "test"
        sampler = TestSplitSampler()

    # The encoder needs context_length plus history for the largest lag.
    return InstanceSplitter(
        target_field="values",
        is_pad_field=FieldName.IS_PAD,
        start_field=FieldName.START,
        forecast_start_field=FieldName.FORECAST_START,
        instance_sampler=sampler,
        past_length=config.context_length + max(config.lags_sequence),
        future_length=prediction_length,
        time_series_fields=["time_features", "observed_mask"],
    )
def create_test_dataloader(
    config: PretrainedConfig,
    freq: str,
    data: Dataset,
    batch_size: int,
    prediction_length: int,
    **kwargs,
):
    """Preprocess *data* for inference and return stacked prediction batches."""
    # Inputs the model's generate() expects, in stacking order.
    input_names = [
        "past_time_features",
        "past_values",
        "past_observed_mask",
        "future_time_features",
    ]
    if config.num_static_categorical_features > 0:
        input_names.append("static_categorical_features")
    if config.num_static_real_features > 0:
        input_names.append("static_real_features")

    # Apply the preprocessing pipeline in inference mode.
    pipeline = create_transformation(freq, config, prediction_length)
    prepared = pipeline.apply(data, is_train=False)

    # A "test" splitter samples only the very last context window, so the
    # encoder sees exactly the kind of window it saw during training.
    splitter = create_instance_splitter(
        config, "test", prediction_length=prediction_length
    )
    instances = splitter.apply(prepared, is_train=False)

    return as_stacked_batches(
        instances,
        batch_size=batch_size,
        output_type=torch.tensor,
        field_names=input_names,
    )
def plot(ts_index, test_dataset, forecasts, prediction_length, freq="1D"):
    """Build a Plotly figure comparing a series' history with its forecast.

    Parameters
    ----------
    ts_index : index of the time series to plot.
    test_dataset : dataset whose records carry ``'start'`` and ``'target'``.
    forecasts : array indexed as ``forecasts[series][sample][step]``.
    prediction_length : number of forecast steps to draw.
    freq : pandas frequency of the series. Defaults to ``"1D"`` (daily),
        which was previously hard-coded; pass another value for
        non-daily data.

    Returns
    -------
    A ``plotly`` figure with actual, prediction, median and std traces.
    """
    target_length = len(test_dataset[ts_index]['target'])

    # One timestamp per historical point plus one per forecast step.
    index = pd.period_range(
        start=test_dataset[ts_index]['start'],
        periods=target_length + prediction_length,
        freq=freq,
    ).to_timestamp()

    # Observed history.
    actual_data = go.Scatter(
        x=index[:target_length],
        y=test_dataset[ts_index]['target'],
        name="Actual",
        mode='lines',
    )

    # NOTE(review): only the FIRST sample path of the forecast is plotted;
    # consider aggregating across samples (e.g. np.median over the sample
    # axis) for a more representative forecast line — confirm intent.
    sample_path = forecasts[ts_index][0][:prediction_length]
    forecast_data = go.Scatter(
        x=index[target_length:],
        y=sample_path,
        name="Prediction",
        mode='lines',
    )

    # Flat reference lines: the sample path's median, and median + one std.
    forecast_median = np.median(sample_path)
    forecast_median_data = go.Scatter(
        x=index[target_length:],
        y=[forecast_median] * prediction_length,
        name="Prediction Median",
        mode='lines',
    )
    forecast_std = np.std(sample_path)
    forecast_std_data = go.Scatter(
        x=index[target_length:],
        y=[forecast_median + forecast_std] * prediction_length,
        name="Prediction Std",
        mode='lines',
    )

    # Assemble the figure.
    fig = make_subplots(rows=1, cols=1)
    fig.add_trace(actual_data, row=1, col=1)
    fig.add_trace(forecast_data, row=1, col=1)
    fig.add_trace(forecast_median_data, row=1, col=1)
    fig.add_trace(forecast_std_data, row=1, col=1)

    fig.update_layout(
        xaxis_title="Date",
        yaxis_title="Value",
        title="Actual vs. Predicted Values",
        xaxis_rangeslider_visible=True,
    )
    return fig
def do_prediction(days_to_predict: int):
    """Forecast BTC daily average market value *days_to_predict* steps ahead.

    Downloads the dataset ("thesven/BTC-Daily-Avg-Market-Value") and the
    fine-tuned Autoformer ("thesven/BTC-Autoformer-v1") from the HF Hub,
    runs generation over the test split, and returns a Plotly figure for
    the first series.
    """
    # Prefer GPU when available; model and batches are moved to it below.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # Define the desired prediction length
    prediction_length = days_to_predict # Number of time steps to predict into the future
    # NOTE(review): the checkpoint was trained with a fixed prediction_length;
    # requesting a different horizon here may not match config — confirm.
    freq = "1D" # Daily frequency
    dataset = load_dataset("thesven/BTC-Daily-Avg-Market-Value")
    # Lazily convert each record's 'start' into a pandas Period on access.
    dataset['test'].set_transform(partial(transform_start_field, freq=freq))
    model = AutoformerForPrediction.from_pretrained("thesven/BTC-Autoformer-v1")
    config = model.config
    print(f"Config: {config}")
    test_dataloader = create_test_dataloader(
        config=config,
        freq=freq,
        data=dataset['test'],
        batch_size=64,
        prediction_length=prediction_length,
    )
    model.to(device)
    model.eval()
    forecasts = []
    # Sampling-based generation; static features are only passed when the
    # config says the model was trained with them.
    for batch in test_dataloader:
        outputs = model.generate(
            static_categorical_features=batch["static_categorical_features"].to(device)
            if config.num_static_categorical_features > 0
            else None,
            static_real_features=batch["static_real_features"].to(device)
            if config.num_static_real_features > 0
            else None,
            past_time_features=batch["past_time_features"].to(device),
            past_values=batch["past_values"].to(device),
            future_time_features=batch["future_time_features"].to(device),
            past_observed_mask=batch["past_observed_mask"].to(device),
        )
        # Collect sample paths on CPU; per HF Autoformer docs, sequences is
        # (batch, num_samples, prediction_length) — verify against version.
        forecasts.append(outputs.sequences.cpu().numpy())
    forecasts = np.vstack(forecasts)
    print(forecasts.shape)
    # Plot only the first time series of the test split.
    return plot(0, dataset['test'], forecasts, prediction_length)
# Gradio UI: one slider for the forecast horizon, output as a Plotly figure.
interface = gr.Interface(
    fn=do_prediction,
    inputs=gr.Slider(minimum=1, maximum=30, step=1, label="Days to Predict"),
    outputs="plot",
    title="Prediction Plot",
    description="Adjust the slider to set the number of days to predict.",
    # Disable flagging; gradio documents "never"/"auto"/"manual" here —
    # the previous bare False relied on a deprecated bool conversion.
    allow_flagging="never",
)
interface.launch()