[Major] Dataloader: Just-In-Time tabularization (#1529)
* minimal pytest

* move_func_getitem

* slicing

* predict_mode

* typos

* lr-finder

* drop_missing

* predict_v2

* predict_v3

* samples

* lagged regressor n_lags

* preliminary: events, holidays

* adjusted pytests

* selective forecasting

* black

* ruff

* lagged_regressors

* Note down df path to TimeDataset

* complete notes on TimeDataset, move meta

* Big rewrite with real and pseudocode

* create_target_start_end_mask

* boolean mask

* combine masks into map

* notes for nan check

* bypass NAN filter

* rework index to point at prediction origin, not first forecast.

* tabularize: converted time and lags to single sample extraction (see the sketch after the changed-files summary below)

* convert lagged regressors

* consolidate seasonality computation in one script

* finish Seasonality conversion

* update todos

* complete targets and future regressors

* convert events

* finish events and holidays conversion

* debug timedataset

* debugging

* make_country_specific_holidays_df

* remove uses of df.loc[...].values

* debug time

* debugging types

* debug timedata

* debugging time_dataset variable shapes

* address indexing and slicing issues, .loc

* fix dimensions except nonstationary components

* integrate torch formatting into tabularize

* check shapes

* AirPassengers test working!

* fix dataset generator

* fixed all performance tests but Energy due to nonstationary components

* fixed nonstationary issue. all performance tests running

* refactor tabularize function

* fix bug

* initial build of GlobalTimeDataset

* refactor TimeDataset not to use kwargs passthrough

* debugged seasonal components call of TimeDataset

* fix numpy object type error

* fix seasonality condition bugs

* fix events and future regressor cases

* fixing prediction frequency filter

* performance_test_energy

* debug events

* convert new energy test to daily data

* fix events util reference

* fix test_get_country_holidays

* fix test_timedataset_minima

* fix selective forecasting

* cleanup timedataset

* refactor tabularize_univariate

* daily_data

* start nan check for sample mask

* working on time nan2

* fix tests

* finish nan-check

* fix dims

* pass self.df to indexing

* fix zero dim lagged regressors

* close figures in tests

* fix typings

* black

* ruff

* linting

* linting

* modify logs

* add benchmarking script for computational time

* speed up uncertainty tests

* fix unit test multiple country

* reduce tests log level to ERROR

* reduce log level to ERROR and fix adding multiple countries

* bypass intentional glocal test error log

* fix prev

* benchmark dataloader time

* remove hourly energy test

* add debug notebook for energy hourly

* set to log model performance INFO

* address config_regressors.regressors

* clean up create_nan_mask

* clean up create_nan_mask params

* clean TimeDataframe

* update prediction frequency documentation

* improve prediction frequency documentation

* further improve prediction frequency documentation

* fix test errors

* fix df_names call

* fix selective prediction assertion

* normalize holiday names

* fix linting

* fix tests

* update to use new holiday functions in event_utils.py

* fix seasonality_local_reg test

* limit holidays to less than 1.0

* changed holidays

* update lock

* changed tests

* adjusted tests

* fix reserved names

* fixed ruff linting

* changed test

* translate holidays to English if possible

* exclude py3.13

* update lock

* Merge all holidays related tests in one file

* add deterministic flag

* fixed ruff linting issues

* fixed glocal test

* fix lock file

* update poetry

* moved the deterministic flag to the train method

* update lock file

---------

Co-authored-by: Simon W <[email protected]>
Co-authored-by: MaiBe-ctrl <[email protected]>
Co-authored-by: Maisa Ben Salah <[email protected]>
4 people authored Jun 26, 2024
1 parent e90bf5a commit 1bfa633
Showing 29 changed files with 4,251 additions and 881 deletions.
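
The messages above trace the core idea of this PR: rather than precomputing the full tabular dataset up front, TimeDataset resolves each training sample just-in-time. An index map, built once from boolean masks (target start/end mask, NaN mask), points at valid prediction origins rather than at first forecasts, and __getitem__ slices out lags and targets on demand. Below is a minimal, self-contained sketch of that pattern; all names, shapes, and the NaN rule are simplifying assumptions for illustration, not the actual TimeDataset API.

import torch
from torch.utils.data import Dataset

class JITTimeDataset(Dataset):
    # Illustrative sketch only, not NeuralProphet's TimeDataset.
    def __init__(self, y, n_lags, n_forecasts):
        self.y = torch.as_tensor(y, dtype=torch.float32)
        self.n_lags = n_lags
        self.n_forecasts = n_forecasts
        # Boolean mask over prediction origins: origin t is valid if it has
        # n_lags of history, n_forecasts of future, and no NaNs in the window.
        valid = torch.zeros(len(self.y), dtype=torch.bool)
        for t in range(n_lags - 1, len(self.y) - n_forecasts):
            window = self.y[t - n_lags + 1 : t + n_forecasts + 1]
            valid[t] = not torch.isnan(window).any()
        # Map dataset index -> prediction origin (the index points at the
        # origin, i.e. the last observed step, not at the first forecast).
        self.sample_map = torch.nonzero(valid).squeeze(-1)

    def __len__(self):
        return len(self.sample_map)

    def __getitem__(self, idx):
        # Tabularize a single sample on demand instead of precomputing all.
        t = int(self.sample_map[idx])
        lags = self.y[t - self.n_lags + 1 : t + 1]
        targets = self.y[t + 1 : t + self.n_forecasts + 1]
        return {"lags": lags, "targets": targets}

Building only the mask up front keeps memory proportional to the series length; the per-sample tabularization cost moves into the dataloader workers.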
19 changes: 17 additions & 2 deletions docs/source/code/forecaster.rst
@@ -1,5 +1,20 @@
NeuralProphet Class
-----------------------
Core Module Documentation
==========================

.. toctree::
:hidden:
:maxdepth: 1

configure.py <configure>
df_utils.py <df_utils>
event_utils.py <event_utils>
plot_forecast_plotly.py <plot_forecast_plotly>
plot_forecast_matplotlib.py <plot_forecast_matplotlib>
plot_model_parameters_plotly.py <plot_model_parameters_plotly>
plot_model_parameters_matplotlib.py <plot_model_parameters_matplotlib>
time_dataset.py <time_dataset>
time_net.py <time_net>
utils.py <utils>

.. automodule:: neuralprophet.forecaster
:members:
5 changes: 0 additions & 5 deletions docs/source/code/hdays_utils.rst

This file was deleted.

3 changes: 0 additions & 3 deletions docs/source/how-to-guides/feature-guides/mlflow.ipynb
@@ -177,7 +177,6 @@
"# Start a new MLflow run\n",
"if local:\n",
" with mlflow.start_run():\n",
"\n",
" # Create a new MLflow experiment\n",
" mlflow.set_experiment(\"NP-MLflow Quickstart_v1\")\n",
"\n",
@@ -259,7 +258,6 @@
"from mlflow.data.pandas_dataset import PandasDataset\n",
"\n",
"if local:\n",
"\n",
" mlflow.pytorch.autolog(\n",
" log_every_n_epoch=1,\n",
" log_every_n_step=None,\n",
@@ -279,7 +277,6 @@
" model_name = \"NeuralProphet\"\n",
"\n",
" with mlflow.start_run() as run:\n",
"\n",
" dataset: PandasDataset = mlflow.data.from_pandas(df, source=\"AirPassengersDataset\")\n",
"\n",
" # Log the dataset to the MLflow Run. Specify the \"training\" context to indicate that the\n",
6 changes: 4 additions & 2 deletions neuralprophet/components/future_regressors/linear.py
@@ -52,8 +52,10 @@ def scalar_features_effects(self, features, params, indices=None):
if indices is not None:
features = features[:, :, indices]
params = params[:, indices]

return torch.sum(features.unsqueeze(dim=2) * params.unsqueeze(dim=0).unsqueeze(dim=0), dim=-1)
# features dims: (batch, n_forecasts, n_features) -> (batch, n_forecasts, 1, n_features)
# params dims: (n_quantiles, n_features) -> (batch, 1, n_quantiles, n_features)
out = torch.sum(features.unsqueeze(dim=2) * params.unsqueeze(dim=0).unsqueeze(dim=0), dim=-1)
return out # dims (batch, n_forecasts, n_quantiles)

def get_reg_weights(self, name):
"""
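
The inline comments added above document the broadcast: features gain a quantile axis, params gain batch and forecast axes, and the elementwise product is summed over the feature axis. A standalone shape check of the documented dims (torch assumed; the sizes are arbitrary):

import torch

batch, n_forecasts, n_quantiles, n_features = 4, 3, 2, 5
features = torch.randn(batch, n_forecasts, n_features)
params = torch.randn(n_quantiles, n_features)

# (batch, n_forecasts, 1, n_features) * (1, 1, n_quantiles, n_features)
out = torch.sum(features.unsqueeze(dim=2) * params.unsqueeze(dim=0).unsqueeze(dim=0), dim=-1)
assert out.shape == (batch, n_forecasts, n_quantiles)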
11 changes: 5 additions & 6 deletions neuralprophet/configure.py
@@ -15,7 +15,7 @@

from neuralprophet import df_utils, np_types, utils_torch
from neuralprophet.custom_loss_metrics import PinballLoss
from neuralprophet.hdays_utils import get_holidays_from_country
from neuralprophet.event_utils import get_holiday_names

log = logging.getLogger("NP.config")

@@ -42,10 +42,9 @@ def init_data_params(
config_events: Optional[ConfigEvents] = None,
config_seasonality: Optional[ConfigSeasonality] = None,
):
if len(df["ID"].unique()) == 1:
if not self.global_normalization:
log.info("Setting normalization to global as only one dataframe provided for training.")
self.global_normalization = True
if len(df["ID"].unique()) == 1 and not self.global_normalization:
log.info("Setting normalization to global as only one dataframe provided for training.")
self.global_normalization = True
self.local_data_params, self.global_data_params = df_utils.init_data_params(
df=df,
normalize=self.normalize,
@@ -508,7 +507,7 @@ class Holidays:
holiday_names: set = field(init=False)

def init_holidays(self, df=None):
self.holiday_names = get_holidays_from_country(self.country, df)
self.holiday_names = get_holiday_names(self.country, df)


ConfigCountryHolidays = Holidays
25 changes: 13 additions & 12 deletions neuralprophet/data/process.py
@@ -333,18 +333,18 @@ def _validate_column_name(
"""
reserved_names = [
"trend",
"additive_terms",
"daily",
"weekly",
"yearly",
"events",
"holidays",
"zeros",
"extra_regressors_additive",
"yhat",
"extra_regressors_multiplicative",
"multiplicative_terms",
"ID",
"y_scaled",
"ds",
"t",
"y",
"index",
]
rn_l = [n + "_lower" for n in reserved_names]
rn_u = [n + "_upper" for n in reserved_names]
@@ -434,14 +434,14 @@ def _check_dataframe(

def _handle_missing_data(
df: pd.DataFrame,
freq: Optional[str],
freq: str,
n_lags: int,
n_forecasts: int,
config_missing,
config_regressors: Optional[ConfigFutureRegressors],
config_lagged_regressors: Optional[ConfigLaggedRegressors],
config_events: Optional[ConfigEvents],
config_seasonality: Optional[ConfigSeasonality],
config_regressors: Optional[ConfigFutureRegressors] = None,
config_lagged_regressors: Optional[ConfigLaggedRegressors] = None,
config_events: Optional[ConfigEvents] = None,
config_seasonality: Optional[ConfigSeasonality] = None,
predicting: bool = False,
) -> pd.DataFrame:
"""
@@ -618,12 +618,13 @@ def _create_dataset(model, df, predict_mode, prediction_frequency=None):
predict_mode=predict_mode,
n_lags=model.n_lags,
n_forecasts=model.n_forecasts,
prediction_frequency=prediction_frequency,
predict_steps=model.predict_steps,
config_seasonality=model.config_seasonality,
config_events=model.config_events,
config_country_holidays=model.config_country_holidays,
config_lagged_regressors=model.config_lagged_regressors,
config_regressors=model.config_regressors,
config_lagged_regressors=model.config_lagged_regressors,
config_missing=model.config_missing,
prediction_frequency=prediction_frequency,
# config_train=model.config_train, # no longer needed since JIT tabularization.
)
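
For orientation, a hedged sketch of how this internal helper is invoked; the model and dataframe are assumed to be prepared by the surrounding pipeline, and the prediction_frequency value (e.g. {"daily-hour": 7} to restrict forecast origins to 7:00 each day) follows the documentation this commit updates; treat it as illustrative:

# hypothetical call site, e.g. inside NeuralProphet's fit/predict paths:
dataset = _create_dataset(
    model=model,         # an initialized NeuralProphet instance
    df=df,               # preprocessed dataframe with "ds", "y", "ID" columns
    predict_mode=False,  # True when tabularizing for prediction
    prediction_frequency={"daily-hour": 7},
)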
23 changes: 10 additions & 13 deletions neuralprophet/df_utils.py
@@ -88,28 +88,27 @@ def return_df_in_original_format(df, received_ID_col=False, received_single_time
return new_df


def get_max_num_lags(config_lagged_regressors: Optional[ConfigLaggedRegressors], n_lags: int) -> int:
def get_max_num_lags(n_lags: int, config_lagged_regressors: Optional[ConfigLaggedRegressors]) -> int:
"""Get the greatest number of lags between the autoregression lags and the covariates lags.
Parameters
----------
config_lagged_regressors : configure.ConfigLaggedRegressors
Configurations for lagged regressors
n_lags : int
number of lagged values of series to include as model inputs
config_lagged_regressors : configure.ConfigLaggedRegressors
Configurations for lagged regressors
Returns
-------
int
Maximum number of lags between the autoregression lags and the covariates lags.
"""
if config_lagged_regressors is not None:
log.debug("config_lagged_regressors exists")
max_n_lags = max([n_lags] + [val.n_lags for key, val in config_lagged_regressors.items()])
# log.debug("config_lagged_regressors exists")
return max([n_lags] + [val.n_lags for key, val in config_lagged_regressors.items()])
else:
log.debug("config_lagged_regressors does not exist")
max_n_lags = n_lags
return max_n_lags
# log.debug("config_lagged_regressors does not exist")
return n_lags


def merge_dataframes(df: pd.DataFrame) -> pd.DataFrame:
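
An aside on the get_max_num_lags hunk above: a quick usage sketch of the reordered signature. The lagged-regressor configs are mocked here with SimpleNamespace; in NeuralProphet they are dict-like, exposing an n_lags attribute per regressor.

from types import SimpleNamespace

from neuralprophet.df_utils import get_max_num_lags

mock_lagged = {
    "temperature": SimpleNamespace(n_lags=5),
    "humidity": SimpleNamespace(n_lags=2),
}
# max over autoregression lags (3) and covariate lags (5, 2) -> 5
assert get_max_num_lags(n_lags=3, config_lagged_regressors=mock_lagged) == 5
# without lagged regressors, the autoregression lags are the maximum
assert get_max_num_lags(n_lags=3, config_lagged_regressors=None) == 3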
@@ -508,14 +507,12 @@ def check_dataframe(
for name in columns:
if name not in df:
raise ValueError(f"Column {name!r} missing from dataframe")
if df.loc[df.loc[:, name].notnull()].shape[0] < 1:
if sum(df.loc[:, name].notnull().values) < 1:
raise ValueError(f"Dataframe column {name!r} only has NaN rows.")
if not np.issubdtype(df[name].dtype, np.number):
df[name] = pd.to_numeric(df[name])
if np.isinf(df.loc[:, name].values).any():
df.loc[:, name] = df[name].replace([np.inf, -np.inf], np.nan)
if df.loc[df.loc[:, name].notnull()].shape[0] < 1:
raise ValueError(f"Dataframe column {name!r} only has NaN rows.")

if future:
return df, regressors_to_remove, lag_regressors_to_remove
@@ -1541,10 +1538,10 @@ def drop_missing_from_df(df, drop_missing, predict_steps, n_lags):
if all_nan_idx[i + 1] - all_nan_idx[i] > 1:
break
# drop NaN window
df = df.drop(df.index[window[0] : window[-1] + 1]).reset_index().drop("index", axis=1)
df = df.drop(df.index[window[0] : window[-1] + 1]).reset_index(drop=True)
# drop lagged values if window does not occur at the beginning of df
if window[0] - (n_lags - 1) >= 0:
df = df.drop(df.index[(window[0] - (n_lags - 1)) : window[0]]).reset_index().drop("index", axis=1)
df = df.drop(df.index[(window[0] - (n_lags - 1)) : window[0]]).reset_index(drop=True)
return df


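
The two drop sites in drop_missing_from_df now use reset_index(drop=True) instead of materializing the old index as a column and then dropping it. A minimal pandas check that the two spellings agree:

import pandas as pd

df = pd.DataFrame({"y": [1.0, 2.0, 3.0]}).drop(index=1)
old = df.reset_index().drop("index", axis=1)  # previous spelling
new = df.reset_index(drop=True)  # new spelling
assert old.equals(new)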
71 changes: 71 additions & 0 deletions neuralprophet/event_utils.py
@@ -0,0 +1,71 @@
from collections import defaultdict
from typing import Iterable, Union

import numpy as np
import pandas as pd
from holidays import country_holidays


def get_holiday_names(country: Union[str, Iterable[str]], df=None):
"""
Return all possible holiday names for the given countries over the time period covered by df
Parameters
----------
country : str, list
List of country names to retrieve country specific holidays
df : pd.DataFrame
Dataframe from which datestamps will be retrieved
Returns
-------
set
All possible holiday names of the given country or countries
"""
if df is None:
years = np.arange(1995, 2045)
else:
dates = df["ds"].copy(deep=True)
years = pd.unique(dates.apply(lambda x: x.year))
# years = list({x.year for x in dates})
# support multiple countries, convert to list if not already
if isinstance(country, str):
country = [country]

all_holidays = get_all_holidays(years=years, country=country)
return set(all_holidays.keys())


def get_all_holidays(years, country):
"""
Collect country-specific holidays for the given years and countries
Parameters
----------
years : list
List of years
country : str, list, dict
List of country names and optional subdivisions
Returns
-------
defaultdict
Mapping of each holiday name to the list of dates on which it occurs
"""
# normalize country input to a dict mapping country -> subdivision
if isinstance(country, str):
country = {country: None}
elif isinstance(country, list):
country = dict(zip(country, [None] * len(country)))

all_holidays = defaultdict(list)
# iterate over countries and get holidays for each country
for single_country, subdivision in country.items():
# For backward compatibility, map the legacy "TU" code for Turkey to "TUR".
single_country = "TUR" if single_country == "TU" else single_country
# get dict of dates and their holiday name
single_country_specific_holidays = country_holidays(
country=single_country, subdiv=subdivision, years=years, expand=True, observed=False, language="en"
)
# invert order - for given holiday, store list of dates
for date, name in single_country_specific_holidays.items():
all_holidays[name].append(pd.to_datetime(date))
return all_holidays
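
Example usage of the two new helpers; the country codes and years below are arbitrary, and a dict value selects an optional subdivision:

from neuralprophet.event_utils import get_all_holidays, get_holiday_names

# set of English holiday names for one or more countries
# (defaults to the years 1995-2045 when no dataframe is given)
names = get_holiday_names(["US", "DE"])
# mapping of holiday name -> list of pd.Timestamp dates, here for California
dates = get_all_holidays(years=[2024], country={"US": "CA"})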
