diff --git a/flaml/automl/automl.py b/flaml/automl/automl.py
index 078ce67a20..7dacc0e22f 100644
--- a/flaml/automl/automl.py
+++ b/flaml/automl/automl.py
@@ -1054,12 +1054,14 @@ def _validate_ts_data(
         assert (
             dataframe[[dataframe.columns[0]]].duplicated() is None
         ), "Duplicate timestamp values with different values for other columns."
-        ts_series = pd.to_datetime(dataframe[dataframe.columns[0]])
-        inferred_freq = pd.infer_freq(ts_series)
-        if inferred_freq is None:
-            logger.warning(
-                "Missing timestamps detected. To avoid error with estimators, set estimator list to ['prophet']. "
-            )
+        if self._state.task in TS_FORECAST and self._state.task != TS_FORECASTPANEL:
+            # the TFT estimator used for TS_FORECASTPANEL can handle missing data
+            ts_series = pd.to_datetime(dataframe[dataframe.columns[0]])
+            inferred_freq = pd.infer_freq(ts_series)
+            if inferred_freq is None:
+                logger.warning(
+                    "Missing timestamps detected. To avoid error with estimators, set estimator list to ['prophet']. "
+                )
         if y_train_all is not None:
             return dataframe.iloc[:, :-1], dataframe.iloc[:, -1]
         return dataframe
@@ -1720,7 +1722,11 @@ def retrain_from_log(
                 `time_varying_known_categoricals`, `time_varying_known_reals`,
                 `time_varying_unknown_categoricals`, `time_varying_unknown_reals`,
                 `variable_groups`. To provide more information on your data, use
-                `max_encoder_length`, `min_encoder_length`, `lags`.
+                `max_encoder_length`, `min_encoder_length`, `lags`. To fill in
+                missing values with constants, use `constant_fill_strategy`;
+                otherwise a forward-fill strategy is used by default.
+            freq: str or pandas offset | The frequency of the time series, used only
+                for the 'ts_forecast_panel' task.
             log_dir: str, default = "lightning_logs" | Folder into which to log results
                 for tensorboard, only used by TemporalFusionTransformerEstimator.
             max_epochs: int, default = 20 | Maximum number of epochs to run training,
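Reviewer note on the guard above: `pd.infer_freq` returns `None` as soon as the timestamp series has a gap, which is exactly the signal the warning keys on. A standalone sketch, not part of the patch (the variable names are made up):

```python
import pandas as pd

# Six regular month-start timestamps: the frequency is recoverable.
full = pd.Series(pd.date_range("2020-01-01", periods=6, freq="MS"))
print(pd.infer_freq(full))  # "MS"

# Drop one month and the frequency can no longer be inferred.
gappy = full.drop(3).reset_index(drop=True)
print(pd.infer_freq(gappy))  # None -> triggers the warning above
```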
+ # f"If inferred frequency is incorrect, please pass in `freq` as a fit_kwargs" + # ) if freq == "MS": X["time_idx"] = X[TS_TIMESTAMP_COL].dt.year * 12 + X[TS_TIMESTAMP_COL].dt.month elif freq == "Y": X["time_idx"] = X[TS_TIMESTAMP_COL].dt.year else: # using time frequency to generate all time stamps and then indexing for time_idx - # full_range = pd.date_range(X[TS_TIMESTAMP_COL].min(), X[TS_TIMESTAMP_COL].max(), freq=freq).to_list() - # X["time_idx"] = [full_range.index(time) for time in X[TS_TIMESTAMP_COL]] + full_range = pd.date_range(X[TS_TIMESTAMP_COL].min(), X[TS_TIMESTAMP_COL].max(), freq=freq).to_list() + X["time_idx"] = [full_range.index(time) for time in X[TS_TIMESTAMP_COL]] # taking minimum difference in timestamp - timestamps = unique_dates.view("int64") - freq = int(timestamps.diff().mode()) - X["time_idx"] = timestamps - timestamps.min() / freq - X["time_idx"] = X["time_idx"].astype("int") + # timestamps = unique_dates.view("int64") + # freq = int(timestamps.diff().median()) + # X["time_idx"] = timestamps - timestamps.min() / freq + # X["time_idx"] = X["time_idx"].astype("int") return X diff --git a/flaml/automl/model.py b/flaml/automl/model.py index bccad6470f..ca2fbb51f2 100644 --- a/flaml/automl/model.py +++ b/flaml/automl/model.py @@ -1808,10 +1808,11 @@ def fit(self, X_train, y_train, budget=None, free_mem_ratio=0, **kwargs): cols.remove(TS_VALUE_COL) logging.getLogger("prophet").setLevel(logging.WARNING) model = Prophet(**self.params) + kwargs.pop("period") for regressor in cols: model.add_regressor(regressor) with suppress_stdout_stderr(): - model.fit(train_df) + model.fit(train_df, **kwargs) train_time = time.time() - current_time self._model = model return train_time @@ -1901,8 +1902,9 @@ def fit(self, X_train, y_train, budget=None, free_mem_ratio=0, **kwargs): enforce_stationarity=False, enforce_invertibility=False, ) + kwargs.pop("period") with suppress_stdout_stderr(): - model = model.fit() + model = model.fit(**kwargs) train_time = time.time() - current_time self._model = model return train_time @@ -2013,8 +2015,9 @@ def fit(self, X_train, y_train, budget=None, free_mem_ratio=0, **kwargs): enforce_stationarity=False, enforce_invertibility=False, ) + kwargs.pop("period") with suppress_stdout_stderr(): - model = model.fit() + model = model.fit(**kwargs) train_time = time.time() - current_time self._model = model return train_time @@ -2047,7 +2050,7 @@ def search_space(cls, data_size, pred_horizon, **params): def __init__(self, task="ts_forecast", **params): super().__init__(task, **params) - self.hcrystaball_model = None + self.hcrystalball_model = None self.ts_task = ( "regression" if task in TS_FORECASTREGRESSION else "classification" ) @@ -2072,32 +2075,33 @@ def _fit(self, X_train, y_train, budget=None, **kwargs): lags = params.pop("lags") optimize_for_horizon = params.pop("optimize_for_horizon") estimator = self.base_class(task=self.ts_task, **params) - self.hcrystaball_model = get_sklearn_wrapper(estimator.estimator_class) - self.hcrystaball_model.lags = int(lags) - self.hcrystaball_model.fit(X_train, y_train) + self.hcrystalball_model = get_sklearn_wrapper(estimator.estimator_class) + self.hcrystalball_model.lags = int(lags) + period = kwargs.pop("period") + self.hcrystalball_model.fit(X_train, y_train, **kwargs) if optimize_for_horizon: # Direct Multi-step Forecast Strategy - fit a seperate model for each horizon model_list = [] - for i in range(1, kwargs["period"] + 1): + for i in range(1, period + 1): ( X_fit, y_fit, - ) = 
diff --git a/flaml/automl/model.py b/flaml/automl/model.py
index bccad6470f..ca2fbb51f2 100644
--- a/flaml/automl/model.py
+++ b/flaml/automl/model.py
@@ -1808,10 +1808,11 @@ def fit(self, X_train, y_train, budget=None, free_mem_ratio=0, **kwargs):
         cols.remove(TS_VALUE_COL)
         logging.getLogger("prophet").setLevel(logging.WARNING)
         model = Prophet(**self.params)
+        kwargs.pop("period", None)
         for regressor in cols:
             model.add_regressor(regressor)
         with suppress_stdout_stderr():
-            model.fit(train_df)
+            model.fit(train_df, **kwargs)
         train_time = time.time() - current_time
         self._model = model
         return train_time
@@ -1901,8 +1902,9 @@ def fit(self, X_train, y_train, budget=None, free_mem_ratio=0, **kwargs):
             enforce_stationarity=False,
             enforce_invertibility=False,
         )
+        kwargs.pop("period", None)
         with suppress_stdout_stderr():
-            model = model.fit()
+            model = model.fit(**kwargs)
         train_time = time.time() - current_time
         self._model = model
         return train_time
@@ -2013,8 +2015,9 @@ def fit(self, X_train, y_train, budget=None, free_mem_ratio=0, **kwargs):
             enforce_stationarity=False,
             enforce_invertibility=False,
         )
+        kwargs.pop("period", None)
        with suppress_stdout_stderr():
-            model = model.fit()
+            model = model.fit(**kwargs)
         train_time = time.time() - current_time
         self._model = model
         return train_time
@@ -2047,7 +2050,7 @@ def search_space(cls, data_size, pred_horizon, **params):

     def __init__(self, task="ts_forecast", **params):
         super().__init__(task, **params)
-        self.hcrystaball_model = None
+        self.hcrystalball_model = None
         self.ts_task = (
             "regression" if task in TS_FORECASTREGRESSION else "classification"
         )
@@ -2072,32 +2075,33 @@ def _fit(self, X_train, y_train, budget=None, **kwargs):
         lags = params.pop("lags")
         optimize_for_horizon = params.pop("optimize_for_horizon")
         estimator = self.base_class(task=self.ts_task, **params)
-        self.hcrystaball_model = get_sklearn_wrapper(estimator.estimator_class)
-        self.hcrystaball_model.lags = int(lags)
-        self.hcrystaball_model.fit(X_train, y_train)
+        self.hcrystalball_model = get_sklearn_wrapper(estimator.estimator_class)
+        self.hcrystalball_model.lags = int(lags)
+        period = kwargs.pop("period")
+        self.hcrystalball_model.fit(X_train, y_train, **kwargs)
         if optimize_for_horizon:
             # Direct Multi-step Forecast Strategy - fit a seperate model for each horizon
             model_list = []
-            for i in range(1, kwargs["period"] + 1):
+            for i in range(1, period + 1):
                 (
                     X_fit,
                     y_fit,
-                ) = self.hcrystaball_model._transform_data_to_tsmodel_input_format(
+                ) = self.hcrystalball_model._transform_data_to_tsmodel_input_format(
                     X_train, y_train, i
                 )
-                self.hcrystaball_model.model.set_params(**estimator.params)
-                model = self.hcrystaball_model.model.fit(X_fit, y_fit)
+                self.hcrystalball_model.model.set_params(**estimator.params)
+                model = self.hcrystalball_model.model.fit(X_fit, y_fit, **kwargs)
                 model_list.append(model)
             self._model = model_list
         else:
             (
                 X_fit,
                 y_fit,
-            ) = self.hcrystaball_model._transform_data_to_tsmodel_input_format(
-                X_train, y_train, kwargs["period"]
+            ) = self.hcrystalball_model._transform_data_to_tsmodel_input_format(
+                X_train, y_train, period
             )
-            self.hcrystaball_model.model.set_params(**estimator.params)
-            model = self.hcrystaball_model.model.fit(X_fit, y_fit)
+            self.hcrystalball_model.model.set_params(**estimator.params)
+            model = self.hcrystalball_model.model.fit(X_fit, y_fit, **kwargs)
             self._model = model

     def fit(self, X_train, y_train, budget=None, free_mem_ratio=0, **kwargs):
@@ -2119,20 +2123,20 @@ def predict(self, X, **kwargs):
                 (
                     X_pred,
                     _,
-                ) = self.hcrystaball_model._transform_data_to_tsmodel_input_format(
+                ) = self.hcrystalball_model._transform_data_to_tsmodel_input_format(
                     X.iloc[:i, :]
                 )
                 preds.append(self._model[i - 1].predict(X_pred, **kwargs)[-1])
             forecast = DataFrame(
                 data=np.asarray(preds).reshape(-1, 1),
-                columns=[self.hcrystaball_model.name],
+                columns=[self.hcrystalball_model.name],
                 index=X.index,
             )
         else:
             (
                 X_pred,
                 _,
-            ) = self.hcrystaball_model._transform_data_to_tsmodel_input_format(X)
+            ) = self.hcrystalball_model._transform_data_to_tsmodel_input_format(X)
             forecast = self._model.predict(X_pred, **kwargs)
         return forecast
     else:
@@ -2246,6 +2250,8 @@ def transform_ds(self, X_train, y_train, **kwargs):
             variable_groups=kwargs.get(
                 "variable_groups", {}
             ),  # group of categorical variables can be treated as one variable
+            constant_fill_strategy=kwargs.get("constant_fill_strategy", {}),
+            allow_missing_timesteps=True,
             lags=kwargs.get("lags", {}),
             target_normalizer=GroupNormalizer(
                 groups=kwargs["group_ids"], transformation="softplus"
@@ -2293,7 +2299,7 @@ def fit(self, X_train, y_train, budget=None, free_mem_ratio=0, **kwargs):
             monitor="val_loss", min_delta=1e-4, patience=10, verbose=False, mode="min"
         )

-        def _fit(log):
+        def _fit(log, **kwargs):
             default_trainer_kwargs = dict(
                 gpus=kwargs.get("gpu_per_trial", [0])
                 if torch.cuda.is_available()
@@ -2323,6 +2329,7 @@ def _fit(log):
                 tft,
                 train_dataloaders=train_dataloader,
                 val_dataloaders=val_dataloader,
+                **kwargs,
             )
             return trainer
@@ -2336,7 +2343,13 @@ def _fit(log):
         # except ValueError:
         # issue with pytorch forecasting model log_prediction() function
         # pytorch-forecasting issue #1145
-        trainer = _fit(log=False)
+        fit_keys = ["period", "gpu_per_trial", "group_ids", "freq", "log_dir", "max_epochs", "batch_size",
+                    "static_categoricals", "static_reals", "time_varying_known_categoricals", "time_varying_known_reals",
+                    "time_varying_unknown_reals", "time_varying_unknown_categoricals", "variable_groups",
+                    "max_encoder_length", "min_encoder_length", "lags", "constant_fill_strategy"]
+        for key in fit_keys:
+            kwargs.pop(key, None)
+        trainer = _fit(log=False, **kwargs)
         best_model_path = trainer.checkpoint_callback.best_model_path
         best_tft = TemporalFusionTransformer.load_from_checkpoint(best_model_path)
         train_time = time.time() - current_time
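Context for the two new `TimeSeriesDataSet` arguments in `transform_ds`: `allow_missing_timesteps=True` lets pytorch-forecasting repair gaps in `time_idx`, and `constant_fill_strategy` maps column names to the constants used for those gaps; per the docstring change above, columns not listed fall back to forward fill. A minimal sketch of what the patched `transform_ds` builds, assuming pytorch-forecasting is installed (toy columns, not the stallion dataset):

```python
import pandas as pd
from pytorch_forecasting import TimeSeriesDataSet

# One panel series with time_idx 3 missing, i.e. a gap in the data.
df = pd.DataFrame(
    {
        "group": ["a"] * 5,
        "time_idx": [0, 1, 2, 4, 5],
        "y": [1.0, 2.0, 3.0, 5.0, 6.0],
        "discount": [0.1, 0.0, 0.2, 0.0, 0.1],
    }
)

ds = TimeSeriesDataSet(
    df,
    time_idx="time_idx",
    target="y",
    group_ids=["group"],
    max_encoder_length=2,
    max_prediction_length=1,
    time_varying_unknown_reals=["y", "discount"],
    allow_missing_timesteps=True,  # repair the gap at time_idx == 3
    constant_fill_strategy={"discount": 0.0},  # fill this column's gap with 0.0, not ffill
)
```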
+++ b/test/automl/test_forecast.py
@@ -591,10 +591,132 @@ def smape(y_pred, y_test):
     print(automl.min_resource)
+
+
+def test_forecast_panel_missing(budget=5):
+    data, special_days = get_stalliion_data()
+    time_horizon = 6  # predict six months
+    training_cutoff = data["time_idx"].max() - time_horizon
+    data["time_idx"] = data["time_idx"].astype("int")
+    ts_col = data.pop("date")
+    data.insert(0, "date", ts_col)
+    # FLAML assumes input is not sorted, but we sort here for comparison purposes with y_test
+    data = data.sort_values(["agency", "sku", "date"])
+    X_train = data[lambda x: x.time_idx <= training_cutoff]
+    print(X_train.head())
+    print("Before Drop", len(X_train))
+    np.random.seed(1)
+    # randomly choose 10 rows of train data to remove
+    drop_indices = np.random.choice(X_train.index, 10, replace=False)
+    X_train = X_train.drop(drop_indices)
+    print("After Drop", len(X_train))
+    X_test = data[lambda x: x.time_idx > training_cutoff]
+    y_train = X_train.pop("volume")
+    y_test = X_test.pop("volume")
+    print(len(X_train))
+    print(len(y_train))
+    print(len(X_test))
+    print(len(y_test))
+    automl = AutoML()
+    settings = {
+        "time_budget": budget,  # total running time in seconds
+        "metric": "mape",  # primary metric
+        "task": "ts_forecast_panel",  # task type
+        "log_file_name": "test/stallion_forecast_missing.log",  # flaml log file
+        "eval_method": "holdout",
+    }
+    fit_kwargs_by_estimator = {
+        "tft": {
+            "max_encoder_length": 24,
+            "static_categoricals": ["agency", "sku"],
+            "static_reals": ["avg_population_2017", "avg_yearly_household_income_2017"],
+            "time_varying_known_categoricals": ["special_days", "month"],
+            "variable_groups": {
+                "special_days": special_days
+            },  # group of categorical variables can be treated as one variable
+            "time_varying_known_reals": [
+                "time_idx",
+                "price_regular",
+                "discount_in_percent",
+            ],
+            "time_varying_unknown_categoricals": [],
+            "time_varying_unknown_reals": [
+                "y",  # always need a 'y' column for the target column
+                "log_volume",
+                "industry_volume",
+                "soda_volume",
+                "avg_max_temp",
+                "avg_volume_by_agency",
+                "avg_volume_by_sku",
+            ],
+            "batch_size": 256,
+            "max_epochs": 1,
+            "gpu_per_trial": -1,
+            "constant_fill_strategy": {
+                "discount_in_percent": 0.0,
+                "soda_volume": 0.0,
+            },  # for the gaps, use these constants to fill in missing values
+        }
+    }
+    """The main flaml automl API"""
+    automl.fit(
+        X_train=X_train,
+        y_train=y_train,
+        **settings,
+        period=time_horizon,
+        group_ids=["agency", "sku"],
+        fit_kwargs_by_estimator=fit_kwargs_by_estimator,
+    )
+    """retrieve best config and best learner"""
+    print("Best ML learner:", automl.best_estimator)
+    print("Best hyperparameter config:", automl.best_config)
+    print(f"Best mape on validation data: {automl.best_loss}")
+    print(f"Training duration of best run: {automl.best_config_train_time}s")
+    print(automl.model.estimator)
+    """pickle and save the automl object"""
+    import pickle
+
+    with open("automl.pkl", "wb") as f:
+        pickle.dump(automl, f, pickle.HIGHEST_PROTOCOL)
+    """compute predictions of testing dataset"""
+    y_pred = automl.predict(X_test)
+    """compute different metric values on testing dataset"""
+    from flaml.automl.ml import sklearn_metric_loss_score
+
+    print(y_test)
+    print(y_pred)
+    print("mape", "=", sklearn_metric_loss_score("mape", y_pred, y_test))
+
+    def smape(y_pred, y_test):
+        import numpy as np
+
+        y_test, y_pred = np.array(y_test), np.array(y_pred)
+        return round(
+            np.mean(np.abs(y_pred - y_test) / ((np.abs(y_pred) + np.abs(y_test)) / 2))
+            * 100,
+            2,
+        )
+
+    print("smape", "=", smape(y_pred, y_test))
"=", smape(y_pred, y_test)) + from flaml.automl.data import get_output_from_log + + ( + time_history, + best_valid_loss_history, + valid_loss_history, + config_history, + metric_history, + ) = get_output_from_log(filename=settings["log_file_name"], time_budget=budget) + for config in config_history: + print(config) + print(automl.resource_attr) + print(automl.max_resource) + print(automl.min_resource) + + if __name__ == "__main__": - test_forecast_automl(60) - test_multivariate_forecast_num(5) - test_multivariate_forecast_cat(5) - test_numpy() - test_forecast_classification(5) - test_forecast_panel(5) + # test_forecast_automl(60) + # test_multivariate_forecast_num(5) + # test_multivariate_forecast_cat(5) + # test_numpy() + # test_forecast_classification(5) + # test_forecast_panel(5) + print("testing missing data") + test_forecast_panel_missing(5)