diff --git a/etna/ensembles/stacking_ensemble.py b/etna/ensembles/stacking_ensemble.py index d8c06d46d..ebbc9d8ab 100644 --- a/etna/ensembles/stacking_ensemble.py +++ b/etna/ensembles/stacking_ensemble.py @@ -102,15 +102,16 @@ def __init__( self.joblib_params = joblib_params super().__init__(horizon=self._get_horizon(pipelines=pipelines)) - def _make_same_level(self, ts: TSDataset, forecasts: List[TSDataset]) -> TSDataset: + def _make_same_level(self, ts: TSDataset, forecasts: List[pd.DataFrame]) -> TSDataset: if ts.has_hierarchy(): - if ts.current_df_level != forecasts[0].current_df_level: - ts = ts.get_level_dataset(forecasts[0].current_df_level) # type: ignore + current_df_level = ts._get_dataframe_level(df=forecasts[0]) + if ts.current_df_level != current_df_level: + ts = ts.get_level_dataset(current_df_level) # type: ignore return ts - def _filter_features_to_use(self, forecasts: List[TSDataset]) -> Union[None, Set[str]]: + def _filter_features_to_use(self, forecasts: List[pd.DataFrame]) -> Union[None, Set[str]]: """Return all the features from ``features_to_use`` which can be obtained from base models' forecasts.""" - features_df = pd.concat([forecast.df for forecast in forecasts], axis=1) + features_df = pd.concat(forecasts, axis=1) available_features = set(features_df.columns.get_level_values("feature")) - {"fold_number"} features_to_use = self.features_to_use if features_to_use is None: @@ -134,10 +135,9 @@ def _filter_features_to_use(self, forecasts: List[TSDataset]) -> Union[None, Set ) return None - def _backtest_pipeline(self, pipeline: BasePipeline, ts: TSDataset) -> TSDataset: + def _backtest_pipeline(self, pipeline: BasePipeline, ts: TSDataset) -> pd.DataFrame: """Get forecasts from backtest for given pipeline.""" forecasts = pipeline.get_historical_forecasts(ts=ts, n_folds=self.n_folds) - forecasts = TSDataset(df=forecasts, freq=ts.freq, hierarchical_structure=ts.hierarchical_structure) return forecasts def fit(self, ts: TSDataset, save_ts: bool = True) -> "StackingEnsemble": @@ -176,14 +176,16 @@ def fit(self, ts: TSDataset, save_ts: bool = True) -> "StackingEnsemble": return self def _make_features( - self, ts: TSDataset, forecasts: List[TSDataset], train: bool = False + self, ts: TSDataset, forecasts: List[pd.DataFrame], train: bool = False ) -> Tuple[pd.DataFrame, Optional[pd.Series]]: """Prepare features for the ``final_model``.""" ts = self._make_same_level(ts=ts, forecasts=forecasts) # Stack targets from the forecasts targets = [ - forecast[:, :, "target"].rename({"target": f"regressor_target_{i}"}, level="feature", axis=1) + forecast.loc[:, pd.IndexSlice[:, "target"]].rename( + {"target": f"regressor_target_{i}"}, level="feature", axis=1 + ) for i, forecast in enumerate(forecasts) ] targets = pd.concat(targets, axis=1) @@ -200,7 +202,8 @@ def _make_features( for forecast in forecasts ] features = pd.concat( - [forecast[:, :, features_in_forecasts[i]] for i, forecast in enumerate(forecasts)], axis=1 + [forecast.loc[:, pd.IndexSlice[:, features_in_forecasts[i]]] for i, forecast in enumerate(forecasts)], + axis=1, ) features = features.loc[:, ~features.columns.duplicated()] features_df = pd.concat([features, targets], axis=1) @@ -216,7 +219,7 @@ def _make_features( else: return x, None - def _process_forecasts(self, ts: TSDataset, forecasts: List[TSDataset]) -> TSDataset: + def _process_forecasts(self, ts: TSDataset, forecasts: List[pd.DataFrame]) -> TSDataset: ts = self._make_same_level(ts=ts, forecasts=forecasts) x, _ = self._make_features(ts=ts, forecasts=forecasts, train=False) diff --git a/etna/ensembles/voting_ensemble.py b/etna/ensembles/voting_ensemble.py index bcce36f3a..43ad590f7 100644 --- a/etna/ensembles/voting_ensemble.py +++ b/etna/ensembles/voting_ensemble.py @@ -121,10 +121,9 @@ def _validate_weights(weights: Optional[Union[List[float], Literal["auto"]]], pi else: raise ValueError("Invalid format of weights is passed!") - def _backtest_pipeline(self, pipeline: BasePipeline, ts: TSDataset) -> TSDataset: + def _backtest_pipeline(self, pipeline: BasePipeline, ts: TSDataset) -> pd.DataFrame: """Get forecasts from backtest for given pipeline.""" forecasts = pipeline.get_historical_forecasts(ts=ts, n_folds=self.n_folds) - forecasts = TSDataset(df=forecasts, freq=ts.freq) return forecasts def _process_weights(self, ts: TSDataset) -> List[float]: @@ -138,7 +137,7 @@ def _process_weights(self, ts: TSDataset) -> List[float]: x = pd.concat( [ - forecast[:, :, "target"].rename({"target": f"target_{i}"}, axis=1) + forecast.loc[:, pd.IndexSlice[:, "target"]].rename({"target": f"target_{i}"}, axis=1) for i, forecast in enumerate(forecasts) ], axis=1,