From 2ae45068d27d48bb038870dd2cb142f416a9d025 Mon Sep 17 00:00:00 2001 From: Constantin Weberpals Date: Mon, 24 Jun 2024 17:40:03 +0200 Subject: [PATCH 01/15] enable re-training --- neuralprophet/forecaster.py | 55 +++++++++++++++++++++++-------------- 1 file changed, 35 insertions(+), 20 deletions(-) diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py index d80fcef14..7ec036c0c 100644 --- a/neuralprophet/forecaster.py +++ b/neuralprophet/forecaster.py @@ -962,7 +962,7 @@ def fit( pd.DataFrame metrics with training and potentially evaluation metrics """ - if self.fitted: + if self.fitted and not continue_training: raise RuntimeError("Model has been fitted already. Please initialize a new model to fit again.") # Configuration @@ -2645,23 +2645,23 @@ def _init_train_loader(self, df, num_workers=0): torch DataLoader """ df, _, _, _ = df_utils.prep_or_copy_df(df) - # if not self.fitted: - self.config_normalization.init_data_params( - df=df, - config_lagged_regressors=self.config_lagged_regressors, - config_regressors=self.config_regressors, - config_events=self.config_events, - config_seasonality=self.config_seasonality, - ) + if not self.fitted: + self.config_normalization.init_data_params( + df=df, + config_lagged_regressors=self.config_lagged_regressors, + config_regressors=self.config_regressors, + config_events=self.config_events, + config_seasonality=self.config_seasonality, + ) df = _normalize(df=df, config_normalization=self.config_normalization) - # if not self.fitted: - if self.config_trend.changepoints is not None: - # scale user-specified changepoint times - df_aux = pd.DataFrame({"ds": pd.Series(self.config_trend.changepoints)}) + if not self.fitted: + if self.config_trend.changepoints is not None: + # scale user-specified changepoint times + df_aux = pd.DataFrame({"ds": pd.Series(self.config_trend.changepoints)}) - df_normalized = _normalize(df=df_aux, config_normalization=self.config_normalization) - self.config_trend.changepoints = df_normalized["t"].values # type: ignore + df_normalized = _normalize(df=df_aux, config_normalization=self.config_normalization) + self.config_trend.changepoints = df_normalized["t"].values # type: ignore # df_merged, _ = df_utils.join_dataframes(df) # df_merged = df_merged.sort_values("ds") @@ -2740,6 +2740,13 @@ def _train( pd.DataFrame metrics """ + # Test + if continue_training: + checkpoint_path = self.metrics_logger.checkpoint_path + print(checkpoint_path) + checkpoint = torch.load(checkpoint_path) + print(checkpoint.keys()) + # Set up data the training dataloader df, _, _, _ = df_utils.prep_or_copy_df(df) train_loader = self._init_train_loader(df, num_workers) @@ -2748,12 +2755,20 @@ def _train( # Internal flag to check if validation is enabled validation_enabled = df_val is not None - # Init the model, if not continue from checkpoint + # Load model and optimizer state from checkpoint if continue_training is True if continue_training: - raise NotImplementedError( - "Continuing training from checkpoint is not implemented yet. This feature is planned for one of the \ - upcoming releases." - ) + checkpoint_path = self.metrics_logger.checkpoint_path + checkpoint = torch.load(checkpoint_path) + self.model = self._init_model() + # TODO: fix size mismatch for trend.trend_changepoints_t: copying a param with shape torch.Size([11]) from checkpoint, the shape in current model is torch.Size([12]). + self.model.load_state_dict(checkpoint["state_dict"], strict=False) + self.optimizer.load_state_dict(checkpoint["optimizer_states"][0]) + self.trainer.current_epoch = checkpoint["epoch"] + 1 + if "lr_schedulers" in checkpoint: + self.lr_scheduler.load_state_dict(checkpoint["lr_schedulers"][0]) + print(f"Resuming training from epoch {self.trainer.current_epoch}") + # TODO: remove print, checkpoint['lr_schedulers'] + print(f"Resuming training from epoch {self.trainer.current_epoch}") else: self.model = self._init_model() From 900c8d5f1cfdb0da32ebffea2d0b10b9121711b4 Mon Sep 17 00:00:00 2001 From: Constantin Weberpals Date: Sat, 29 Jun 2024 14:10:56 +0200 Subject: [PATCH 02/15] update scheduler --- neuralprophet/forecaster.py | 81 ++++++++++++++++++++++++++++--------- 1 file changed, 61 insertions(+), 20 deletions(-) diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py index 7ec036c0c..fd372680b 100644 --- a/neuralprophet/forecaster.py +++ b/neuralprophet/forecaster.py @@ -2654,6 +2654,7 @@ def _init_train_loader(self, df, num_workers=0): config_seasonality=self.config_seasonality, ) + print("Changepoints:", self.config_trend.changepoints) df = _normalize(df=df, config_normalization=self.config_normalization) if not self.fitted: if self.config_trend.changepoints is not None: @@ -2746,11 +2747,10 @@ def _train( print(checkpoint_path) checkpoint = torch.load(checkpoint_path) print(checkpoint.keys()) - - # Set up data the training dataloader - df, _, _, _ = df_utils.prep_or_copy_df(df) - train_loader = self._init_train_loader(df, num_workers) - dataset_size = len(df) # train_loader.dataset + print("Current model trend changepoints:", self.model.trend.trend_changepoints_t) + # self.model = time_net.TimeNet.load_from_checkpoint(checkpoint_path) + # self.model.load_state_dict(checkpoint["state_dict"], strict=False) + print(self.model.train_loader) # Internal flag to check if validation is enabled validation_enabled = df_val is not None @@ -2759,20 +2759,55 @@ def _train( if continue_training: checkpoint_path = self.metrics_logger.checkpoint_path checkpoint = torch.load(checkpoint_path) - self.model = self._init_model() - # TODO: fix size mismatch for trend.trend_changepoints_t: copying a param with shape torch.Size([11]) from checkpoint, the shape in current model is torch.Size([12]). - self.model.load_state_dict(checkpoint["state_dict"], strict=False) - self.optimizer.load_state_dict(checkpoint["optimizer_states"][0]) - self.trainer.current_epoch = checkpoint["epoch"] + 1 - if "lr_schedulers" in checkpoint: - self.lr_scheduler.load_state_dict(checkpoint["lr_schedulers"][0]) - print(f"Resuming training from epoch {self.trainer.current_epoch}") - # TODO: remove print, checkpoint['lr_schedulers'] - print(f"Resuming training from epoch {self.trainer.current_epoch}") + + # Load model state + self.model.load_state_dict(checkpoint["state_dict"]) + + # Adjust epochs + additional_epochs = 10 + previous_epochs = self.config_train.epochs # Get the number of epochs already trained + new_total_epochs = previous_epochs + additional_epochs + self.config_train.epochs = new_total_epochs + + # Reinitialize optimizer with loaded model parameters + optimizer = torch.optim.AdamW(self.model.parameters()) + + # Load optimizer state + if "optimizer_states" in checkpoint and checkpoint["optimizer_states"]: + optimizer.load_state_dict(checkpoint["optimizer_states"][0]) + + self.config_train.optimizer = optimizer + + # Calculate total steps and steps already taken + steps_per_epoch = len(self.model.train_loader) + total_steps = steps_per_epoch * new_total_epochs + steps_taken = steps_per_epoch * previous_epochs + + # Create new scheduler with updated total steps + self.config_train.scheduler = torch.optim.lr_scheduler.OneCycleLR( + optimizer=optimizer, + total_steps=total_steps, + max_lr=10, + pct_start=(total_steps - steps_taken) / total_steps, # Adjust the percentage of remaining steps + ) + + # Manually update the scheduler's step count + for _ in range(steps_taken): + self.config_train.scheduler.step() + + print(f"Scheduler: {self.config_train.scheduler}") + print( + f"Total steps: {total_steps}, Steps taken: {steps_taken}, Remaining steps: {total_steps - steps_taken}" + ) + else: - self.model = self._init_model() + # Set up data the training dataloader + df, _, _, _ = df_utils.prep_or_copy_df(df) + train_loader = self._init_train_loader(df, num_workers) + dataset_size = len(df) # train_loader.dataset - self.model.train_loader = train_loader + self.model = self._init_model() + self.model.train_loader = train_loader # Init the Trainer self.trainer, checkpoint_callback = utils.configure_trainer( @@ -2785,9 +2820,15 @@ def _train( progress_bar_enabled=progress_bar_enabled, metrics_enabled=metrics_enabled, checkpointing_enabled=checkpointing_enabled, - num_batches_per_epoch=len(train_loader), + num_batches_per_epoch=len(self.model.train_loader), ) + # TODO: find out why scheduler not updated + if continue_training: + self.trainer.lr_schedulers = [ + {"scheduler": self.config_train.scheduler, "interval": "step", "frequency": 1} + ] + # Tune hyperparams and train if validation_enabled: # Set up data the validation dataloader @@ -2812,7 +2853,7 @@ def _train( start = time.time() self.trainer.fit( self.model, - train_loader, + self.model.train_loader, val_loader, ckpt_path=self.metrics_logger.checkpoint_path if continue_training else None, ) @@ -2834,7 +2875,7 @@ def _train( start = time.time() self.trainer.fit( self.model, - train_loader, + self.model.train_loader, ckpt_path=self.metrics_logger.checkpoint_path if continue_training else None, ) From f1355eb5dd37dbcd018f34d0a11d1a338f109b20 Mon Sep 17 00:00:00 2001 From: Constantin Weberpals Date: Mon, 1 Jul 2024 00:19:54 +0200 Subject: [PATCH 03/15] change scheduler for continued training --- neuralprophet/forecaster.py | 53 ++++++++----------------------------- neuralprophet/time_net.py | 23 +++++++++++----- 2 files changed, 28 insertions(+), 48 deletions(-) diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py index fd372680b..cd7422fe2 100644 --- a/neuralprophet/forecaster.py +++ b/neuralprophet/forecaster.py @@ -2741,20 +2741,15 @@ def _train( pd.DataFrame metrics """ - # Test - if continue_training: - checkpoint_path = self.metrics_logger.checkpoint_path - print(checkpoint_path) - checkpoint = torch.load(checkpoint_path) - print(checkpoint.keys()) - print("Current model trend changepoints:", self.model.trend.trend_changepoints_t) - # self.model = time_net.TimeNet.load_from_checkpoint(checkpoint_path) - # self.model.load_state_dict(checkpoint["state_dict"], strict=False) - print(self.model.train_loader) # Internal flag to check if validation is enabled validation_enabled = df_val is not None + # Set up data the training dataloader + df, _, _, _ = df_utils.prep_or_copy_df(df) + train_loader = self._init_train_loader(df, num_workers) + dataset_size = len(df) # train_loader.dataset + # Load model and optimizer state from checkpoint if continue_training is True if continue_training: checkpoint_path = self.metrics_logger.checkpoint_path @@ -2763,8 +2758,11 @@ def _train( # Load model state self.model.load_state_dict(checkpoint["state_dict"]) + # Set continue_training flag in model to update scheduler correctly + self.model.continue_training = True + # Adjust epochs - additional_epochs = 10 + additional_epochs = 50 previous_epochs = self.config_train.epochs # Get the number of epochs already trained new_total_epochs = previous_epochs + additional_epochs self.config_train.epochs = new_total_epochs @@ -2778,34 +2776,7 @@ def _train( self.config_train.optimizer = optimizer - # Calculate total steps and steps already taken - steps_per_epoch = len(self.model.train_loader) - total_steps = steps_per_epoch * new_total_epochs - steps_taken = steps_per_epoch * previous_epochs - - # Create new scheduler with updated total steps - self.config_train.scheduler = torch.optim.lr_scheduler.OneCycleLR( - optimizer=optimizer, - total_steps=total_steps, - max_lr=10, - pct_start=(total_steps - steps_taken) / total_steps, # Adjust the percentage of remaining steps - ) - - # Manually update the scheduler's step count - for _ in range(steps_taken): - self.config_train.scheduler.step() - - print(f"Scheduler: {self.config_train.scheduler}") - print( - f"Total steps: {total_steps}, Steps taken: {steps_taken}, Remaining steps: {total_steps - steps_taken}" - ) - else: - # Set up data the training dataloader - df, _, _, _ = df_utils.prep_or_copy_df(df) - train_loader = self._init_train_loader(df, num_workers) - dataset_size = len(df) # train_loader.dataset - self.model = self._init_model() self.model.train_loader = train_loader @@ -2823,11 +2794,9 @@ def _train( num_batches_per_epoch=len(self.model.train_loader), ) - # TODO: find out why scheduler not updated if continue_training: - self.trainer.lr_schedulers = [ - {"scheduler": self.config_train.scheduler, "interval": "step", "frequency": 1} - ] + print("setting up optimizers again") + # self.trainer.strategy.setup_optimizers(self.trainer) # Tune hyperparams and train if validation_enabled: diff --git a/neuralprophet/time_net.py b/neuralprophet/time_net.py index ea3c4b2f3..5b18525ac 100644 --- a/neuralprophet/time_net.py +++ b/neuralprophet/time_net.py @@ -63,6 +63,7 @@ def __init__( num_seasonalities_modelled: int = 1, num_seasonalities_modelled_dict: dict = None, meta_used_in_model: bool = False, + continue_training: bool = False, ): """ Parameters @@ -306,6 +307,9 @@ def __init__( else: self.config_regressors.regressors = None + # Continued training + self.continue_training = continue_training + @property def ar_weights(self) -> torch.Tensor: """sets property auto-regression weights for regularization. Update if AR is modelled differently""" @@ -863,12 +867,19 @@ def configure_optimizers(self): optimizer = self._optimizer(self.parameters(), lr=self.learning_rate, **self.config_train.optimizer_args) # Scheduler - lr_scheduler = self._scheduler( - optimizer, - max_lr=self.learning_rate, - total_steps=self.trainer.estimated_stepping_batches, - **self.config_train.scheduler_args, - ) + if self.continue_training: + # Update initial learning rate to the last learning rate for continued training + last_lr = optimizer.param_groups[0]["lr"] + lr_scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95) + for param_group in optimizer.param_groups: + param_group["initial_lr"] = last_lr + else: + lr_scheduler = self._scheduler( + optimizer, + max_lr=self.learning_rate, + total_steps=self.trainer.estimated_stepping_batches, + **self.config_train.scheduler_args, + ) return {"optimizer": optimizer, "lr_scheduler": lr_scheduler} From da3a6d5d8a9de442e2f145b8dafbcde9b5507c71 Mon Sep 17 00:00:00 2001 From: Constantin Weberpals Date: Mon, 1 Jul 2024 17:22:02 +0200 Subject: [PATCH 04/15] add test --- neuralprophet/forecaster.py | 14 +++++++++++--- tests/test_utils.py | 27 +++++++++++++-------------- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py index cd7422fe2..17c256221 100644 --- a/neuralprophet/forecaster.py +++ b/neuralprophet/forecaster.py @@ -1051,6 +1051,10 @@ def fit( if self.fitted is True and not continue_training: log.error("Model has already been fitted. Re-fitting may break or produce different results.") + + if continue_training and self.metrics_logger.checkpoint_path is None: + log.error("Continued training requires checkpointing in model.") + self.max_lags = df_utils.get_max_num_lags(self.config_lagged_regressors, self.n_lags) if self.max_lags == 0 and self.n_forecasts > 1: self.n_forecasts = 1 @@ -2761,10 +2765,14 @@ def _train( # Set continue_training flag in model to update scheduler correctly self.model.continue_training = True + previous_epoch = checkpoint["epoch"] # Adjust epochs - additional_epochs = 50 - previous_epochs = self.config_train.epochs # Get the number of epochs already trained - new_total_epochs = previous_epochs + additional_epochs + if self.config_train.epochs: + additional_epochs = self.config_train.epochs + else: + additional_epochs = previous_epoch + # Get the number of epochs already trained + new_total_epochs = previous_epoch + additional_epochs self.config_train.epochs = new_total_epochs # Reinitialize optimizer with loaded model parameters diff --git a/tests/test_utils.py b/tests/test_utils.py index f08968e99..c5d838240 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -101,17 +101,16 @@ def test_save_load_io(): pd.testing.assert_frame_equal(forecast, forecast3) -# TODO: add functionality to continue training -# def test_continue_training(): -# df = pd.read_csv(PEYTON_FILE, nrows=NROWS) -# m = NeuralProphet( -# epochs=EPOCHS, -# batch_size=BATCH_SIZE, -# learning_rate=LR, -# n_lags=6, -# n_forecasts=3, -# n_changepoints=0, -# ) -# metrics = m.fit(df, freq="D") -# metrics2 = m.fit(df, freq="D", continue_training=True) -# assert metrics1["Loss"].sum() >= metrics2["Loss"].sum() +def test_continue_training(): + df = pd.read_csv(PEYTON_FILE, nrows=NROWS) + m = NeuralProphet( + epochs=EPOCHS, + batch_size=BATCH_SIZE, + learning_rate=LR, + n_lags=6, + n_forecasts=3, + n_changepoints=0, + ) + metrics = m.fit(df, checkpointing=True, freq="D") + metrics2 = m.fit(df, freq="D", continue_training=True) + assert metrics["Loss"].min() >= metrics2["Loss"].min() From f9969285c8c8b8f8f5cf22a6b8850b58323c576a Mon Sep 17 00:00:00 2001 From: Constantin Weberpals Date: Mon, 1 Jul 2024 21:04:07 +0200 Subject: [PATCH 05/15] fix metrics logging --- neuralprophet/forecaster.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py index c972651e3..5ad4ad73f 100644 --- a/neuralprophet/forecaster.py +++ b/neuralprophet/forecaster.py @@ -2888,8 +2888,12 @@ def _train( if not metrics_enabled: return None + # Return metrics collected in logger as dataframe - metrics_df = pd.DataFrame(self.metrics_logger.history) + if self.metrics_logger.history is not None: + metrics_df = pd.DataFrame(self.metrics_logger.history) + else: + metrics_df = pd.DataFrame() return metrics_df def restore_trainer(self, accelerator: Optional[str] = None): From f9a77f8da770298d285f7f41f532d897a102316a Mon Sep 17 00:00:00 2001 From: Constantin Weberpals Date: Fri, 5 Jul 2024 10:19:56 +0200 Subject: [PATCH 06/15] include feedback --- neuralprophet/forecaster.py | 35 +++++++++-------------------------- 1 file changed, 9 insertions(+), 26 deletions(-) diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py index 5ad4ad73f..7cf43d696 100644 --- a/neuralprophet/forecaster.py +++ b/neuralprophet/forecaster.py @@ -979,7 +979,12 @@ def fit( metrics with training and potentially evaluation metrics """ if self.fitted and not continue_training: - raise RuntimeError("Model has been fitted already. Please initialize a new model to fit again.") + raise RuntimeError( + "Model has been fitted already. If you want to continue training please set the flag continue_training." + ) + + if continue_training and epochs is None: + raise ValueError("Continued training requires setting the number of epochs to train for.") # Configuration if epochs is not None: @@ -1065,11 +1070,8 @@ def fit( or any(value != 1 for value in self.num_seasonalities_modelled_dict.values()) ) - if self.fitted is True and not continue_training: - log.error("Model has already been fitted. Re-fitting may break or produce different results.") - if continue_training and self.metrics_logger.checkpoint_path is None: - log.error("Continued training requires checkpointing in model.") + log.error("Continued training requires checkpointing in model to continue from last epoch.") self.max_lags = df_utils.get_max_num_lags( n_lags=self.n_lags, config_lagged_regressors=self.config_lagged_regressors @@ -2777,34 +2779,15 @@ def _train( # Load model and optimizer state from checkpoint if continue_training is True if continue_training: - checkpoint_path = self.metrics_logger.checkpoint_path - checkpoint = torch.load(checkpoint_path) - - # Load model state - self.model.load_state_dict(checkpoint["state_dict"]) + previous_epoch = self.model.current_epoch # Set continue_training flag in model to update scheduler correctly self.model.continue_training = True - previous_epoch = checkpoint["epoch"] # Adjust epochs - if self.config_train.epochs: - additional_epochs = self.config_train.epochs - else: - additional_epochs = previous_epoch - # Get the number of epochs already trained - new_total_epochs = previous_epoch + additional_epochs + new_total_epochs = previous_epoch + self.config_train.epochs self.config_train.epochs = new_total_epochs - # Reinitialize optimizer with loaded model parameters - optimizer = torch.optim.AdamW(self.model.parameters()) - - # Load optimizer state - if "optimizer_states" in checkpoint and checkpoint["optimizer_states"]: - optimizer.load_state_dict(checkpoint["optimizer_states"][0]) - - self.config_train.optimizer = optimizer - else: self.model = self._init_model() From 7ad761d00a93f3b4a9ad54d41116ed0b3c2cb91a Mon Sep 17 00:00:00 2001 From: Constantin Weberpals Date: Fri, 5 Jul 2024 14:56:27 +0200 Subject: [PATCH 07/15] get correct optimizer states --- neuralprophet/configure.py | 4 ++++ neuralprophet/forecaster.py | 6 ++++++ neuralprophet/time_net.py | 17 ++++++++++++++--- 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/neuralprophet/configure.py b/neuralprophet/configure.py index 5b54b202e..7a8dcf7c7 100644 --- a/neuralprophet/configure.py +++ b/neuralprophet/configure.py @@ -104,6 +104,7 @@ class Train: n_data: int = field(init=False) loss_func_name: str = field(init=False) lr_finder_args: dict = field(default_factory=dict) + optimizer_state: dict = field(default_factory=dict) def __post_init__(self): # assert the uncertainty estimation params and then finalize the quantiles @@ -239,6 +240,9 @@ def get_reg_delay_weight(self, e, iter_progress, reg_start_pct: float = 0.66, re delay_weight = 1 return delay_weight + def set_optimizer_state(self, optimizer_state: dict): + self.optimizer_state = optimizer_state + @dataclass class Trend: diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py index 7cf43d696..ed35413aa 100644 --- a/neuralprophet/forecaster.py +++ b/neuralprophet/forecaster.py @@ -2779,15 +2779,21 @@ def _train( # Load model and optimizer state from checkpoint if continue_training is True if continue_training: + checkpoint_path = self.metrics_logger.checkpoint_path + checkpoint = torch.load(checkpoint_path) + previous_epoch = self.model.current_epoch # Set continue_training flag in model to update scheduler correctly self.model.continue_training = True + self.model.start_epoch = previous_epoch # Adjust epochs new_total_epochs = previous_epoch + self.config_train.epochs self.config_train.epochs = new_total_epochs + self.config_train.set_optimizer_state(checkpoint["optimizer_states"][0]) + else: self.model = self._init_model() diff --git a/neuralprophet/time_net.py b/neuralprophet/time_net.py index 8413d8782..e635b9dda 100644 --- a/neuralprophet/time_net.py +++ b/neuralprophet/time_net.py @@ -64,6 +64,7 @@ def __init__( num_seasonalities_modelled_dict: dict = None, meta_used_in_model: bool = False, continue_training: bool = False, + start_epoch: int = 0, ): """ Parameters @@ -309,6 +310,7 @@ def __init__( # Continued training self.continue_training = continue_training + self.start_epoch = start_epoch @property def ar_weights(self) -> torch.Tensor: @@ -870,11 +872,20 @@ def configure_optimizers(self): # Scheduler if self.continue_training: + optimizer.load_state_dict(self.config_train.optimizer_state) + # Update initial learning rate to the last learning rate for continued training - last_lr = optimizer.param_groups[0]["lr"] - lr_scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95) + last_lr = float(optimizer.param_groups[0]["lr"]) # Ensure it's a float + + batches_per_epoch = len(self.train_dataloader()) + total_batches_processed = self.start_epoch * batches_per_epoch + for param_group in optimizer.param_groups: - param_group["initial_lr"] = last_lr + param_group["initial_lr"] = (last_lr,) + + lr_scheduler = lr_scheduler = torch.optim.lr_scheduler.ExponentialLR( + optimizer, gamma=0.95, last_epoch=total_batches_processed - 1 + ) else: lr_scheduler = self._scheduler( optimizer, From b14d20b2fe9d357d10719b61081eda39132a0345 Mon Sep 17 00:00:00 2001 From: Constantin Weberpals Date: Fri, 5 Jul 2024 15:01:07 +0200 Subject: [PATCH 08/15] fix tests --- tests/test_utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_utils.py b/tests/test_utils.py index de57fd5fd..a1f8c5874 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -21,6 +21,7 @@ YOS_FILE = os.path.join(DATA_DIR, "yosemite_temps.csv") NROWS = 512 EPOCHS = 10 +ADDITIONAL_EPOCHS = 5 LR = 1.0 BATCH_SIZE = 64 @@ -112,5 +113,5 @@ def test_continue_training(): n_changepoints=0, ) metrics = m.fit(df, checkpointing=True, freq="D") - metrics2 = m.fit(df, freq="D", continue_training=True) + metrics2 = m.fit(df, freq="D", continue_training=True, epochs=ADDITIONAL_EPOCHS) assert metrics["Loss"].min() >= metrics2["Loss"].min() From 9fe34012e214a9634c55387822bf26efb269b1a1 Mon Sep 17 00:00:00 2001 From: Constantin Weberpals Date: Mon, 8 Jul 2024 11:52:44 +0200 Subject: [PATCH 09/15] enable setting the scheduler --- neuralprophet/configure.py | 65 ++++++++++++++++++++++++++++++------- neuralprophet/forecaster.py | 16 ++++++++- neuralprophet/time_net.py | 19 +++++++---- 3 files changed, 82 insertions(+), 18 deletions(-) diff --git a/neuralprophet/configure.py b/neuralprophet/configure.py index 7a8dcf7c7..79be7fc5a 100644 --- a/neuralprophet/configure.py +++ b/neuralprophet/configure.py @@ -94,7 +94,7 @@ class Train: optimizer: Union[str, Type[torch.optim.Optimizer]] quantiles: List[float] = field(default_factory=list) optimizer_args: dict = field(default_factory=dict) - scheduler: Optional[Type[torch.optim.lr_scheduler.OneCycleLR]] = None + scheduler: Optional[Type[torch.optim.lr_scheduler._LRScheduler]] = None scheduler_args: dict = field(default_factory=dict) newer_samples_weight: float = 1.0 newer_samples_start: float = 0.0 @@ -193,16 +193,59 @@ def set_scheduler(self): Set the scheduler and scheduler args. The scheduler is not initialized yet as this is done in configure_optimizers in TimeNet. """ - self.scheduler = torch.optim.lr_scheduler.OneCycleLR - self.scheduler_args.update( - { - "pct_start": 0.3, - "anneal_strategy": "cos", - "div_factor": 10.0, - "final_div_factor": 10.0, - "three_phase": True, - } - ) + self.scheduler_args.clear() + if isinstance(self.scheduler, str): + if self.scheduler.lower() == "onecyclelr": + self.scheduler = torch.optim.lr_scheduler.OneCycleLR + self.scheduler_args.update( + { + "pct_start": 0.3, + "anneal_strategy": "cos", + "div_factor": 10.0, + "final_div_factor": 10.0, + "three_phase": True, + } + ) + elif self.scheduler.lower() == "steplr": + self.scheduler = torch.optim.lr_scheduler.StepLR + self.scheduler_args.update( + { + "step_size": 10, + "gamma": 0.1, + } + ) + elif self.scheduler.lower() == "exponentiallr": + self.scheduler = torch.optim.lr_scheduler.ExponentialLR + self.scheduler_args.update( + { + "gamma": 0.95, + } + ) + elif self.scheduler.lower() == "reducelronplateau": + self.scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau + self.scheduler_args.update( + { + "mode": "min", + "factor": 0.1, + "patience": 10, + } + ) + elif self.scheduler.lower() == "cosineannealinglr": + self.scheduler = torch.optim.lr_scheduler.CosineAnnealingLR + self.scheduler_args.update( + { + "T_max": 50, + } + ) + else: + raise NotImplementedError(f"Scheduler {self.scheduler} is not supported.") + elif self.scheduler is None: + self.scheduler = torch.optim.lr_scheduler.ExponentialLR + self.scheduler_args.update( + { + "gamma": 0.95, + } + ) def set_lr_finder_args(self, dataset_size, num_batches): """ diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py index ed35413aa..2e12d974e 100644 --- a/neuralprophet/forecaster.py +++ b/neuralprophet/forecaster.py @@ -451,6 +451,7 @@ def __init__( accelerator: Optional[str] = None, trainer_config: dict = {}, prediction_frequency: Optional[dict] = None, + scheduler: Optional[str] = "onecyclelr", ): self.config = locals() self.config.pop("self") @@ -509,6 +510,7 @@ def __init__( self.config_train = configure.Train( quantiles=quantiles, learning_rate=learning_rate, + scheduler=scheduler, epochs=epochs, batch_size=batch_size, loss_func=loss_func, @@ -921,6 +923,7 @@ def fit( continue_training: bool = False, num_workers: int = 0, deterministic: bool = False, + scheduler: Optional[str] = None, ): """Train, and potentially evaluate model. @@ -986,6 +989,18 @@ def fit( if continue_training and epochs is None: raise ValueError("Continued training requires setting the number of epochs to train for.") + if continue_training: + if scheduler is not None: + self.config_train.scheduler = scheduler + else: + self.config_train.scheduler = None + self.config_train.set_scheduler() + + if scheduler is not None and not continue_training: + log.warning( + "Scheduler can only be set in fit when continuing training. Please set the scheduler when initializing the model." + ) + # Configuration if epochs is not None: self.config_train.epochs = epochs @@ -2681,7 +2696,6 @@ def _init_train_loader(self, df, num_workers=0): config_seasonality=self.config_seasonality, ) - print("Changepoints:", self.config_trend.changepoints) df = _normalize(df=df, config_normalization=self.config_normalization) if not self.fitted: if self.config_trend.changepoints is not None: diff --git a/neuralprophet/time_net.py b/neuralprophet/time_net.py index e635b9dda..3bbfbe66b 100644 --- a/neuralprophet/time_net.py +++ b/neuralprophet/time_net.py @@ -883,16 +883,23 @@ def configure_optimizers(self): for param_group in optimizer.param_groups: param_group["initial_lr"] = (last_lr,) - lr_scheduler = lr_scheduler = torch.optim.lr_scheduler.ExponentialLR( - optimizer, gamma=0.95, last_epoch=total_batches_processed - 1 - ) - else: lr_scheduler = self._scheduler( optimizer, - max_lr=self.learning_rate, - total_steps=self.trainer.estimated_stepping_batches, **self.config_train.scheduler_args, ) + else: + if self._scheduler == torch.optim.lr_scheduler.OneCycleLR: + lr_scheduler = self._scheduler( + optimizer, + max_lr=self.learning_rate, + total_steps=self.trainer.estimated_stepping_batches, + **self.config_train.scheduler_args, + ) + else: + lr_scheduler = self._scheduler( + optimizer, + **self.config_train.scheduler_args, + ) return {"optimizer": optimizer, "lr_scheduler": lr_scheduler} From 00f2e25e13aca00fb58d8951214fb27ad0da792a Mon Sep 17 00:00:00 2001 From: Constantin Weberpals Date: Mon, 8 Jul 2024 12:31:51 +0200 Subject: [PATCH 10/15] update for onecyclelr --- neuralprophet/forecaster.py | 2 +- neuralprophet/time_net.py | 29 ++++++++++++++--------------- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py index 2e12d974e..0a1202c6c 100644 --- a/neuralprophet/forecaster.py +++ b/neuralprophet/forecaster.py @@ -436,6 +436,7 @@ def __init__( batch_size: Optional[int] = None, loss_func: Union[str, torch.nn.modules.loss._Loss, Callable] = "SmoothL1Loss", optimizer: Union[str, Type[torch.optim.Optimizer]] = "AdamW", + scheduler: Optional[str] = "onecyclelr", newer_samples_weight: float = 2, newer_samples_start: float = 0.0, quantiles: List[float] = [], @@ -451,7 +452,6 @@ def __init__( accelerator: Optional[str] = None, trainer_config: dict = {}, prediction_frequency: Optional[dict] = None, - scheduler: Optional[str] = "onecyclelr", ): self.config = locals() self.config.pop("self") diff --git a/neuralprophet/time_net.py b/neuralprophet/time_net.py index 3bbfbe66b..28f4058b5 100644 --- a/neuralprophet/time_net.py +++ b/neuralprophet/time_net.py @@ -871,35 +871,34 @@ def configure_optimizers(self): optimizer = self._optimizer(self.parameters(), lr=self.learning_rate, **self.config_train.optimizer_args) # Scheduler + self._scheduler = self.config_train.scheduler + if self.continue_training: optimizer.load_state_dict(self.config_train.optimizer_state) # Update initial learning rate to the last learning rate for continued training last_lr = float(optimizer.param_groups[0]["lr"]) # Ensure it's a float - batches_per_epoch = len(self.train_dataloader()) - total_batches_processed = self.start_epoch * batches_per_epoch - for param_group in optimizer.param_groups: param_group["initial_lr"] = (last_lr,) + if self._scheduler == torch.optim.lr_scheduler.OneCycleLR: + log.warning("OneCycleLR scheduler is not supported for continued training. Switching to ExponentialLR") + self._scheduler = torch.optim.lr_scheduler.ExponentialLR + self.config_train.scheduler_args = {"gamma": 0.95} + + if self._scheduler == torch.optim.lr_scheduler.OneCycleLR: lr_scheduler = self._scheduler( optimizer, + max_lr=self.learning_rate, + total_steps=self.trainer.estimated_stepping_batches, **self.config_train.scheduler_args, ) else: - if self._scheduler == torch.optim.lr_scheduler.OneCycleLR: - lr_scheduler = self._scheduler( - optimizer, - max_lr=self.learning_rate, - total_steps=self.trainer.estimated_stepping_batches, - **self.config_train.scheduler_args, - ) - else: - lr_scheduler = self._scheduler( - optimizer, - **self.config_train.scheduler_args, - ) + lr_scheduler = self._scheduler( + optimizer, + **self.config_train.scheduler_args, + ) return {"optimizer": optimizer, "lr_scheduler": lr_scheduler} From 5f103d8837fc53fd5639efd260900c8e171341ee Mon Sep 17 00:00:00 2001 From: Constantin Weberpals Date: Tue, 9 Jul 2024 14:01:36 +0200 Subject: [PATCH 11/15] add tests and adapt docstring --- neuralprophet/configure.py | 11 +---------- neuralprophet/forecaster.py | 24 +++++++++++++++++++++++- tests/test_utils.py | 31 +++++++++++++++++++++++++++++++ 3 files changed, 55 insertions(+), 11 deletions(-) diff --git a/neuralprophet/configure.py b/neuralprophet/configure.py index 79be7fc5a..5cc5edca7 100644 --- a/neuralprophet/configure.py +++ b/neuralprophet/configure.py @@ -190,7 +190,7 @@ def set_optimizer(self): def set_scheduler(self): """ - Set the scheduler and scheduler args. + Set the scheduler and scheduler arg depending on the user selection. The scheduler is not initialized yet as this is done in configure_optimizers in TimeNet. """ self.scheduler_args.clear() @@ -221,15 +221,6 @@ def set_scheduler(self): "gamma": 0.95, } ) - elif self.scheduler.lower() == "reducelronplateau": - self.scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau - self.scheduler_args.update( - { - "mode": "min", - "factor": 0.1, - "patience": 10, - } - ) elif self.scheduler.lower() == "cosineannealinglr": self.scheduler = torch.optim.lr_scheduler.CosineAnnealingLR self.scheduler_args.update( diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py index 0a1202c6c..05cdc3f75 100644 --- a/neuralprophet/forecaster.py +++ b/neuralprophet/forecaster.py @@ -301,6 +301,20 @@ class NeuralProphet: >>> m = NeuralProphet(collect_metrics=["MSE", "MAE", "RMSE"]) >>> # use custorm torchmetrics names >>> m = NeuralProphet(collect_metrics={"MAPE": "MeanAbsolutePercentageError", "MSLE": "MeanSquaredLogError", + scheduler : str, torch.optim.lr_scheduler._LRScheduler + Type of learning rate scheduler to use. + + Options + * (default) ``OneCycleLR``: One Cycle Learning Rate scheduler + * ``StepLR``: Step Learning Rate scheduler + * ``ExponentialLR``: Exponential Learning Rate scheduler + * ``CosineAnnealingLR``: Cosine Annealing Learning Rate scheduler + + Examples + -------- + >>> from neuralprophet import NeuralProphet + >>> # Step Learning Rate scheduler + >>> m = NeuralProphet(scheduler="StepLR") COMMENT Uncertainty Estimation @@ -975,6 +989,13 @@ def fit( Note: using multiple workers and therefore distributed training might significantly increase the training time since each batch needs to be copied to each worker for each epoch. Keeping all data on the main process might be faster for most datasets. + scheduler : str + Type of learning rate scheduler to use for continued training. If None, uses ExponentialLR as + default as specified in the model config. + Options + * ``StepLR``: Step Learning Rate scheduler + * ``ExponentialLR``: Exponential Learning Rate scheduler + * ``CosineAnnealingLR``: Cosine Annealing Learning Rate scheduler Returns ------- @@ -2796,7 +2817,8 @@ def _train( checkpoint_path = self.metrics_logger.checkpoint_path checkpoint = torch.load(checkpoint_path) - previous_epoch = self.model.current_epoch + checkpoint_epoch = checkpoint["epoch"] if "epoch" in checkpoint else 0 + previous_epoch = max(self.model.current_epoch, checkpoint_epoch) # Set continue_training flag in model to update scheduler correctly self.model.continue_training = True diff --git a/tests/test_utils.py b/tests/test_utils.py index a1f8c5874..3b93721bf 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -115,3 +115,34 @@ def test_continue_training(): metrics = m.fit(df, checkpointing=True, freq="D") metrics2 = m.fit(df, freq="D", continue_training=True, epochs=ADDITIONAL_EPOCHS) assert metrics["Loss"].min() >= metrics2["Loss"].min() + + +def test_continue_training_with_scheduler_selection(): + df = pd.read_csv(PEYTON_FILE, nrows=NROWS) + m = NeuralProphet( + epochs=EPOCHS, + batch_size=BATCH_SIZE, + learning_rate=LR, + n_lags=6, + n_forecasts=3, + n_changepoints=0, + ) + metrics = m.fit(df, checkpointing=True, freq="D") + # Continue training with StepLR + metrics2 = m.fit(df, freq="D", continue_training=True, epochs=ADDITIONAL_EPOCHS, scheduler="StepLR") + assert metrics["Loss"].min() >= metrics2["Loss"].min() + + +def test_save_load_continue_training(): + df = pd.read_csv(PEYTON_FILE, nrows=NROWS) + m = NeuralProphet( + epochs=EPOCHS, + n_lags=6, + n_forecasts=3, + n_changepoints=0, + ) + metrics = m.fit(df, checkpointing=True, freq="D") + save(m, "test_model.pt") + m2 = load("test_model.pt") + metrics2 = m2.fit(df, continue_training=True, epochs=ADDITIONAL_EPOCHS, scheduler="StepLR") + assert metrics["Loss"].min() >= metrics2["Loss"].min() From e04320157766efd517ac3149d62a41d62373b67c Mon Sep 17 00:00:00 2001 From: Constantin Weberpals Date: Tue, 9 Jul 2024 14:22:17 +0200 Subject: [PATCH 12/15] fix array mismatch --- neuralprophet/forecaster.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py index 05cdc3f75..1c99def2d 100644 --- a/neuralprophet/forecaster.py +++ b/neuralprophet/forecaster.py @@ -2916,7 +2916,13 @@ def _train( # Return metrics collected in logger as dataframe if self.metrics_logger.history is not None: - metrics_df = pd.DataFrame(self.metrics_logger.history) + # avoid array mismatch when continuing training + history = self.metrics_logger.history + max_length = max(len(lst) for lst in history.values()) + for key in history: + while len(history[key]) < max_length: + history[key].append(None) + metrics_df = pd.DataFrame(history) else: metrics_df = pd.DataFrame() return metrics_df From 63c935c8e4789cb4f3ce948de0b86c23c8d0ddbc Mon Sep 17 00:00:00 2001 From: ourownstory Date: Fri, 23 Aug 2024 17:18:51 -0700 Subject: [PATCH 13/15] robustify scheduler config --- neuralprophet/configure.py | 64 ++++++++++++++++++------------------- neuralprophet/forecaster.py | 44 +++++++++++++++---------- 2 files changed, 59 insertions(+), 49 deletions(-) diff --git a/neuralprophet/configure.py b/neuralprophet/configure.py index d44d6af81..ee8a442d7 100644 --- a/neuralprophet/configure.py +++ b/neuralprophet/configure.py @@ -94,7 +94,7 @@ class Train: optimizer: Union[str, Type[torch.optim.Optimizer]] quantiles: List[float] = field(default_factory=list) optimizer_args: dict = field(default_factory=dict) - scheduler: Optional[Type[torch.optim.lr_scheduler._LRScheduler]] = None + scheduler: Optional[Union[str, Type[torch.optim.lr_scheduler.LRScheduler]]] = None scheduler_args: dict = field(default_factory=dict) newer_samples_weight: float = 1.0 newer_samples_start: float = 0.0 @@ -193,50 +193,48 @@ def set_scheduler(self): Set the scheduler and scheduler arg depending on the user selection. The scheduler is not initialized yet as this is done in configure_optimizers in TimeNet. """ - self.scheduler_args.clear() if isinstance(self.scheduler, str): if self.scheduler.lower() == "onecyclelr": self.scheduler = torch.optim.lr_scheduler.OneCycleLR - self.scheduler_args.update( - { - "pct_start": 0.3, - "anneal_strategy": "cos", - "div_factor": 10.0, - "final_div_factor": 10.0, - "three_phase": True, - } - ) + defaults = { + "pct_start": 0.3, + "anneal_strategy": "cos", + "div_factor": 10.0, + "final_div_factor": 10.0, + "three_phase": True, + } elif self.scheduler.lower() == "steplr": self.scheduler = torch.optim.lr_scheduler.StepLR - self.scheduler_args.update( - { - "step_size": 10, - "gamma": 0.1, - } - ) + defaults = { + "step_size": 10, + "gamma": 0.1, + } elif self.scheduler.lower() == "exponentiallr": self.scheduler = torch.optim.lr_scheduler.ExponentialLR - self.scheduler_args.update( - { - "gamma": 0.95, - } - ) + defaults = { + "gamma": 0.95, + } elif self.scheduler.lower() == "cosineannealinglr": self.scheduler = torch.optim.lr_scheduler.CosineAnnealingLR - self.scheduler_args.update( - { - "T_max": 50, - } - ) + defaults = { + "T_max": 50, + } else: - raise NotImplementedError(f"Scheduler {self.scheduler} is not supported.") + raise NotImplementedError( + f"Scheduler {self.scheduler} is not supported from string. Please pass the scheduler class." + ) + if self.scheduler_args is not None: + defaults.update(self.scheduler_args) + self.scheduler_args = defaults elif self.scheduler is None: self.scheduler = torch.optim.lr_scheduler.ExponentialLR - self.scheduler_args.update( - { - "gamma": 0.95, - } - ) + self.scheduler_args = { + "gamma": 0.95, + } + else: # if scheduler is a class + assert issubclass( + self.scheduler, torch.optim.lr_scheduler.LRScheduler + ), "Scheduler must be a subclass of torch.optim.lr_scheduler.LRScheduler" def set_lr_finder_args(self, dataset_size, num_batches): """ diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py index 3cc386dc2..6caf1cc15 100644 --- a/neuralprophet/forecaster.py +++ b/neuralprophet/forecaster.py @@ -298,6 +298,7 @@ class NeuralProphet: >>> m = NeuralProphet(collect_metrics=["MSE", "MAE", "RMSE"]) >>> # use custorm torchmetrics names >>> m = NeuralProphet(collect_metrics={"MAPE": "MeanAbsolutePercentageError", "MSLE": "MeanSquaredLogError", + scheduler : str, torch.optim.lr_scheduler._LRScheduler Type of learning rate scheduler to use. @@ -446,7 +447,8 @@ def __init__( batch_size: Optional[int] = None, loss_func: Union[str, torch.nn.modules.loss._Loss, Callable] = "SmoothL1Loss", optimizer: Union[str, Type[torch.optim.Optimizer]] = "AdamW", - scheduler: Optional[str] = "onecyclelr", + scheduler: Optional[Union[str, Type[torch.optim.lr_scheduler.LRScheduler]]] = "onecyclelr", + scheduler_args: Optional[dict] = None, newer_samples_weight: float = 2, newer_samples_start: float = 0.0, quantiles: List[float] = [], @@ -521,6 +523,7 @@ def __init__( quantiles=quantiles, learning_rate=learning_rate, scheduler=scheduler, + scheduler_args=scheduler_args, epochs=epochs, batch_size=batch_size, loss_func=loss_func, @@ -932,7 +935,8 @@ def fit( continue_training: bool = False, num_workers: int = 0, deterministic: bool = False, - scheduler: Optional[str] = None, + scheduler: Optional[Union[str, Type[torch.optim.lr_scheduler.LRScheduler]]] = None, + scheduler_args: Optional[dict] = None, ): """Train, and potentially evaluate model. @@ -1002,20 +1006,30 @@ def fit( "Model has been fitted already. If you want to continue training please set the flag continue_training." ) - if continue_training and epochs is None: - raise ValueError("Continued training requires setting the number of epochs to train for.") - if continue_training: - if scheduler is not None: - self.config_train.scheduler = scheduler - else: + if epochs is None: + raise ValueError("Continued training requires setting the number of epochs to train for.") + + if continue_training and self.metrics_logger.checkpoint_path is None: + log.error("Continued training requires checkpointing in model to continue from last epoch.") + + # if scheduler is not None: + # log.warning( + # "Scheduler can only be set in fit when continuing training. Please set the scheduler when initializing the model." + # ) + + if scheduler is None: + log.warning( + "No scheduler specified for continued training. Using a fallback scheduler for continued training." + ) self.config_train.scheduler = None - self.config_train.set_scheduler() + self.config_train.scheduler_args = None + self.config_train.set_scheduler() - if scheduler is not None and not continue_training: - log.warning( - "Scheduler can only be set in fit when continuing training. Please set the scheduler when initializing the model." - ) + if scheduler is not None: + self.config_train.scheduler = scheduler + self.config_train.scheduler_args = scheduler_args + self.config_train.set_scheduler() # Configuration if epochs is not None: @@ -1061,6 +1075,7 @@ def fit( log.info("When Global modeling with local normalization, metrics are displayed in normalized scale.") if minimal: + # overrides these settings: checkpointing = False self.metrics = False progress = None @@ -1101,9 +1116,6 @@ def fit( or any(value != 1 for value in self.num_seasonalities_modelled_dict.values()) ) - if continue_training and self.metrics_logger.checkpoint_path is None: - log.error("Continued training requires checkpointing in model to continue from last epoch.") - self.max_lags = df_utils.get_max_num_lags( n_lags=self.n_lags, config_lagged_regressors=self.config_lagged_regressors ) From 6a746804c84a565ee05d7d1b9986248c0d4cef70 Mon Sep 17 00:00:00 2001 From: ourownstory Date: Fri, 23 Aug 2024 18:49:18 -0700 Subject: [PATCH 14/15] clean up train config setup --- neuralprophet/configure.py | 88 ++++++++++++++++++++--------- neuralprophet/forecaster.py | 109 +++++++++++++++++++++--------------- neuralprophet/time_net.py | 29 +++++----- 3 files changed, 138 insertions(+), 88 deletions(-) diff --git a/neuralprophet/configure.py b/neuralprophet/configure.py index ee8a442d7..5d5144fc7 100644 --- a/neuralprophet/configure.py +++ b/neuralprophet/configure.py @@ -105,16 +105,19 @@ class Train: loss_func_name: str = field(init=False) lr_finder_args: dict = field(default_factory=dict) optimizer_state: dict = field(default_factory=dict) + continue_training: bool = False def __post_init__(self): # assert the uncertainty estimation params and then finalize the quantiles - self.set_quantiles() + # self.set_quantiles() assert self.newer_samples_weight >= 1.0 assert self.newer_samples_start >= 0.0 assert self.newer_samples_start < 1.0 self.set_loss_func() - self.set_optimizer() - self.set_scheduler() + + # called in TimeNet configure_optimizers: + # self.set_optimizer() + # self.set_scheduler() def set_loss_func(self): if isinstance(self.loss_func, str): @@ -139,22 +142,22 @@ def set_loss_func(self): if len(self.quantiles) > 1: self.loss_func = PinballLoss(loss_func=self.loss_func, quantiles=self.quantiles) - def set_quantiles(self): - # convert quantiles to empty list [] if None - if self.quantiles is None: - self.quantiles = [] - # assert quantiles is a list type - assert isinstance(self.quantiles, list), "Quantiles must be in a list format, not None or scalar." - # check if quantiles contain 0.5 or close to 0.5, remove if so as 0.5 will be inserted again as first index - self.quantiles = [quantile for quantile in self.quantiles if not math.isclose(0.5, quantile)] - # check if quantiles are float values in (0, 1) - assert all( - 0 < quantile < 1 for quantile in self.quantiles - ), "The quantiles specified need to be floats in-between (0, 1)." - # sort the quantiles - self.quantiles.sort() - # 0 is the median quantile index - self.quantiles.insert(0, 0.5) + # def set_quantiles(self): + # # convert quantiles to empty list [] if None + # if self.quantiles is None: + # self.quantiles = [] + # # assert quantiles is a list type + # assert isinstance(self.quantiles, list), "Quantiles must be in a list format, not None or scalar." + # # check if quantiles contain 0.5 or close to 0.5, remove if so as 0.5 will be inserted again as first index + # self.quantiles = [quantile for quantile in self.quantiles if not math.isclose(0.5, quantile)] + # # check if quantiles are float values in (0, 1) + # assert all( + # 0 < quantile < 1 for quantile in self.quantiles + # ), "The quantiles specified need to be floats in-between (0, 1)." + # # sort the quantiles + # self.quantiles.sort() + # # 0 is the median quantile index + # self.quantiles.insert(0, 0.5) def set_auto_batch_epoch( self, @@ -183,16 +186,50 @@ def set_optimizer(self): """ Set the optimizer and optimizer args. If optimizer is a string, then it will be converted to the corresponding torch optimizer. The optimizer is not initialized yet as this is done in configure_optimizers in TimeNet. + + Parameters + ---------- + optimizer_name : int + Object provided to NeuralProphet as optimizer. + optimizer_args : dict + Arguments for the optimizer. + """ - self.optimizer, self.optimizer_args = utils_torch.create_optimizer_from_config( - self.optimizer, self.optimizer_args - ) + if isinstance(self.optimizer, str): + if self.optimizer.lower() == "adamw": + # Tends to overfit, but reliable + self.optimizer = torch.optim.AdamW + self.optimizer_args["weight_decay"] = 1e-3 + elif self.optimizer.lower() == "sgd": + # better validation performance, but diverges sometimes + self.optimizer = torch.optim.SGD + self.optimizer_args["momentum"] = 0.9 + self.optimizer_args["weight_decay"] = 1e-4 + else: + raise ValueError( + f"The optimizer name {self.optimizer} is not supported. Please pass the optimizer class." + ) + elif not issubclass(self.optimizer, torch.optim.Optimizer): + raise ValueError("The provided optimizer is not supported.") def set_scheduler(self): """ Set the scheduler and scheduler arg depending on the user selection. The scheduler is not initialized yet as this is done in configure_optimizers in TimeNet. """ + if self.continue_training: + if (isinstance(self.scheduler, str) and self.scheduler.lower() == "onecyclelr") or isinstance( + self.scheduler, torch.optim.lr_scheduler.OneCycleLR + ): + log.warning( + "OneCycleLR scheduler is not supported for continued training. Please set another scheduler. Falling back to ExponentialLR scheduler" + ) + self.scheduler = "exponentiallr" + + if self.scheduler is None: + log.warning("No scheduler specified. Falling back to ExponentialLR scheduler.") + self.scheduler = "exponentiallr" + if isinstance(self.scheduler, str): if self.scheduler.lower() == "onecyclelr": self.scheduler = torch.optim.lr_scheduler.OneCycleLR @@ -226,12 +263,7 @@ def set_scheduler(self): if self.scheduler_args is not None: defaults.update(self.scheduler_args) self.scheduler_args = defaults - elif self.scheduler is None: - self.scheduler = torch.optim.lr_scheduler.ExponentialLR - self.scheduler_args = { - "gamma": 0.95, - } - else: # if scheduler is a class + else: assert issubclass( self.scheduler, torch.optim.lr_scheduler.LRScheduler ), "Scheduler must be a subclass of torch.optim.lr_scheduler.LRScheduler" diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py index 6caf1cc15..ea95834f4 100644 --- a/neuralprophet/forecaster.py +++ b/neuralprophet/forecaster.py @@ -1,4 +1,5 @@ import logging +import math import os import time from collections import OrderedDict @@ -518,20 +519,36 @@ def __init__( trend_local_reg=trend_local_reg, ) + # Model + self.quantiles = quantiles + # convert quantiles to empty list [] if None + if self.quantiles is None: + self.quantiles = [] + # assert quantiles is a list type + assert isinstance(self.quantiles, list), "Quantiles must be in a list format, not None or scalar." + # check if quantiles contain 0.5 or close to 0.5, remove if so as 0.5 will be inserted again as first index + self.quantiles = [quantile for quantile in self.quantiles if not math.isclose(0.5, quantile)] + # check if quantiles are float values in (0, 1) + assert all( + 0 < quantile < 1 for quantile in self.quantiles + ), "The quantiles specified need to be floats in-between (0, 1)." + # sort the quantiles + self.quantiles.sort() + # 0 is the median quantile index + self.quantiles.insert(0, 0.5) + # Training - self.config_train = configure.Train( - quantiles=quantiles, - learning_rate=learning_rate, - scheduler=scheduler, - scheduler_args=scheduler_args, - epochs=epochs, - batch_size=batch_size, - loss_func=loss_func, - optimizer=optimizer, - newer_samples_weight=newer_samples_weight, - newer_samples_start=newer_samples_start, - trend_reg_threshold=self.config_trend.trend_reg_threshold, - ) + self.learning_rate = learning_rate + self.scheduler = scheduler + self.scheduler_args = scheduler_args + self.epochs = epochs + self.batch_size = batch_size + self.loss_func = loss_func + self.optimizer = optimizer + self.newer_samples_weight = newer_samples_weight + self.newer_samples_start = newer_samples_start + self.trend_reg_threshold = self.config_trend.trend_reg_threshold + self.continue_training = False # Seasonality self.config_seasonality = configure.ConfigSeasonality( @@ -1013,25 +1030,29 @@ def fit( if continue_training and self.metrics_logger.checkpoint_path is None: log.error("Continued training requires checkpointing in model to continue from last epoch.") - # if scheduler is not None: - # log.warning( - # "Scheduler can only be set in fit when continuing training. Please set the scheduler when initializing the model." - # ) + # Configuration + self.continue_training = continue_training - if scheduler is None: - log.warning( - "No scheduler specified for continued training. Using a fallback scheduler for continued training." - ) - self.config_train.scheduler = None - self.config_train.scheduler_args = None - self.config_train.set_scheduler() + # Config + self.config_train = configure.Train( + quantiles=self.quantiles, + learning_rate=self.learning_rate, + scheduler=self.scheduler, + scheduler_args=self.scheduler_args, + epochs=self.epochs, + batch_size=self.batch_size, + loss_func=self.loss_func, + optimizer=self.optimizer, + newer_samples_weight=self.newer_samples_weight, + newer_samples_start=self.newer_samples_start, + trend_reg_threshold=self.config_trend.trend_reg_threshold, + continue_training=self.continue_training, + ) if scheduler is not None: self.config_train.scheduler = scheduler self.config_train.scheduler_args = scheduler_args - self.config_train.set_scheduler() - # Configuration if epochs is not None: self.config_train.epochs = epochs @@ -1245,7 +1266,7 @@ def predict(self, df: pd.DataFrame, decompose: bool = True, raw: bool = False, a dates=dates, predicted=predicted, n_forecasts=self.n_forecasts, - quantiles=self.config_train.quantiles, + quantiles=self.quantiles, components=components, ) if auto_extend and periods_added[df_name] > 0: @@ -1260,7 +1281,7 @@ def predict(self, df: pd.DataFrame, decompose: bool = True, raw: bool = False, a n_forecasts=self.n_forecasts, max_lags=self.max_lags, freq=self.data_freq, - quantiles=self.config_train.quantiles, + quantiles=self.quantiles, config_lagged_regressors=self.config_lagged_regressors, ) if auto_extend and periods_added[df_name] > 0: @@ -1901,7 +1922,7 @@ def predict_trend(self, df: pd.DataFrame, quantile: float = 0.5): else: meta_name_tensor = None - quantile_index = self.config_train.quantiles.index(quantile) + quantile_index = self.quantiles.index(quantile) trend = self.model.trend(t, meta_name_tensor).detach().numpy()[:, :, quantile_index].squeeze() data_params = self.config_normalization.get_data_params(df_name) @@ -1966,7 +1987,7 @@ def predict_seasonal_components(self, df: pd.DataFrame, quantile: float = 0.5): for name in self.config_seasonality.periods: features = inputs["seasonalities"][name] - quantile_index = self.config_train.quantiles.index(quantile) + quantile_index = self.quantiles.index(quantile) y_season = torch.squeeze( self.model.seasonality.compute_fourier(features=features, name=name, meta=meta_name_tensor)[ :, :, quantile_index @@ -2098,7 +2119,7 @@ def plot( log.info(f"Plotting data from ID {df_name}") if forecast_in_focus is None: forecast_in_focus = self.highlight_forecast_step_n - if len(self.config_train.quantiles) > 1: + if len(self.quantiles) > 1: if (self.highlight_forecast_step_n) is None and ( self.n_forecasts > 1 or self.n_lags > 0 ): # rather query if n_forecasts >1 than n_lags>1 @@ -2138,7 +2159,7 @@ def plot( if plotting_backend.startswith("plotly"): return plot_plotly( fcst=fcst, - quantiles=self.config_train.quantiles, + quantiles=self.quantiles, xlabel=xlabel, ylabel=ylabel, figsize=tuple(x * 70 for x in figsize), @@ -2149,7 +2170,7 @@ def plot( else: return plot( fcst=fcst, - quantiles=self.config_train.quantiles, + quantiles=self.quantiles, ax=ax, xlabel=xlabel, ylabel=ylabel, @@ -2217,9 +2238,7 @@ def get_latest_forecast( fcst = fcst[-(include_previous_forecasts + self.n_forecasts) :] elif include_history_data is True: fcst = fcst - fcst = utils.fcst_df_to_latest_forecast( - fcst, self.config_train.quantiles, n_last=1 + include_previous_forecasts - ) + fcst = utils.fcst_df_to_latest_forecast(fcst, self.quantiles, n_last=1 + include_previous_forecasts) return fcst def plot_latest_forecast( @@ -2287,7 +2306,7 @@ def plot_latest_forecast( else: fcst = fcst[fcst["ID"] == df_name].copy(deep=True) log.info(f"Plotting data from ID {df_name}") - if len(self.config_train.quantiles) > 1: + if len(self.quantiles) > 1: log.warning( "Plotting latest forecasts when uncertainty estimation enabled" " plots only the median quantile forecasts." @@ -2298,9 +2317,7 @@ def plot_latest_forecast( fcst = fcst[-(include_previous_forecasts + self.n_forecasts) :] elif plot_history_data is True: fcst = fcst - fcst = utils.fcst_df_to_latest_forecast( - fcst, self.config_train.quantiles, n_last=1 + include_previous_forecasts - ) + fcst = utils.fcst_df_to_latest_forecast(fcst, self.quantiles, n_last=1 + include_previous_forecasts) # Check whether a local or global plotting backend is set. plotting_backend = select_plotting_backend(model=self, plotting_backend=plotting_backend) @@ -2309,7 +2326,7 @@ def plot_latest_forecast( if plotting_backend.startswith("plotly"): return plot_plotly( fcst=fcst, - quantiles=self.config_train.quantiles, + quantiles=self.quantiles, ylabel=ylabel, xlabel=xlabel, figsize=tuple(x * 70 for x in figsize), @@ -2321,7 +2338,7 @@ def plot_latest_forecast( else: return plot( fcst=fcst, - quantiles=self.config_train.quantiles, + quantiles=self.quantiles, ax=ax, ylabel=ylabel, xlabel=xlabel, @@ -2487,7 +2504,7 @@ def plot_components( m=self, fcst=fcst, plot_configuration=valid_plot_configuration, - quantile=self.config_train.quantiles[0], # plot components only for median quantile + quantile=self.quantiles[0], # plot components only for median quantile figsize=figsize, df_name=df_name, one_period_per_season=one_period_per_season, @@ -2597,11 +2614,11 @@ def plot_parameters( if not (0 < quantile < 1): raise ValueError("The quantile selected needs to be a float in-between (0,1)") # ValueError if selected quantile is out of range - if quantile not in self.config_train.quantiles: + if quantile not in self.quantiles: raise ValueError("Selected quantile is not specified in the model configuration.") else: # plot parameters for median quantile if not specified - quantile = self.config_train.quantiles[0] + quantile = self.quantiles[0] # Validate components to be plotted valid_parameters_set = [ @@ -3148,7 +3165,7 @@ def conformal_predict( alpha=alpha, method=method, n_forecasts=self.n_forecasts, - quantiles=self.config_train.quantiles, + quantiles=self.quantiles, ) df_forecast = c.predict(df=df_test, df_cal=df_cal, show_all_PI=show_all_PI) diff --git a/neuralprophet/time_net.py b/neuralprophet/time_net.py index c30594e29..e1c3ef8b8 100644 --- a/neuralprophet/time_net.py +++ b/neuralprophet/time_net.py @@ -158,9 +158,16 @@ def __init__( self.config_normalization = config_normalization self.compute_components_flag = compute_components_flag + # Continued training + self.continue_training = continue_training + self.start_epoch = start_epoch + # Optimizer and LR Scheduler - self._optimizer = self.config_train.optimizer - self._scheduler = self.config_train.scheduler + # self.config_train.set_optimizer() + # self.config_train.set_scheduler() + # self._optimizer = self.config_train.optimizer + # self._scheduler = self.config_train.scheduler + # Manual optimization: we are responsible for calling .backward(), .step(), .zero_grad(). self.automatic_optimization = False # Hyperparameters (can be tuned using trainer.tune()) @@ -314,10 +321,6 @@ def __init__( else: self.config_regressors.regressors = None - # Continued training - self.continue_training = continue_training - self.start_epoch = start_epoch - @property def ar_weights(self) -> torch.Tensor: """sets property auto-regression weights for regularization. Update if AR is modelled differently""" @@ -867,12 +870,14 @@ def predict_step(self, batch, batch_idx, dataloader_idx=0): return prediction, components def configure_optimizers(self): + self.config_train.set_optimizer() + self.config_train.set_scheduler() + self._optimizer = self.config_train.optimizer + self._scheduler = self.config_train.scheduler + # Optimizer optimizer = self._optimizer(self.parameters(), lr=self.learning_rate, **self.config_train.optimizer_args) - # Scheduler - self._scheduler = self.config_train.scheduler - if self.continue_training: optimizer.load_state_dict(self.config_train.optimizer_state) @@ -882,11 +887,7 @@ def configure_optimizers(self): for param_group in optimizer.param_groups: param_group["initial_lr"] = (last_lr,) - if self._scheduler == torch.optim.lr_scheduler.OneCycleLR: - log.warning("OneCycleLR scheduler is not supported for continued training. Switching to ExponentialLR") - self._scheduler = torch.optim.lr_scheduler.ExponentialLR - self.config_train.scheduler_args = {"gamma": 0.95} - + # Scheduler if self._scheduler == torch.optim.lr_scheduler.OneCycleLR: lr_scheduler = self._scheduler( optimizer, From 420f8a697ffc89338af575068398c6708b735e9a Mon Sep 17 00:00:00 2001 From: ourownstory Date: Sat, 24 Aug 2024 00:57:48 -0700 Subject: [PATCH 15/15] restructure train model config --- neuralprophet/configure.py | 48 ++++++------- neuralprophet/forecaster.py | 138 ++++++++++++++---------------------- neuralprophet/time_net.py | 4 +- tests/test_configure.py | 35 +++------ tests/test_train_config.py | 87 +++++++++++++++++++++++ tests/test_utils.py | 46 ------------ 6 files changed, 178 insertions(+), 180 deletions(-) create mode 100644 tests/test_train_config.py diff --git a/neuralprophet/configure.py b/neuralprophet/configure.py index 5d5144fc7..00eabd72b 100644 --- a/neuralprophet/configure.py +++ b/neuralprophet/configure.py @@ -23,6 +23,24 @@ @dataclass class Model: lagged_reg_layers: Optional[List[int]] + quantiles: Optional[List[float]] = None + + def setup_quantiles(self): + # convert quantiles to empty list [] if None + if self.quantiles is None: + self.quantiles = [] + # assert quantiles is a list type + assert isinstance(self.quantiles, list), "Quantiles must be provided as list." + # check if quantiles are float values in (0, 1) + assert all( + 0 < quantile < 1 for quantile in self.quantiles + ), "The quantiles specified need to be floats in-between (0, 1)." + # sort the quantiles + self.quantiles.sort() + # check if quantiles contain 0.5 or close to 0.5, remove if so as 0.5 will be inserted again as first index + self.quantiles = [quantile for quantile in self.quantiles if not math.isclose(0.5, quantile)] + # 0 is the median quantile index + self.quantiles.insert(0, 0.5) @dataclass @@ -92,7 +110,7 @@ class Train: batch_size: Optional[int] loss_func: Union[str, torch.nn.modules.loss._Loss, Callable] optimizer: Union[str, Type[torch.optim.Optimizer]] - quantiles: List[float] = field(default_factory=list) + # quantiles: List[float] = field(default_factory=list) optimizer_args: dict = field(default_factory=dict) scheduler: Optional[Union[str, Type[torch.optim.lr_scheduler.LRScheduler]]] = None scheduler_args: dict = field(default_factory=dict) @@ -106,20 +124,19 @@ class Train: lr_finder_args: dict = field(default_factory=dict) optimizer_state: dict = field(default_factory=dict) continue_training: bool = False + trainer_config: dict = field(default_factory=dict) def __post_init__(self): - # assert the uncertainty estimation params and then finalize the quantiles - # self.set_quantiles() assert self.newer_samples_weight >= 1.0 assert self.newer_samples_start >= 0.0 assert self.newer_samples_start < 1.0 - self.set_loss_func() + # self.set_loss_func(self.quantiles) # called in TimeNet configure_optimizers: # self.set_optimizer() # self.set_scheduler() - def set_loss_func(self): + def set_loss_func(self, quantiles: List[float]): if isinstance(self.loss_func, str): if self.loss_func.lower() in ["smoothl1", "smoothl1loss", "huber"]: # keeping 'huber' for backwards compatiblility, though not identical @@ -139,25 +156,8 @@ def set_loss_func(self): self.loss_func_name = type(self.loss_func).__name__ else: raise NotImplementedError(f"Loss function {self.loss_func} not found") - if len(self.quantiles) > 1: - self.loss_func = PinballLoss(loss_func=self.loss_func, quantiles=self.quantiles) - - # def set_quantiles(self): - # # convert quantiles to empty list [] if None - # if self.quantiles is None: - # self.quantiles = [] - # # assert quantiles is a list type - # assert isinstance(self.quantiles, list), "Quantiles must be in a list format, not None or scalar." - # # check if quantiles contain 0.5 or close to 0.5, remove if so as 0.5 will be inserted again as first index - # self.quantiles = [quantile for quantile in self.quantiles if not math.isclose(0.5, quantile)] - # # check if quantiles are float values in (0, 1) - # assert all( - # 0 < quantile < 1 for quantile in self.quantiles - # ), "The quantiles specified need to be floats in-between (0, 1)." - # # sort the quantiles - # self.quantiles.sort() - # # 0 is the median quantile index - # self.quantiles.insert(0, 0.5) + if len(quantiles) > 1: + self.loss_func = PinballLoss(loss_func=self.loss_func, quantiles=quantiles) def set_auto_batch_epoch( self, diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py index ea95834f4..5c1b6d9cd 100644 --- a/neuralprophet/forecaster.py +++ b/neuralprophet/forecaster.py @@ -3,6 +3,7 @@ import os import time from collections import OrderedDict +from dataclasses import dataclass, field from typing import Callable, List, Optional, Tuple, Type, Union import matplotlib @@ -452,7 +453,7 @@ def __init__( scheduler_args: Optional[dict] = None, newer_samples_weight: float = 2, newer_samples_start: float = 0.0, - quantiles: List[float] = [], + quantiles: Optional[List[float]] = None, impute_missing: bool = True, impute_linear: int = 10, impute_rolling: int = 10, @@ -463,7 +464,7 @@ def __init__( global_time_normalization: bool = True, unknown_data_normalization: bool = False, accelerator: Optional[str] = None, - trainer_config: dict = {}, + trainer_config: Optional[dict] = None, prediction_frequency: Optional[dict] = None, ): self.config = locals() @@ -505,7 +506,11 @@ def __init__( self.max_lags = self.n_lags # Model - self.config_model = configure.Model(lagged_reg_layers=lagged_reg_layers) + self.config_model = configure.Model( + lagged_reg_layers=lagged_reg_layers, + quantiles=quantiles, + ) + self.config_model.setup_quantiles() # Trend self.config_trend = configure.Trend( @@ -519,24 +524,6 @@ def __init__( trend_local_reg=trend_local_reg, ) - # Model - self.quantiles = quantiles - # convert quantiles to empty list [] if None - if self.quantiles is None: - self.quantiles = [] - # assert quantiles is a list type - assert isinstance(self.quantiles, list), "Quantiles must be in a list format, not None or scalar." - # check if quantiles contain 0.5 or close to 0.5, remove if so as 0.5 will be inserted again as first index - self.quantiles = [quantile for quantile in self.quantiles if not math.isclose(0.5, quantile)] - # check if quantiles are float values in (0, 1) - assert all( - 0 < quantile < 1 for quantile in self.quantiles - ), "The quantiles specified need to be floats in-between (0, 1)." - # sort the quantiles - self.quantiles.sort() - # 0 is the median quantile index - self.quantiles.insert(0, 0.5) - # Training self.learning_rate = learning_rate self.scheduler = scheduler @@ -586,7 +573,7 @@ def __init__( # Pytorch Lightning Trainer self.metrics_logger = MetricsLogger(save_dir=os.getcwd()) self.accelerator = accelerator - self.trainer_config = trainer_config + self.trainer_config = trainer_config if trainer_config is not None else {} # set during prediction self.future_periods = None @@ -954,6 +941,7 @@ def fit( deterministic: bool = False, scheduler: Optional[Union[str, Type[torch.optim.lr_scheduler.LRScheduler]]] = None, scheduler_args: Optional[dict] = None, + trainer_config: Optional[dict] = None, ): """Train, and potentially evaluate model. @@ -1018,6 +1006,12 @@ def fit( pd.DataFrame metrics with training and potentially evaluation metrics """ + if minimal: + # overrides these settings: + checkpointing = False + self.metrics = False + progress = None + if self.fitted and not continue_training: raise RuntimeError( "Model has been fitted already. If you want to continue training please set the flag continue_training." @@ -1031,43 +1025,25 @@ def fit( log.error("Continued training requires checkpointing in model to continue from last epoch.") # Configuration - self.continue_training = continue_training - - # Config self.config_train = configure.Train( - quantiles=self.quantiles, - learning_rate=self.learning_rate, - scheduler=self.scheduler, - scheduler_args=self.scheduler_args, - epochs=self.epochs, - batch_size=self.batch_size, + learning_rate=self.learning_rate if learning_rate is None else learning_rate, + scheduler=self.scheduler if scheduler is None else scheduler, + scheduler_args=self.scheduler_args if scheduler is None else scheduler_args, + epochs=self.epochs if epochs is None else epochs, + batch_size=self.batch_size if batch_size is None else batch_size, loss_func=self.loss_func, optimizer=self.optimizer, newer_samples_weight=self.newer_samples_weight, newer_samples_start=self.newer_samples_start, trend_reg_threshold=self.config_trend.trend_reg_threshold, - continue_training=self.continue_training, + continue_training=continue_training, + trainer_config=self.trainer_config if trainer_config is None else trainer_config, ) - - if scheduler is not None: - self.config_train.scheduler = scheduler - self.config_train.scheduler_args = scheduler_args - - if epochs is not None: - self.config_train.epochs = epochs - - if batch_size is not None: - self.config_train.batch_size = batch_size - - if learning_rate is not None: - self.config_train.learning_rate = learning_rate + self.config_train.set_loss_func(quantiles=self.config_model.quantiles) if early_stopping is not None: self.early_stopping = early_stopping - if metrics is not None: - self.metrics = utils_metrics.get_metrics(metrics) - # Warnings if early_stopping: reg_enabled = utils.check_for_regularization( @@ -1088,19 +1064,16 @@ def fit( number of epochs to train for." ) - if progress == "plot" and metrics is False: - log.info("Progress plot requires metrics to be enabled. Enabling the default metrics.") - metrics = utils_metrics.get_metrics(True) + if metrics: + self.metrics = utils_metrics.get_metrics(metrics) + + if progress == "plot" and not metrics: + log.info("Progress plot requires metrics to be enabled. Disabling progress plot.") + progress = None if not self.config_normalization.global_normalization: log.info("When Global modeling with local normalization, metrics are displayed in normalized scale.") - if minimal: - # overrides these settings: - checkpointing = False - self.metrics = False - progress = None - # Pre-processing # Copy df and save list of unique time series IDs (the latter for global-local modelling if enabled) df, _, _, self.id_list = df_utils.prep_or_copy_df(df) @@ -1266,7 +1239,7 @@ def predict(self, df: pd.DataFrame, decompose: bool = True, raw: bool = False, a dates=dates, predicted=predicted, n_forecasts=self.n_forecasts, - quantiles=self.quantiles, + quantiles=self.config_model.quantiles, components=components, ) if auto_extend and periods_added[df_name] > 0: @@ -1281,7 +1254,7 @@ def predict(self, df: pd.DataFrame, decompose: bool = True, raw: bool = False, a n_forecasts=self.n_forecasts, max_lags=self.max_lags, freq=self.data_freq, - quantiles=self.quantiles, + quantiles=self.config_model.quantiles, config_lagged_regressors=self.config_lagged_regressors, ) if auto_extend and periods_added[df_name] > 0: @@ -1922,7 +1895,7 @@ def predict_trend(self, df: pd.DataFrame, quantile: float = 0.5): else: meta_name_tensor = None - quantile_index = self.quantiles.index(quantile) + quantile_index = self.config_model.quantiles.index(quantile) trend = self.model.trend(t, meta_name_tensor).detach().numpy()[:, :, quantile_index].squeeze() data_params = self.config_normalization.get_data_params(df_name) @@ -1987,7 +1960,7 @@ def predict_seasonal_components(self, df: pd.DataFrame, quantile: float = 0.5): for name in self.config_seasonality.periods: features = inputs["seasonalities"][name] - quantile_index = self.quantiles.index(quantile) + quantile_index = self.config_model.quantiles.index(quantile) y_season = torch.squeeze( self.model.seasonality.compute_fourier(features=features, name=name, meta=meta_name_tensor)[ :, :, quantile_index @@ -2119,7 +2092,7 @@ def plot( log.info(f"Plotting data from ID {df_name}") if forecast_in_focus is None: forecast_in_focus = self.highlight_forecast_step_n - if len(self.quantiles) > 1: + if len(self.config_model.quantiles) > 1: if (self.highlight_forecast_step_n) is None and ( self.n_forecasts > 1 or self.n_lags > 0 ): # rather query if n_forecasts >1 than n_lags>1 @@ -2159,7 +2132,7 @@ def plot( if plotting_backend.startswith("plotly"): return plot_plotly( fcst=fcst, - quantiles=self.quantiles, + quantiles=self.config_model.quantiles, xlabel=xlabel, ylabel=ylabel, figsize=tuple(x * 70 for x in figsize), @@ -2170,7 +2143,7 @@ def plot( else: return plot( fcst=fcst, - quantiles=self.quantiles, + quantiles=self.config_model.quantiles, ax=ax, xlabel=xlabel, ylabel=ylabel, @@ -2238,7 +2211,9 @@ def get_latest_forecast( fcst = fcst[-(include_previous_forecasts + self.n_forecasts) :] elif include_history_data is True: fcst = fcst - fcst = utils.fcst_df_to_latest_forecast(fcst, self.quantiles, n_last=1 + include_previous_forecasts) + fcst = utils.fcst_df_to_latest_forecast( + fcst, self.config_model.quantiles, n_last=1 + include_previous_forecasts + ) return fcst def plot_latest_forecast( @@ -2306,7 +2281,7 @@ def plot_latest_forecast( else: fcst = fcst[fcst["ID"] == df_name].copy(deep=True) log.info(f"Plotting data from ID {df_name}") - if len(self.quantiles) > 1: + if len(self.config_model.quantiles) > 1: log.warning( "Plotting latest forecasts when uncertainty estimation enabled" " plots only the median quantile forecasts." @@ -2317,7 +2292,9 @@ def plot_latest_forecast( fcst = fcst[-(include_previous_forecasts + self.n_forecasts) :] elif plot_history_data is True: fcst = fcst - fcst = utils.fcst_df_to_latest_forecast(fcst, self.quantiles, n_last=1 + include_previous_forecasts) + fcst = utils.fcst_df_to_latest_forecast( + fcst, self.config_model.quantiles, n_last=1 + include_previous_forecasts + ) # Check whether a local or global plotting backend is set. plotting_backend = select_plotting_backend(model=self, plotting_backend=plotting_backend) @@ -2326,7 +2303,7 @@ def plot_latest_forecast( if plotting_backend.startswith("plotly"): return plot_plotly( fcst=fcst, - quantiles=self.quantiles, + quantiles=self.config_model.quantiles, ylabel=ylabel, xlabel=xlabel, figsize=tuple(x * 70 for x in figsize), @@ -2338,7 +2315,7 @@ def plot_latest_forecast( else: return plot( fcst=fcst, - quantiles=self.quantiles, + quantiles=self.config_model.quantiles, ax=ax, ylabel=ylabel, xlabel=xlabel, @@ -2504,7 +2481,7 @@ def plot_components( m=self, fcst=fcst, plot_configuration=valid_plot_configuration, - quantile=self.quantiles[0], # plot components only for median quantile + quantile=self.config_model.quantiles[0], # plot components only for median quantile figsize=figsize, df_name=df_name, one_period_per_season=one_period_per_season, @@ -2614,11 +2591,11 @@ def plot_parameters( if not (0 < quantile < 1): raise ValueError("The quantile selected needs to be a float in-between (0,1)") # ValueError if selected quantile is out of range - if quantile not in self.quantiles: + if quantile not in self.config_model.quantiles: raise ValueError("Selected quantile is not specified in the model configuration.") else: # plot parameters for median quantile if not specified - quantile = self.quantiles[0] + quantile = self.config_model.quantiles[0] # Validate components to be plotted valid_parameters_set = [ @@ -2686,13 +2663,9 @@ def plot_parameters( ) def _init_model(self): - """Build Pytorch model with configured hyperparamters. - - Returns - ------- - TimeNet model - """ + """Build Pytorch model with configured hyperparamters.""" self.model = time_net.TimeNet( + config_model=self.config_model, config_train=self.config_train, config_trend=self.config_trend, config_ar=self.config_ar, @@ -2715,7 +2688,6 @@ def _init_model(self): meta_used_in_model=self.meta_used_in_model, ) log.debug(self.model) - return self.model def _init_train_loader(self, df, num_workers=0): """Executes data preparation steps and initiates training procedure. @@ -2855,14 +2827,14 @@ def _train( self.config_train.set_optimizer_state(checkpoint["optimizer_states"][0]) else: - self.model = self._init_model() + self._init_model() self.model.train_loader = train_loader # Init the Trainer self.trainer, checkpoint_callback = utils.configure_trainer( config_train=self.config_train, - config=self.trainer_config, + config=self.config_train.trainer_config, metrics_logger=self.metrics_logger, early_stopping=self.early_stopping, early_stopping_target="Loss_val" if validation_enabled else "Loss", @@ -2960,7 +2932,7 @@ def restore_trainer(self, accelerator: Optional[str] = None): """ self.trainer, _ = utils.configure_trainer( config_train=self.config_train, - config=self.trainer_config, + config=self.config_train.trainer_config, metrics_logger=self.metrics_logger, early_stopping=self.early_stopping, accelerator=accelerator, @@ -3165,7 +3137,7 @@ def conformal_predict( alpha=alpha, method=method, n_forecasts=self.n_forecasts, - quantiles=self.quantiles, + quantiles=self.config_model.quantiles, ) df_forecast = c.predict(df=df_test, df_cal=df_cal, show_all_PI=show_all_PI) diff --git a/neuralprophet/time_net.py b/neuralprophet/time_net.py index e1c3ef8b8..8f847d56d 100644 --- a/neuralprophet/time_net.py +++ b/neuralprophet/time_net.py @@ -42,6 +42,7 @@ class TimeNet(pl.LightningModule): def __init__( self, + config_model: configure.Model, config_seasonality: configure.ConfigSeasonality, config_train: Optional[configure.Train] = None, config_trend: Optional[configure.Trend] = None, @@ -151,6 +152,7 @@ def __init__( pass # General + self.config_model = config_model self.n_forecasts = n_forecasts # Lightning Config @@ -209,7 +211,7 @@ def __init__( ) # Quantiles - self.quantiles = self.config_train.quantiles + self.quantiles = self.config_model.quantiles # Trend self.config_trend = config_trend diff --git a/tests/test_configure.py b/tests/test_configure.py index e5c5e9800..a93539e29 100644 --- a/tests/test_configure.py +++ b/tests/test_configure.py @@ -1,20 +1,6 @@ import pytest -from neuralprophet.configure import Train - - -def generate_config_train_params(overrides={}): - config_train_params = { - "quantiles": None, - "learning_rate": None, - "epochs": None, - "batch_size": None, - "loss_func": "SmoothL1Loss", - "optimizer": "AdamW", - } - for key, value in overrides.items(): - config_train_params[key] = value - return config_train_params +from neuralprophet import NeuralProphet def test_config_training_quantiles(): @@ -26,24 +12,21 @@ def test_config_training_quantiles(): ({"quantiles": [0.2, 0.8]}, [0.5, 0.2, 0.8]), ({"quantiles": [0.5, 0.8]}, [0.5, 0.8]), ] - for overrides, expected in checks: - config_train_params = generate_config_train_params(overrides) - config = Train(**config_train_params) - assert config.quantiles == expected + model = NeuralProphet(**overrides) + assert model.config_model.quantiles == expected def test_config_training_quantiles_error_invalid_type(): - config_train_params = generate_config_train_params() - config_train_params["quantiles"] = "hello world" with pytest.raises(AssertionError) as err: - Train(**config_train_params) - assert str(err.value) == "Quantiles must be in a list format, not None or scalar." + _ = NeuralProphet(quantiles="hello world") + assert str(err.value) == "Quantiles must be provided as list." def test_config_training_quantiles_error_invalid_scale(): - config_train_params = generate_config_train_params() - config_train_params["quantiles"] = [-1] with pytest.raises(Exception) as err: - Train(**config_train_params) + _ = NeuralProphet(quantiles=[-1]) + assert str(err.value) == "The quantiles specified need to be floats in-between (0, 1)." + with pytest.raises(Exception) as err: + _ = NeuralProphet(quantiles=[1.3]) assert str(err.value) == "The quantiles specified need to be floats in-between (0, 1)." diff --git a/tests/test_train_config.py b/tests/test_train_config.py new file mode 100644 index 000000000..95716365e --- /dev/null +++ b/tests/test_train_config.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3 + +import io +import logging +import os +import pathlib + +import pandas as pd +import pytest + +from neuralprophet import NeuralProphet, df_utils, load, save + +log = logging.getLogger("NP.test") +log.setLevel("ERROR") +log.parent.setLevel("ERROR") + +DIR = pathlib.Path(__file__).parent.parent.absolute() +DATA_DIR = os.path.join(DIR, "tests", "test-data") +PEYTON_FILE = os.path.join(DATA_DIR, "wp_log_peyton_manning.csv") +AIR_FILE = os.path.join(DATA_DIR, "air_passengers.csv") +YOS_FILE = os.path.join(DATA_DIR, "yosemite_temps.csv") +NROWS = 512 +EPOCHS = 10 +ADDITIONAL_EPOCHS = 5 +LR = 1.0 +BATCH_SIZE = 64 + +PLOT = False + + +def generate_config_train_params(overrides={}): + config_train_params = { + "learning_rate": None, + "epochs": None, + "batch_size": None, + "loss_func": "SmoothL1Loss", + "optimizer": "AdamW", + } + for key, value in overrides.items(): + config_train_params[key] = value + return config_train_params + + +def test_continue_training(): + df = pd.read_csv(PEYTON_FILE, nrows=NROWS) + m = NeuralProphet( + epochs=EPOCHS, + batch_size=BATCH_SIZE, + learning_rate=LR, + n_lags=6, + n_forecasts=3, + n_changepoints=0, + ) + metrics = m.fit(df, checkpointing=True, freq="D") + metrics2 = m.fit(df, freq="D", continue_training=True, epochs=ADDITIONAL_EPOCHS) + assert metrics["Loss"].min() >= metrics2["Loss"].min() + + +def test_continue_training_with_scheduler_selection(): + df = pd.read_csv(PEYTON_FILE, nrows=NROWS) + m = NeuralProphet( + epochs=EPOCHS, + batch_size=BATCH_SIZE, + learning_rate=LR, + n_lags=6, + n_forecasts=3, + n_changepoints=0, + ) + metrics = m.fit(df, checkpointing=True, freq="D") + # Continue training with StepLR + metrics2 = m.fit(df, freq="D", continue_training=True, epochs=ADDITIONAL_EPOCHS, scheduler="StepLR") + assert metrics["Loss"].min() >= metrics2["Loss"].min() + + +def test_save_load_continue_training(): + df = pd.read_csv(PEYTON_FILE, nrows=NROWS) + m = NeuralProphet( + epochs=EPOCHS, + n_lags=6, + n_forecasts=3, + n_changepoints=0, + ) + metrics = m.fit(df, checkpointing=True, freq="D") + save(m, "test_model.pt") + m2 = load("test_model.pt") + metrics2 = m2.fit(df, continue_training=True, epochs=ADDITIONAL_EPOCHS, scheduler="StepLR") + assert metrics["Loss"].min() >= metrics2["Loss"].min() diff --git a/tests/test_utils.py b/tests/test_utils.py index 3b93721bf..8bed33192 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -100,49 +100,3 @@ def test_save_load_io(): # Check that the forecasts are the same pd.testing.assert_frame_equal(forecast, forecast2) pd.testing.assert_frame_equal(forecast, forecast3) - - -def test_continue_training(): - df = pd.read_csv(PEYTON_FILE, nrows=NROWS) - m = NeuralProphet( - epochs=EPOCHS, - batch_size=BATCH_SIZE, - learning_rate=LR, - n_lags=6, - n_forecasts=3, - n_changepoints=0, - ) - metrics = m.fit(df, checkpointing=True, freq="D") - metrics2 = m.fit(df, freq="D", continue_training=True, epochs=ADDITIONAL_EPOCHS) - assert metrics["Loss"].min() >= metrics2["Loss"].min() - - -def test_continue_training_with_scheduler_selection(): - df = pd.read_csv(PEYTON_FILE, nrows=NROWS) - m = NeuralProphet( - epochs=EPOCHS, - batch_size=BATCH_SIZE, - learning_rate=LR, - n_lags=6, - n_forecasts=3, - n_changepoints=0, - ) - metrics = m.fit(df, checkpointing=True, freq="D") - # Continue training with StepLR - metrics2 = m.fit(df, freq="D", continue_training=True, epochs=ADDITIONAL_EPOCHS, scheduler="StepLR") - assert metrics["Loss"].min() >= metrics2["Loss"].min() - - -def test_save_load_continue_training(): - df = pd.read_csv(PEYTON_FILE, nrows=NROWS) - m = NeuralProphet( - epochs=EPOCHS, - n_lags=6, - n_forecasts=3, - n_changepoints=0, - ) - metrics = m.fit(df, checkpointing=True, freq="D") - save(m, "test_model.pt") - m2 = load("test_model.pt") - metrics2 = m2.fit(df, continue_training=True, epochs=ADDITIONAL_EPOCHS, scheduler="StepLR") - assert metrics["Loss"].min() >= metrics2["Loss"].min()