diff --git a/CHANGELOG.md b/CHANGELOG.md index 1dc3b32aa..468025f16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - - Handle new functionality for prediction intervals in the `plot_forecast` ([#130](https://github.com/etna-team/etna/pull/130)) - Add `get_historical_forecasts` to pipelines for forecast estimation at each fold on the historical dataset ([#143](https://github.com/etna-team/etna/pull/143)) +- `ConformalPredictionIntervals` method for prediction intervals estimation ([#152](https://github.com/etna-team/etna/pull/152)) - Add DeepARNativeModel ([#114](https://github.com/etna-team/etna/pull/114)) - - diff --git a/docs/source/api_reference/experimental.rst b/docs/source/api_reference/experimental.rst index c5c819bd7..f3ab365d1 100644 --- a/docs/source/api_reference/experimental.rst +++ b/docs/source/api_reference/experimental.rst @@ -35,3 +35,12 @@ Prediction Intervals: prediction_intervals.BasePredictionIntervals prediction_intervals.NaiveVariancePredictionIntervals + prediction_intervals.ConformalPredictionIntervals + +Prediction Intervals utilities: + +.. autosummary:: + :toctree: api/ + :template: base.rst + + prediction_intervals.utils.residuals_matrices diff --git a/etna/experimental/prediction_intervals/__init__.py b/etna/experimental/prediction_intervals/__init__.py index abe51ac4a..98f17c080 100644 --- a/etna/experimental/prediction_intervals/__init__.py +++ b/etna/experimental/prediction_intervals/__init__.py @@ -1,2 +1,3 @@ from etna.experimental.prediction_intervals.base import BasePredictionIntervals +from etna.experimental.prediction_intervals.conformal import ConformalPredictionIntervals from etna.experimental.prediction_intervals.naive_variance import NaiveVariancePredictionIntervals diff --git a/etna/experimental/prediction_intervals/conformal.py b/etna/experimental/prediction_intervals/conformal.py new file mode 100644 index 000000000..c60e185bb --- /dev/null +++ b/etna/experimental/prediction_intervals/conformal.py @@ -0,0 +1,91 @@ +from typing import Sequence + +import numpy as np +import pandas as pd + +from etna.datasets import TSDataset +from etna.experimental.prediction_intervals import BasePredictionIntervals +from etna.experimental.prediction_intervals.utils import residuals_matrices +from etna.pipeline import BasePipeline + + +class ConformalPredictionIntervals(BasePredictionIntervals): + """Estimate conformal prediction intervals using absolute values of historical residuals. + + 1. Compute matrix of absolute residuals :math:`r_{it} = |\hat y_{it} - y_{it}|` using k-fold backtest, where :math:`i` is fold index. + + 2. Estimate corresponding quantiles levels using the provided coverage (e.g. apply Bonferroni correction). + + 3. Estimate quantiles for each timestamp using computed absolute residuals and levels. + + `Relevant paper `_. + `Reference implementation `_. + """ + + def __init__( + self, pipeline: BasePipeline, coverage: float = 0.95, bonferroni_correction: bool = False, stride: int = 1 + ): + """Initialize instance of ``ConformalPredictionIntervals`` with given parameters. + + Parameters + ---------- + pipeline: + Base pipeline or ensemble for prediction intervals estimation. + coverage: + Interval coverage. In literature this value maybe referred as ``1 - alpha``. + bonferroni_correction: + Whether to use Bonferroni correction when estimating quantiles. + stride: + Number of points between folds. + """ + if not (0 <= coverage <= 1): + raise ValueError("Parameter `coverage` must be non-negative number not greater than 1!") + + if stride <= 0: + raise ValueError("Parameter `stride` must be positive!") + + self.coverage = coverage + self.bonferroni_correction = bonferroni_correction + self.stride = stride + + super().__init__(pipeline=pipeline) + + def _forecast_prediction_interval( + self, ts: TSDataset, predictions: TSDataset, quantiles: Sequence[float], n_folds: int + ) -> TSDataset: + """Estimate and store prediction intervals. + + Parameters + ---------- + ts: + Dataset to forecast. + predictions: + Dataset with point predictions. + quantiles: + Levels of prediction distribution. + n_folds: + Number of folds to use in the backtest for prediction interval estimation. + + Returns + ------- + : + Dataset with predictions. + """ + residuals = residuals_matrices(pipeline=self, ts=ts, n_folds=n_folds, stride=self.stride) + abs_residuals = np.abs(residuals) + + level = self.coverage + if self.bonferroni_correction: + level = 1 - (1 - self.coverage) / self.horizon + + critical_scores = np.quantile(abs_residuals, q=level, axis=0) + + upper_border = predictions[:, :, "target"] + critical_scores + upper_border.rename({"target": "target_upper"}, inplace=True, axis=1) + + lower_border = predictions[:, :, "target"] - critical_scores + lower_border.rename({"target": "target_lower"}, inplace=True, axis=1) + + intervals_df = pd.concat([lower_border, upper_border], axis=1) + predictions.add_prediction_intervals(prediction_intervals_df=intervals_df) + return predictions diff --git a/etna/experimental/prediction_intervals/naive_variance.py b/etna/experimental/prediction_intervals/naive_variance.py index 62d72d069..f23a6c065 100644 --- a/etna/experimental/prediction_intervals/naive_variance.py +++ b/etna/experimental/prediction_intervals/naive_variance.py @@ -6,6 +6,7 @@ from etna.datasets import TSDataset from etna.experimental.prediction_intervals import BasePredictionIntervals +from etna.experimental.prediction_intervals.utils import residuals_matrices from etna.pipeline import BasePipeline @@ -61,7 +62,7 @@ def _forecast_prediction_interval( : Dataset with predictions. """ - residuals = self._compute_resids_matrices(ts=ts, n_folds=n_folds) + residuals = residuals_matrices(pipeline=self, ts=ts, n_folds=n_folds, stride=self.stride) variance = self._estimate_variance(residual_matrices=residuals) @@ -76,29 +77,6 @@ def _forecast_prediction_interval( predictions.add_prediction_intervals(prediction_intervals_df=quantiles_df) return predictions - def _compute_resids_matrices(self, ts: TSDataset, n_folds: int) -> np.ndarray: - """Estimate residuals matrices with backtest. - - Parameters - ---------- - ts: - Dataset to estimate residuals. - n_folds: - Number of folds for backtest. - - Returns - ------- - : - Residuals matrices for each segment. Array with shape: ``(n_folds, horizon, n_segments)``. - """ - backtest_forecasts = self.get_historical_forecasts(ts=ts, n_folds=n_folds, stride=self.stride) - - residuals = backtest_forecasts.loc[:, pd.IndexSlice[:, "target"]] - ts[backtest_forecasts.index, :, "target"] - - # shape: (n_folds, horizon, n_segments) - residual_matrices = residuals.values.reshape((-1, self.horizon, len(ts.segments))) - return residual_matrices - def _estimate_variance(self, residual_matrices: np.ndarray) -> np.ndarray: """Estimate variance from residuals matrices. diff --git a/etna/experimental/prediction_intervals/utils.py b/etna/experimental/prediction_intervals/utils.py new file mode 100644 index 000000000..d8038e3bd --- /dev/null +++ b/etna/experimental/prediction_intervals/utils.py @@ -0,0 +1,43 @@ +from typing import Optional + +import numpy as np +import pandas as pd + +from etna.datasets import TSDataset +from etna.pipeline import BasePipeline + + +def residuals_matrices( + pipeline: BasePipeline, ts: TSDataset, n_folds: int = 5, stride: Optional[int] = None +) -> np.ndarray: + """Estimate residuals matrices with backtest. + + Parameters + ---------- + pipeline: + Pipeline for residuals estimation. + ts: + Dataset to estimate residuals. + n_folds: + Number of folds for backtest. + stride: + Number of points between folds. By default, is set to ``horizon``. + + Returns + ------- + : + Residuals matrices for each segment. Array with shape: ``(n_folds, horizon, n_segments)``. + """ + if n_folds <= 0: + raise ValueError("Parameter `n_folds` must be positive!") + + if stride is not None and stride <= 0: + raise ValueError("Parameter `stride` must be positive!") + + backtest_forecasts = pipeline.get_historical_forecasts(ts=ts, n_folds=n_folds, stride=stride) + + residuals = backtest_forecasts.loc[:, pd.IndexSlice[:, "target"]] - ts[backtest_forecasts.index, :, "target"] + + # shape: (n_folds, horizon, n_segments) + residual_matrices = residuals.values.reshape((-1, pipeline.horizon, len(ts.segments))) + return residual_matrices diff --git a/tests/test_experimental/test_prediction_intervals/test_conformal.py b/tests/test_experimental/test_prediction_intervals/test_conformal.py new file mode 100644 index 000000000..6724edff1 --- /dev/null +++ b/tests/test_experimental/test_prediction_intervals/test_conformal.py @@ -0,0 +1,146 @@ +from unittest.mock import MagicMock + +import numpy as np +import pandas as pd +import pytest + +from etna.ensembles import DirectEnsemble +from etna.ensembles import StackingEnsemble +from etna.ensembles import VotingEnsemble +from etna.experimental.prediction_intervals import ConformalPredictionIntervals +from etna.models import NaiveModel +from etna.pipeline import AutoRegressivePipeline +from etna.pipeline import HierarchicalPipeline +from etna.pipeline import Pipeline +from etna.reconciliation import BottomUpReconciliator +from tests.test_experimental.test_prediction_intervals.common import get_arima_pipeline +from tests.test_experimental.test_prediction_intervals.common import get_catboost_pipeline +from tests.test_experimental.test_prediction_intervals.common import get_naive_pipeline +from tests.test_experimental.test_prediction_intervals.common import get_naive_pipeline_with_transforms +from tests.test_experimental.test_prediction_intervals.common import run_base_pipeline_compat_check + + +@pytest.mark.parametrize("stride", (-1, 0)) +def test_invalid_stride_parameter_error(stride): + with pytest.raises(ValueError, match="Parameter `stride` must be positive!"): + ConformalPredictionIntervals(pipeline=Pipeline(model=NaiveModel()), stride=stride) + + +@pytest.mark.parametrize("coverage", (-3, -1)) +def test_invalid_coverage_parameter_error(coverage): + with pytest.raises(ValueError, match="Parameter `coverage` must be non-negative"): + ConformalPredictionIntervals(pipeline=Pipeline(model=NaiveModel()), coverage=coverage) + + +@pytest.mark.parametrize("pipeline_name", ("naive_pipeline", "naive_pipeline_with_transforms")) +def test_pipeline_fit_forecast_without_intervals(example_tsds, pipeline_name, request): + pipeline = request.getfixturevalue(pipeline_name) + + intervals_pipeline = ConformalPredictionIntervals(pipeline=pipeline) + + intervals_pipeline.fit(ts=example_tsds) + + intervals_pipeline_pred = intervals_pipeline.forecast(prediction_interval=False) + pipeline_pred = pipeline.forecast(prediction_interval=False) + + pd.testing.assert_frame_equal(intervals_pipeline_pred.df, pipeline_pred.df) + + +@pytest.mark.parametrize("stride", (2, 5, 10)) +@pytest.mark.parametrize("expected_columns", ({"target", "target_lower", "target_upper"},)) +def test_valid_strides(example_tsds, expected_columns, stride): + intervals_pipeline = ConformalPredictionIntervals(pipeline=Pipeline(model=NaiveModel(), horizon=5), stride=stride) + run_base_pipeline_compat_check( + ts=example_tsds, intervals_pipeline=intervals_pipeline, expected_columns=expected_columns + ) + + +@pytest.mark.parametrize( + "expected_columns", + ({"target", "target_lower", "target_upper"},), +) +@pytest.mark.parametrize( + "pipeline", + ( + get_naive_pipeline(horizon=1), + get_naive_pipeline_with_transforms(horizon=1), + AutoRegressivePipeline(model=NaiveModel(), horizon=1), + HierarchicalPipeline( + model=NaiveModel(), + horizon=1, + reconciliator=BottomUpReconciliator(target_level="market", source_level="product"), + ), + ), +) +def test_pipelines_forecast_intervals_exist(product_level_constant_hierarchical_ts, pipeline, expected_columns): + intervals_pipeline = ConformalPredictionIntervals(pipeline=pipeline) + run_base_pipeline_compat_check( + ts=product_level_constant_hierarchical_ts, + intervals_pipeline=intervals_pipeline, + expected_columns=expected_columns, + ) + + +@pytest.mark.parametrize("pipeline", (get_arima_pipeline(horizon=5),)) +def test_forecast_prediction_intervals_is_used(example_tsds, pipeline): + intervals_pipeline = ConformalPredictionIntervals(pipeline=pipeline) + intervals_pipeline._forecast_prediction_interval = MagicMock() + + intervals_pipeline.fit(ts=example_tsds) + intervals_pipeline.forecast(prediction_interval=True) + intervals_pipeline._forecast_prediction_interval.assert_called() + + +@pytest.mark.parametrize( + "pipeline", + ( + get_naive_pipeline(horizon=5), + get_naive_pipeline_with_transforms(horizon=5), + AutoRegressivePipeline(model=NaiveModel(), horizon=5), + get_catboost_pipeline(horizon=5), + get_arima_pipeline(horizon=5), + ), +) +def test_pipelines_forecast_intervals_valid(example_tsds, pipeline): + intervals_pipeline = ConformalPredictionIntervals(pipeline=pipeline) + intervals_pipeline.fit(ts=example_tsds) + + prediction = intervals_pipeline.forecast(prediction_interval=True) + assert np.all(prediction[:, :, "target_lower"].values <= prediction[:, :, "target"].values) + assert np.all(prediction[:, :, "target"].values <= prediction[:, :, "target_upper"].values) + + +@pytest.mark.parametrize( + "expected_columns", + ({"target", "target_lower", "target_upper"},), +) +@pytest.mark.parametrize( + "ensemble", + ( + DirectEnsemble(pipelines=[get_naive_pipeline(horizon=1), get_naive_pipeline_with_transforms(horizon=2)]), + VotingEnsemble(pipelines=[get_naive_pipeline(horizon=1), get_naive_pipeline_with_transforms(horizon=1)]), + StackingEnsemble(pipelines=[get_naive_pipeline(horizon=1), get_naive_pipeline_with_transforms(horizon=1)]), + ), +) +def test_ensembles_forecast_intervals_exist(example_tsds, ensemble, expected_columns): + intervals_pipeline = ConformalPredictionIntervals(pipeline=ensemble) + run_base_pipeline_compat_check( + ts=example_tsds, intervals_pipeline=intervals_pipeline, expected_columns=expected_columns + ) + + +@pytest.mark.parametrize( + "ensemble", + ( + DirectEnsemble(pipelines=[get_naive_pipeline(horizon=5), get_naive_pipeline_with_transforms(horizon=6)]), + VotingEnsemble(pipelines=[get_naive_pipeline(horizon=5), get_naive_pipeline_with_transforms(horizon=5)]), + StackingEnsemble(pipelines=[get_naive_pipeline(horizon=5), get_naive_pipeline_with_transforms(horizon=5)]), + ), +) +def test_ensembles_forecast_intervals_valid(example_tsds, ensemble): + intervals_pipeline = ConformalPredictionIntervals(pipeline=ensemble) + intervals_pipeline.fit(ts=example_tsds) + + prediction = intervals_pipeline.forecast(prediction_interval=True) + assert np.all(prediction[:, :, "target_lower"].values <= prediction[:, :, "target"].values) + assert np.all(prediction[:, :, "target"].values <= prediction[:, :, "target_upper"].values) diff --git a/tests/test_experimental/test_prediction_intervals/test_naive_variance.py b/tests/test_experimental/test_prediction_intervals/test_naive_variance.py index 707661b6e..8cf4c5495 100644 --- a/tests/test_experimental/test_prediction_intervals/test_naive_variance.py +++ b/tests/test_experimental/test_prediction_intervals/test_naive_variance.py @@ -32,14 +32,6 @@ def test_estimate_variance_shape(dummy_array): assert out_array.shape == (5, 3) -@pytest.mark.parametrize("horizon,n_folds", ((4, 3), (2, 6), (5, 5))) -def test_compute_resids_matrix_shape(example_tsds, horizon, n_folds): - n_segments = len(example_tsds.segments) - intervals_pipeline = NaiveVariancePredictionIntervals(pipeline=Pipeline(model=NaiveModel(), horizon=horizon)) - residuals_matrices = intervals_pipeline._compute_resids_matrices(ts=example_tsds, n_folds=n_folds) - assert residuals_matrices.shape == (n_folds, horizon, n_segments) - - @pytest.mark.parametrize("pipeline_name", ("naive_pipeline", "naive_pipeline_with_transforms")) def test_pipeline_fit_forecast_without_intervals(example_tsds, pipeline_name, request): pipeline = request.getfixturevalue(pipeline_name) diff --git a/tests/test_experimental/test_prediction_intervals/test_utils.py b/tests/test_experimental/test_prediction_intervals/test_utils.py new file mode 100644 index 000000000..31eba6b21 --- /dev/null +++ b/tests/test_experimental/test_prediction_intervals/test_utils.py @@ -0,0 +1,49 @@ +import numpy as np +import pandas as pd +import pytest + +from etna.datasets import TSDataset +from etna.experimental.prediction_intervals.utils import residuals_matrices +from tests.test_experimental.test_prediction_intervals.common import get_naive_pipeline + + +@pytest.fixture +def constant_ts(size=30) -> TSDataset: + segment_1 = [7] * size + segment_2 = [50] * size + ts_range = list(pd.date_range("2023-01-01", freq="D", periods=size)) + df = pd.DataFrame( + { + "timestamp": ts_range * 2, + "target": segment_1 + segment_2, + "segment": ["segment_1"] * size + ["segment_2"] * size, + } + ) + ts = TSDataset(TSDataset.to_dataset(df=df), freq="D") + return ts + + +@pytest.mark.parametrize("n_folds", (-2, 0)) +def test_residuals_matrices_invalid_n_folds(naive_pipeline, example_tsds, n_folds): + with pytest.raises(ValueError, match="Parameter `n_folds` must be positive!"): + _ = residuals_matrices(pipeline=naive_pipeline, ts=example_tsds, n_folds=n_folds) + + +@pytest.mark.parametrize("stride", (-1, 0)) +def test_residuals_matrices_invalid_stride(naive_pipeline, example_tsds, stride): + with pytest.raises(ValueError, match="Parameter `stride` must be positive!"): + _ = residuals_matrices(pipeline=naive_pipeline, ts=example_tsds, stride=stride) + + +@pytest.mark.parametrize("horizon,n_folds", ((4, 3), (2, 6), (5, 5))) +def test_residuals_matrices_output_shape(example_tsds, horizon, n_folds): + pipeline = get_naive_pipeline(horizon=horizon) + n_segments = len(example_tsds.segments) + res = residuals_matrices(pipeline=pipeline, ts=example_tsds, n_folds=n_folds) + assert res.shape == (n_folds, horizon, n_segments) + + +@pytest.mark.parametrize("stride,n_folds", ((1, 3), (1, 2), (5, 5), (5, 1), (7, 2))) +def test_residuals_matrices_constant_series(naive_pipeline, constant_ts, stride, n_folds): + res = residuals_matrices(pipeline=naive_pipeline, ts=constant_ts, n_folds=n_folds, stride=stride) + np.testing.assert_allclose(res, 0)