From 9505232bddeddfac9994ecbf488d48a7556645ad Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jos=C3=A9=20Morales?=
Date: Thu, 23 Nov 2023 15:58:47 -0600
Subject: [PATCH 1/2] remove pyarrow dependency for polars

---
 dev/environment.yml       |   2 +-
 dev/local_environment.yml |   2 +-
 nbs/src/core/core.ipynb   | 240 +++++++++++---------------------------
 settings.ini              |   4 +-
 statsforecast/_modidx.py  |   1 -
 statsforecast/core.py     | 180 +++++++++++-----------------
 6 files changed, 139 insertions(+), 290 deletions(-)

diff --git a/dev/environment.yml b/dev/environment.yml
index 2f0c3bc33..a7837c27d 100644
--- a/dev/environment.yml
+++ b/dev/environment.yml
@@ -23,4 +23,4 @@ dependencies:
   - polars
   - supersmoother
   - tqdm
-  - utilsforecast>=0.0.14
+  - utilsforecast>=0.0.17
diff --git a/dev/local_environment.yml b/dev/local_environment.yml
index 4f39241a1..0649e3a08 100644
--- a/dev/local_environment.yml
+++ b/dev/local_environment.yml
@@ -21,4 +21,4 @@ dependencies:
   - polars
   - supersmoother
   - tqdm
-  - utilsforecast>=0.0.14
+  - utilsforecast>=0.0.17
diff --git a/nbs/src/core/core.ipynb b/nbs/src/core/core.ipynb
index 29532fac8..3514ad15e 100644
--- a/nbs/src/core/core.ipynb
+++ b/nbs/src/core/core.ipynb
@@ -95,12 +95,12 @@
     "\n",
     "import numpy as np\n",
     "import pandas as pd\n",
+    "import utilsforecast.processing as ufp\n",
     "from fugue.execution.factory import make_execution_engine, try_get_context_execution_engine\n",
     "from tqdm.autonotebook import tqdm\n",
     "from triad import conditional_dispatcher\n",
-    "from utilsforecast.compat import DataFrame, pl_DataFrame, pl\n",
+    "from utilsforecast.compat import DataFrame, pl, pl_DataFrame, pl_Series\n",
     "from utilsforecast.grouped_array import GroupedArray as BaseGroupedArray\n",
-    "from utilsforecast.processing import process_df\n",
     "from utilsforecast.validation import ensure_time_dtype\n",
     "\n",
     "import statsforecast.config as sf_config\n",
@@ -784,95 +784,6 @@
     ")"
    ]
   },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "2248ad77-7658-416a-8eb5-fac9b17d8f1c",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "#| exporti\n",
-    "def _cv_dates(last_dates, freq, h, test_size, step_size=1):\n",
-    "    #assuming step_size = 1\n",
-    "    if (test_size - h) % step_size:\n",
-    "        raise Exception('`test_size - h` should be module `step_size`')\n",
-    "    n_windows = int((test_size - h) / step_size) + 1\n",
-    "    if len(np.unique(last_dates)) == 1:\n",
-    "        if issubclass(last_dates.dtype.type, np.integer):\n",
-    "            total_dates = np.arange(last_dates[0] - test_size + 1, last_dates[0] + 1)\n",
-    "            out = np.empty((h * n_windows, 2), dtype=last_dates.dtype)\n",
-    "            freq = 1\n",
-    "        else:\n",
-    "            total_dates = pd.date_range(end=last_dates[0], periods=test_size, freq=freq)\n",
-    "            out = np.empty((h * n_windows, 2), dtype='datetime64[s]')\n",
-    "        for i_window, cutoff in enumerate(range(-test_size, -h + 1, step_size), start=0):\n",
-    "            end_cutoff = cutoff + h\n",
-    "            out[h * i_window : h * (i_window + 1), 0] = total_dates[cutoff:] if end_cutoff == 0 else total_dates[cutoff:end_cutoff]\n",
-    "            out[h * i_window : h * (i_window + 1), 1] = np.tile(total_dates[cutoff] - freq, h)\n",
-    "        dates = pd.DataFrame(np.tile(out, (len(last_dates), 1)), columns=['ds', 'cutoff'])\n",
-    "    else:\n",
-    "        dates = pd.concat([_cv_dates(np.array([ld]), freq, h, test_size, step_size) for ld in last_dates])\n",
-    "        dates = dates.reset_index(drop=True)\n",
-    "    return dates"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "39444de3-107c-4024-8dfa-d4c5278db639",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "#| hide\n",
-    "ds_int_cv_test = pd.DataFrame({\n",
-    "    'ds': np.hstack([\n",
-    "        [46, 47, 48],\n",
-    "        [47, 48, 49],\n",
-    "        [48, 49, 50]\n",
-    "    ]),\n",
-    "    'cutoff': [45] * 3 + [46] * 3 + [47] * 3\n",
-    "}, dtype=np.int64)\n",
-    "test_eq(ds_int_cv_test, _cv_dates(np.array([50], dtype=np.int64), 'D', 3, 5))"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "863c212e-6331-4915-9f3a-2d20174b8977",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "#| hide\n",
-    "ds_int_cv_test = pd.DataFrame({\n",
-    "    'ds': np.hstack([\n",
-    "        [46, 47, 48],\n",
-    "        [48, 49, 50]\n",
-    "    ]),\n",
-    "    'cutoff': [45] * 3 + [47] * 3\n",
-    "}, dtype=np.int64)\n",
-    "test_eq(ds_int_cv_test, _cv_dates(np.array([50], dtype=np.int64), 'D', 3, 5, step_size=2))"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "b988d4f2-e99d-4608-aba9-530d15e95d94",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "#| hide\n",
-    "for e_e in [True, False]:\n",
-    "    n_series = 2\n",
-    "    series = generate_series(n_series, equal_ends=e_e)\n",
-    "    dates = series.groupby('unique_id')['ds'].max()\n",
-    "\n",
-    "    freq = pd.tseries.frequencies.to_offset('D')\n",
-    "    horizon = 3\n",
-    "    test_size = 5\n",
-    "    df_dates = _cv_dates(last_dates=dates, freq=freq, h=horizon, test_size=test_size)\n",
-    "    test_eq(len(df_dates), n_series * horizon * (test_size - horizon + 1)) "
-   ]
-  },
   {
    "cell_type": "code",
    "execution_count": null,
@@ -1007,7 +918,7 @@
     "        # TODO @fede: needed for residuals, think about it later\n",
     "        self.models = models\n",
     "        self._validate_model_names()\n",
-    "        self.freq = pd.tseries.frequencies.to_offset(freq)\n",
+    "        self.freq = freq\n",
     "        self.n_jobs = n_jobs\n",
     "        self.fallback_model = fallback_model\n",
    "        self.verbose = verbose \n",
@@ -1029,7 +940,6 @@
     "        self,\n",
     "        df: Optional[DataFrame],\n",
     "        sort_df: bool = True,\n",
-    "        save_original: bool = False\n",
     "    ) -> None:\n",
     "        if df is None:\n",
     "            _warn_df_constructor()\n",
@@ -1044,24 +954,20 @@
     "            )\n",
     "            df = df.reset_index()\n",
     "        _maybe_warn_sort_df(sort_df)\n",
-    "        self.uids, self.last_dates, data, indptr, sort_idxs = process_df(\n",
+    "        self.uids, last_times, data, indptr, sort_idxs = ufp.process_df(\n",
     "            df, 'unique_id', 'ds', 'y'\n",
     "        )\n",
     "        if isinstance(df, pd.DataFrame):\n",
-    "            self.last_dates = pd.Index(self.last_dates, name='ds')\n",
+    "            self.last_dates = pd.Index(last_times, name='ds')\n",
+    "        else:\n",
+    "            self.last_dates = pl_Series(last_times)\n",
     "        self.ga = GroupedArray(data, indptr)\n",
-    "        if save_original:\n",
-    "            self.og_unique_id = df['unique_id']\n",
-    "            self.og_dates = df['ds'].to_numpy()\n",
-    "            if sort_idxs is not None:\n",
-    "                if isinstance(self.og_unique_id, pd.Series):\n",
-    "                    self.og_unique_id = self.og_unique_id.iloc[sort_idxs]\n",
-    "                else:\n",
-    "                    self.og_unique_id = self.og_unique_id.take(sort_idxs)\n",
-    "                self.og_dates = self.og_dates[sort_idxs]\n",
+    "        self.og_dates = df['ds'].to_numpy()\n",
+    "        if sort_idxs is not None:\n",
+    "            self.og_dates = self.og_dates[sort_idxs]\n",
     "        self.n_jobs = _get_n_jobs(len(self.ga), self.n_jobs)\n",
     "        self.df_constructor = type(df)\n",
-    "    \n",
+    "\n",
     "    def _set_prediction_intervals(self, prediction_intervals):\n",
     "        for model in self.models:\n",
     "            interval = getattr(model, \"prediction_intervals\", None)\n",
@@ -1104,21 +1010,9 @@
     "        return self\n",
     "    \n",
     "    def _make_future_df(self, h: int):\n",
-    "        if issubclass(self.last_dates.dtype.type, np.integer):\n",
-    "            last_date_f = lambda x: np.arange(x + 1, x + 1 + h, dtype=self.last_dates.dtype)\n",
-    "        else:\n",
-    "            last_date_f = lambda x: pd.date_range(x + self.freq, periods=h, freq=self.freq)\n",
-    "        if len(np.unique(self.last_dates)) == 1:\n",
-    "            dates = np.tile(last_date_f(self.last_dates[0]), len(self.ga))\n",
-    "        else:\n",
-    "            dates = np.hstack([\n",
-    "                last_date_f(last_date)\n",
-    "                for last_date in self.last_dates \n",
-    "            ])\n",
-    "\n",
-    "        uids = np.repeat(self.uids, h)\n",
-    "        if self.df_constructor is pl_DataFrame and uids.dtype.kind == 'O':\n",
-    "            uids = uids.astype(str)\n",
+    "        start_dates = ufp.offset_times(self.last_dates, freq=self.freq, n=1)\n",
+    "        dates = ufp.time_ranges(start_dates, freq=self.freq, periods=h)\n",
+    "        uids = ufp.repeat(self.uids, n=h)\n",
     "        df = self.df_constructor({'unique_id': uids, 'ds': dates})\n",
     "        if isinstance(df, pd.DataFrame):\n",
     "            if sf_config.id_as_index:\n",
@@ -1126,8 +1020,6 @@
     "                df = df.set_index('unique_id')\n",
     "            else:\n",
     "                df = df.reset_index(drop=True)\n",
-    "        else:\n",
-    "            df = df.with_columns(pl.col('unique_id').cast(self.uids.dtype))\n",
     "        return df\n",
     "\n",
     "    def _parse_X_level(self, h: int, X: Optional[DataFrame], level: Optional[List[int]]):\n",
@@ -1139,7 +1031,7 @@
     "        if X.shape != expected_shape:\n",
     "            raise ValueError(f'Expected X to have shape {expected_shape}, but got {X.shape}')\n",
     "        first_col = [c for c in X.columns if c not in ('unique_id', 'ds')][0]\n",
-    "        _, _, data, indptr, _ = process_df(X, 'unique_id', 'ds', first_col)\n",
+    "        _, _, data, indptr, _ = ufp.process_df(X, 'unique_id', 'ds', first_col)\n",
     "        return GroupedArray(data, indptr), level\n",
     "\n",
     "    def predict(\n",
@@ -1276,7 +1168,8 @@
     "        DataFrame with `models` columns for point predictions and probabilistic\n",
     "        predictions for all fitted `models`.\n",
     "        \"\"\"\n",
-    "        self._prepare_fit(df=df, sort_df=sort_df, save_original=fitted) \n",
+    "        self.__dict__.pop('fcst_fitted_values_', None)\n",
+    "        self._prepare_fit(df=df, sort_df=sort_df)\n",
     "        self._set_prediction_intervals(prediction_intervals=prediction_intervals)\n",
     "        X, level = self._parse_X_level(h=h, X=X_df, level=level)\n",
     "        if self.n_jobs == 1:\n",
@@ -1311,7 +1204,10 @@
     "        if not hasattr(self, \"fcst_fitted_values_\"):\n",
     "            raise Exception(\"Please run `forecast` method using `fitted=True`\")\n",
     "        cols = self.fcst_fitted_values_[\"cols\"]\n",
-    "        df = self.df_constructor({'unique_id': self.og_unique_id, 'ds': self.og_dates})\n",
+    "        df = self.df_constructor({\n",
+    "            'unique_id': ufp.repeat(self.uids, np.diff(self.ga.indptr)),\n",
+    "            'ds': self.og_dates\n",
+    "        })\n",
     "        df[cols] = self.fcst_fitted_values_['values']\n",
     "        if isinstance(df, pd.DataFrame):\n",
     "            if sf_config.id_as_index:\n",
@@ -1377,19 +1273,14 @@
     "        DataFrame with insample `models` columns for point predictions and probabilistic\n",
     "        predictions for all fitted `models`.\n",
     "        \"\"\"\n",
-    "        self._prepare_fit(df=df, sort_df=sort_df, save_original=fitted) \n",
+    "        if n_windows is None and test_size is None:\n",
+    "            raise ValueError('You must define `n_windows` or `test_size`')\n",
     "        if test_size is None:\n",
     "            test_size = h + step_size * (n_windows - 1)\n",
-    "        elif n_windows is None:\n",
-    "            if (test_size - h) % step_size:\n",
-    "                raise Exception('`test_size - h` should be module `step_size`')\n",
-    "            n_windows = int((test_size - h) / step_size) + 1\n",
-    "        elif (n_windows is None) and (test_size is None):\n",
-    "            raise Exception('you must define `n_windows` or `test_size`')\n",
-    "        else:\n",
-    "            raise Exception('you must define `n_windows` or `test_size` but not both')\n",
     "        if prediction_intervals is not None and level is None:\n",
-    "            raise ValueError('You must specify `level` when using `prediction_intervals`') \n",
+    "            raise ValueError('You must specify `level` when using `prediction_intervals`')\n",
+    "        self.__dict__.pop('cv_fitted_values_', None)\n",
+    "        self._prepare_fit(df=df, sort_df=sort_df)\n",
     "        self._set_prediction_intervals(prediction_intervals=prediction_intervals)\n",
     "        series_sizes = np.diff(self.ga.indptr)\n",
     "        short_series = series_sizes <= test_size\n",
@@ -1421,22 +1312,23 @@
     "                level=level,\n",
     "                refit=refit\n",
     "            )\n",
-    "        \n",
     "        if fitted:\n",
     "            self.cv_fitted_values_ = res_fcsts['fitted']\n",
     "            self.n_cv_ = n_windows\n",
-    "\n",
-    "        fcsts = res_fcsts[\"forecasts\"]\n",
-    "        cols = res_fcsts[\"cols\"] \n",
-    "        fcsts_df: DataFrame = _cv_dates(\n",
-    "            last_dates=self.last_dates, freq=self.freq, h=h, test_size=test_size, step_size=step_size)\n",
-    "        fcsts_df.insert(0, 'unique_id', pd.Index(np.repeat(self.uids, h * n_windows)))\n",
-    "        fcsts_df[cols] = fcsts\n",
-    "        if self.df_constructor is pd.DataFrame and sf_config.id_as_index:\n",
+    "        fcsts_df = ufp.cv_times(\n",
+    "            times=self.og_dates,\n",
+    "            uids=self.uids,\n",
+    "            indptr=self.ga.indptr,\n",
+    "            h=h,\n",
+    "            test_size=test_size,\n",
+    "            step_size=step_size,\n",
+    "        )\n",
+    "        # the cv_times output is sorted by window and then id\n",
+    "        fcsts_df = ufp.sort(fcsts_df, [\"unique_id\", \"cutoff\", \"ds\"]) \n",
+    "        fcsts_df = ufp.assign_columns(fcsts_df, res_fcsts[\"cols\"], res_fcsts[\"forecasts\"])\n",
+    "        if isinstance(fcsts_df, pd.DataFrame) and sf_config.id_as_index:\n",
     "            _warn_id_as_idx()\n",
     "            fcsts_df = fcsts_df.set_index('unique_id')\n",
-    "        elif self.df_constructor is pl_DataFrame:\n",
-    "            fcsts_df = pl.from_pandas(fcsts_df)\n",
     "        return fcsts_df\n",
     "\n",
     "    def cross_validation_fitted_values(self) -> DataFrame:\n",
@@ -1452,26 +1344,35 @@
     "        fcsts_df : pandas.DataFrame | polars.DataFrame\n",
     "            DataFrame with insample `models` columns for point predictions \n",
     "            and probabilistic predictions for all fitted `models`.\n",
-    "        \"\"\"\n",
+    "        \"\"\" \n",
     "        if not hasattr(self, 'cv_fitted_values_'):\n",
-    "            raise Exception('Please run `cross_validation` mehtod using `fitted=True`')\n",
-    "        ds = pd.MultiIndex.from_arrays([self.og_unique_id, self.og_dates])\n",
-    "        index = pd.MultiIndex.from_tuples(np.tile(ds, self.n_cv_), names=['unique_id', 'ds'])\n",
-    "        df = pd.DataFrame(index=index)\n",
-    "        df['cutoff'] = self.cv_fitted_values_['last_idxs'].flatten(order='F')\n",
-    "        df[self.cv_fitted_values_['cols']] = np.reshape(self.cv_fitted_values_['values'], (-1, len(self.models) + 1), order='F')\n",
-    "        idxs = self.cv_fitted_values_['idxs'].flatten(order='F')\n",
-    "        df = df.iloc[idxs].reset_index()\n",
-    "        if self.df_constructor is pd.DataFrame:\n",
+    "            raise Exception('Please run `cross_validation` method using `fitted=True`')\n",
+    "        idxs = self.cv_fitted_values_['idxs'].flatten(order='F') \n",
+    "        train_uids = ufp.repeat(self.uids, np.diff(self.ga.indptr))\n",
+    "        cv_uids = ufp.vertical_concat([train_uids for _ in range(self.n_cv_)])\n",
+    "        used_uids = ufp.take_rows(cv_uids, idxs)\n",
+    "        dates = np.tile(self.og_dates, self.n_cv_)[idxs]\n",
+    "        cutoffs_mask = self.cv_fitted_values_['last_idxs'].flatten(order='F')[idxs]\n",
+    "        cutoffs_sizes = np.diff(np.append(0, np.where(cutoffs_mask)[0] + 1))\n",
+    "        cutoffs = np.repeat(dates[cutoffs_mask], cutoffs_sizes) \n",
+    "        df = self.df_constructor({\n",
+    "            'unique_id': used_uids,\n",
+    "            'ds': dates,\n",
+    "            'cutoff': cutoffs,\n",
+    "        })\n",
+    "        fitted_vals = np.reshape(\n",
+    "            self.cv_fitted_values_['values'],\n",
+    "            (-1, len(self.models) + 1),\n",
+    "            order='F',\n",
+    "        )\n",
+    "        df = ufp.assign_columns(df, self.cv_fitted_values_['cols'], fitted_vals[idxs])\n",
+    "        df = ufp.drop_index_if_pandas(df)\n",
+    "        if isinstance(df, pd.DataFrame):\n",
     "            if sf_config.id_as_index:\n",
     "                _warn_id_as_idx()\n",
     "                df = df.set_index('unique_id')\n",
     "            else:\n",
     "                df = df.reset_index(drop=True)\n",
-    "        df['cutoff'] = df['ds'].where(df['cutoff']).bfill()\n",
-    "\n",
-    "        if self.df_constructor is pl_DataFrame:\n",
-    "            df = pl.from_pandas(df)\n",
     "        return df\n",
     "\n",
     "    def _get_pool(self):\n",
@@ -2067,10 +1968,10 @@
     "with tempfile.TemporaryDirectory() as td:\n",
     "    f_path = Path(td).joinpath(\"sf_test.pickle\")\n",
     "    \n",
-    "    test_df = pl.from_pandas(panel_df.astype({\"unique_id\": str}))\n",
+    "    test_df = generate_series(n_series=9, equal_ends=False, engine='polars')\n",
     "    test_frcs = StatsForecast(\n",
     "        models=models,\n",
-    "        freq='D', \n",
+    "        freq='1d', \n",
     "        n_jobs=1, \n",
     "        verbose=True\n",
     "    )\n",
@@ -2722,8 +2623,7 @@
     ")\n",
     "test_fail(test_eq, args=(res_cv_wo_refit, res_cv))\n",
     "cols_wo_refit = res_cv_wo_refit.columns\n",
-    "test_eq(res_cv_wo_refit.groupby('unique_id').head(1), \n",
-    "        res_cv[cols_wo_refit].groupby('unique_id').head(1))\n",
+    "test_eq(res_cv_wo_refit.groupby('unique_id').head(1), res_cv[cols_wo_refit].groupby('unique_id').head(1))\n",
     "\n",
     "n_windows = fcst.cross_validation(\n",
     "    df=series_cv,\n",
@@ -3235,7 +3135,7 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "fcst = StatsForecast(models=[HistoricAverage()], freq='D')\n",
+    "fcst = StatsForecast(models=[HistoricAverage()], freq=1)\n",
     "horizon = 7\n",
     "forecast = fcst.forecast(df=int_ds_df, h=horizon)\n",
     "forecast.head()"
@@ -3444,7 +3344,7 @@
     "def test_x_vars(n_jobs=1):\n",
     "    fcst = StatsForecast(\n",
     "        models=[ReturnX()],\n",
-    "        freq='M',\n",
+    "        freq=1,\n",
     "        n_jobs=n_jobs,\n",
     "    )\n",
     "    xreg = test_df.drop(columns='y')\n",
@@ -3502,7 +3402,7 @@
     "        SeasonalNaive(season_length=12), \n",
     "        AutoARIMA(season_length=12)\n",
     "    ],\n",
-    "    freq='M',\n",
+    "    freq=1,\n",
     "    n_jobs=1\n",
     ")\n",
     "ap_ci = sf.forecast(df=ap_df, h=12, level=(80, 95))\n",
@@ -3553,7 +3453,7 @@
     "            alias='ConformalAutoARIMA'\n",
     "        ),\n",
     "    ],\n",
-    "    freq='M',\n",
+    "    freq=1,\n",
    "    n_jobs=1\n",
     ")\n",
     "ap_ci = sf.forecast(df=ap_df, h=12, level=(80, 95))\n",
@@ -3580,7 +3480,7 @@
     "    models=[\n",
     "        AutoARIMA(season_length=12),\n",
     "    ],\n",
-    "    freq='M',\n",
+    "    freq=1,\n",
     "    n_jobs=1\n",
     ")\n",
     "ap_ci = sf.forecast(\n",
@@ -3613,7 +3513,7 @@
     "            SeasonalNaive(season_length=12), \n",
     "            AutoARIMA(season_length=12)\n",
     "        ],\n",
-    "        freq='M',\n",
+    "        freq=1,\n",
     "        n_jobs=n_jobs\n",
     "    )\n",
     "    ap_ci = fcst.forecast(df=ap_df, h=12, level=(80, 95))\n",
diff --git a/settings.ini b/settings.ini
index 5eb568580..ab4915588 100644
--- a/settings.ini
+++ b/settings.ini
@@ -15,8 +15,8 @@ language = English
 custom_sidebar = True
 license = apache2
 status = 2
-requirements = numba>=0.55.0 numpy>=1.21.6 pandas>=1.3.5 scipy>=1.7.3 statsmodels>=0.13.2 tqdm fugue>=0.8.1 utilsforecast>=0.0.14
-polars_requirements = polars pyarrow
+requirements = numba>=0.55.0 numpy>=1.21.6 pandas>=1.3.5 scipy>=1.7.3 statsmodels>=0.13.2 tqdm fugue>=0.8.1 utilsforecast>=0.0.17
+polars_requirements = polars
 ray_requirements = fugue[ray]>=0.8.1 protobuf>=3.15.3,<4.0.0
 dask_requirements = fugue[dask]>=0.8.1
 spark_requirements = fugue[spark]>=0.8.1
diff --git a/statsforecast/_modidx.py b/statsforecast/_modidx.py
index 9adbaedc9..1d726ad4d 100644
--- a/statsforecast/_modidx.py
+++ b/statsforecast/_modidx.py
@@ -177,7 +177,6 @@
                                                            'statsforecast/core.py'),
                'statsforecast.core._StatsForecast.save': ( 'src/core/core.html#_statsforecast.save',
                                                            'statsforecast/core.py'),
-               'statsforecast.core._cv_dates': ('src/core/core.html#_cv_dates', 'statsforecast/core.py'),
                'statsforecast.core._get_n_jobs': ('src/core/core.html#_get_n_jobs', 'statsforecast/core.py'),
                'statsforecast.core._maybe_warn_sort_df': ( 'src/core/core.html#_maybe_warn_sort_df',
                                                            'statsforecast/core.py'),
diff --git a/statsforecast/core.py b/statsforecast/core.py
index 0b68bc8b9..6d596bb6b 100644
--- a/statsforecast/core.py
+++ b/statsforecast/core.py
@@ -18,15 +18,15 @@
 
 import numpy as np
 import pandas as pd
+import utilsforecast.processing as ufp
 from fugue.execution.factory import (
     make_execution_engine,
     try_get_context_execution_engine,
 )
 from tqdm.autonotebook import tqdm
 from triad import conditional_dispatcher
-from utilsforecast.compat import DataFrame, pl_DataFrame, pl
+from utilsforecast.compat import DataFrame, pl, pl_DataFrame, pl_Series
 from utilsforecast.grouped_array import GroupedArray as BaseGroupedArray
-from utilsforecast.processing import process_df
 from utilsforecast.validation import ensure_time_dtype
 
 import statsforecast.config as sf_config
@@ -404,45 +404,6 @@ def split_fm(self, fm, n_chunks):
     ]
 
 # %% ../nbs/src/core/core.ipynb 24
-def _cv_dates(last_dates, freq, h, test_size, step_size=1):
-    # assuming step_size = 1
-    if (test_size - h) % step_size:
-        raise Exception("`test_size - h` should be module `step_size`")
-    n_windows = int((test_size - h) / step_size) + 1
-    if len(np.unique(last_dates)) == 1:
-        if issubclass(last_dates.dtype.type, np.integer):
-            total_dates = np.arange(last_dates[0] - test_size + 1, last_dates[0] + 1)
-            out = np.empty((h * n_windows, 2), dtype=last_dates.dtype)
-            freq = 1
-        else:
-            total_dates = pd.date_range(end=last_dates[0], periods=test_size, freq=freq)
-            out = np.empty((h * n_windows, 2), dtype="datetime64[s]")
-        for i_window, cutoff in enumerate(
-            range(-test_size, -h + 1, step_size), start=0
-        ):
-            end_cutoff = cutoff + h
-            out[h * i_window : h * (i_window + 1), 0] = (
-                total_dates[cutoff:]
-                if end_cutoff == 0
-                else total_dates[cutoff:end_cutoff]
-            )
-            out[h * i_window : h * (i_window + 1), 1] = np.tile(
-                total_dates[cutoff] - freq, h
-            )
-        dates = pd.DataFrame(
-            np.tile(out, (len(last_dates), 1)), columns=["ds", "cutoff"]
-        )
-    else:
-        dates = pd.concat(
-            [
-                _cv_dates(np.array([ld]), freq, h, test_size, step_size)
-                for ld in last_dates
-            ]
-        )
-        dates = dates.reset_index(drop=True)
-    return dates
-
-# %% ../nbs/src/core/core.ipynb 28
 def _get_n_jobs(n_groups, n_jobs):
     if n_jobs == -1 or (n_jobs is None):
         actual_n_jobs = cpu_count()
@@ -450,7 +411,7 @@ def _get_n_jobs(n_groups, n_jobs):
         actual_n_jobs = n_jobs
     return min(n_groups, actual_n_jobs)
 
-# %% ../nbs/src/core/core.ipynb 31
+# %% ../nbs/src/core/core.ipynb 27
 def _warn_df_constructor():
     warnings.warn(
         "The `df` argument of the StatsForecast constructor is deprecated "
@@ -477,7 +438,7 @@ def _warn_id_as_idx():
         category=DeprecationWarning,
     )
 
-# %% ../nbs/src/core/core.ipynb 32
+# %% ../nbs/src/core/core.ipynb 28
 class _StatsForecast:
     def __init__(
         self,
@@ -526,7 +487,7 @@ def __init__(
         # TODO @fede: needed for residuals, think about it later
         self.models = models
         self._validate_model_names()
-        self.freq = pd.tseries.frequencies.to_offset(freq)
+        self.freq = freq
         self.n_jobs = n_jobs
         self.fallback_model = fallback_model
         self.verbose = verbose
@@ -547,7 +508,9 @@ def _validate_model_names(self):
         )
 
     def _prepare_fit(
-        self, df: Optional[DataFrame], sort_df: bool = True, save_original: bool = False
+        self,
+        df: Optional[DataFrame],
+        sort_df: bool = True,
     ) -> None:
         if df is None:
             _warn_df_constructor()
@@ -562,21 +525,17 @@ def _prepare_fit(
             )
             df = df.reset_index()
         _maybe_warn_sort_df(sort_df)
-        self.uids, self.last_dates, data, indptr, sort_idxs = process_df(
+        self.uids, last_times, data, indptr, sort_idxs = ufp.process_df(
             df, "unique_id", "ds", "y"
         )
         if isinstance(df, pd.DataFrame):
-            self.last_dates = pd.Index(self.last_dates, name="ds")
+            self.last_dates = pd.Index(last_times, name="ds")
+        else:
+            self.last_dates = pl_Series(last_times)
         self.ga = GroupedArray(data, indptr)
-        if save_original:
-            self.og_unique_id = df["unique_id"]
-            self.og_dates = df["ds"].to_numpy()
-            if sort_idxs is not None:
-                if isinstance(self.og_unique_id, pd.Series):
-                    self.og_unique_id = self.og_unique_id.iloc[sort_idxs]
-                else:
-                    self.og_unique_id = self.og_unique_id.take(sort_idxs)
-                self.og_dates = self.og_dates[sort_idxs]
+        self.og_dates = df["ds"].to_numpy()
+        if sort_idxs is not None:
+            self.og_dates = self.og_dates[sort_idxs]
         self.n_jobs = _get_n_jobs(len(self.ga), self.n_jobs)
         self.df_constructor = type(df)
 
@@ -624,22 +583,9 @@ def fit(
         return self
 
     def _make_future_df(self, h: int):
-        if issubclass(self.last_dates.dtype.type, np.integer):
-            last_date_f = lambda x: np.arange(
-                x + 1, x + 1 + h, dtype=self.last_dates.dtype
-            )
-        else:
-            last_date_f = lambda x: pd.date_range(
-                x + self.freq, periods=h, freq=self.freq
-            )
-        if len(np.unique(self.last_dates)) == 1:
-            dates = np.tile(last_date_f(self.last_dates[0]), len(self.ga))
-        else:
-            dates = np.hstack([last_date_f(last_date) for last_date in self.last_dates])
-
-        uids = np.repeat(self.uids, h)
-        if self.df_constructor is pl_DataFrame and uids.dtype.kind == "O":
-            uids = uids.astype(str)
+        start_dates = ufp.offset_times(self.last_dates, freq=self.freq, n=1)
+        dates = ufp.time_ranges(start_dates, freq=self.freq, periods=h)
+        uids = ufp.repeat(self.uids, n=h)
         df = self.df_constructor({"unique_id": uids, "ds": dates})
         if isinstance(df, pd.DataFrame):
             if sf_config.id_as_index:
@@ -647,8 +593,6 @@ def _make_future_df(self, h: int):
                 df = df.set_index("unique_id")
             else:
                 df = df.reset_index(drop=True)
-        else:
-            df = df.with_columns(pl.col("unique_id").cast(self.uids.dtype))
         return df
 
     def _parse_X_level(
@@ -664,7 +608,7 @@ def _parse_X_level(
                 f"Expected X to have shape {expected_shape}, but got {X.shape}"
             )
         first_col = [c for c in X.columns if c not in ("unique_id", "ds")][0]
-        _, _, data, indptr, _ = process_df(X, "unique_id", "ds", first_col)
+        _, _, data, indptr, _ = ufp.process_df(X, "unique_id", "ds", first_col)
         return GroupedArray(data, indptr), level
 
     def predict(
@@ -813,7 +757,8 @@ def forecast(
         DataFrame with `models` columns for point predictions and probabilistic
         predictions for all fitted `models`.
         """
-        self._prepare_fit(df=df, sort_df=sort_df, save_original=fitted)
+        self.__dict__.pop("fcst_fitted_values_", None)
+        self._prepare_fit(df=df, sort_df=sort_df)
         self._set_prediction_intervals(prediction_intervals=prediction_intervals)
         X, level = self._parse_X_level(h=h, X=X_df, level=level)
         if self.n_jobs == 1:
@@ -853,7 +798,12 @@ def forecast_fitted_values(self):
         if not hasattr(self, "fcst_fitted_values_"):
             raise Exception("Please run `forecast` method using `fitted=True`")
         cols = self.fcst_fitted_values_["cols"]
-        df = self.df_constructor({"unique_id": self.og_unique_id, "ds": self.og_dates})
+        df = self.df_constructor(
+            {
+                "unique_id": ufp.repeat(self.uids, np.diff(self.ga.indptr)),
+                "ds": self.og_dates,
+            }
+        )
         df[cols] = self.fcst_fitted_values_["values"]
         if isinstance(df, pd.DataFrame):
             if sf_config.id_as_index:
@@ -919,21 +869,16 @@ def cross_validation(
         DataFrame with insample `models` columns for point predictions and probabilistic
         predictions for all fitted `models`.
         """
-        self._prepare_fit(df=df, sort_df=sort_df, save_original=fitted)
+        if n_windows is None and test_size is None:
+            raise ValueError("You must define `n_windows` or `test_size`")
         if test_size is None:
             test_size = h + step_size * (n_windows - 1)
-        elif n_windows is None:
-            if (test_size - h) % step_size:
-                raise Exception("`test_size - h` should be module `step_size`")
-            n_windows = int((test_size - h) / step_size) + 1
-        elif (n_windows is None) and (test_size is None):
-            raise Exception("you must define `n_windows` or `test_size`")
-        else:
-            raise Exception("you must define `n_windows` or `test_size` but not both")
         if prediction_intervals is not None and level is None:
             raise ValueError(
                 "You must specify `level` when using `prediction_intervals`"
             )
+        self.__dict__.pop("cv_fitted_values_", None)
+        self._prepare_fit(df=df, sort_df=sort_df)
         self._set_prediction_intervals(prediction_intervals=prediction_intervals)
         series_sizes = np.diff(self.ga.indptr)
         short_series = series_sizes <= test_size
@@ -967,27 +912,25 @@ def cross_validation(
                 level=level,
                 refit=refit,
             )
-
         if fitted:
             self.cv_fitted_values_ = res_fcsts["fitted"]
             self.n_cv_ = n_windows
-
-        fcsts = res_fcsts["forecasts"]
-        cols = res_fcsts["cols"]
-        fcsts_df: DataFrame = _cv_dates(
-            last_dates=self.last_dates,
-            freq=self.freq,
+        fcsts_df = ufp.cv_times(
+            times=self.og_dates,
+            uids=self.uids,
+            indptr=self.ga.indptr,
             h=h,
             test_size=test_size,
             step_size=step_size,
         )
-        fcsts_df.insert(0, "unique_id", pd.Index(np.repeat(self.uids, h * n_windows)))
-        fcsts_df[cols] = fcsts
-        if self.df_constructor is pd.DataFrame and sf_config.id_as_index:
+        # the cv_times output is sorted by window and then id
+        fcsts_df = ufp.sort(fcsts_df, ["unique_id", "cutoff", "ds"])
+        fcsts_df = ufp.assign_columns(
+            fcsts_df, res_fcsts["cols"], res_fcsts["forecasts"]
+        )
+        if isinstance(fcsts_df, pd.DataFrame) and sf_config.id_as_index:
             _warn_id_as_idx()
             fcsts_df = fcsts_df.set_index("unique_id")
-        elif self.df_constructor is pl_DataFrame:
-            fcsts_df = pl.from_pandas(fcsts_df)
         return fcsts_df
 
     def cross_validation_fitted_values(self) -> DataFrame:
@@ -1005,28 +948,35 @@ def cross_validation_fitted_values(self) -> DataFrame:
         and probabilistic predictions for all fitted `models`.
         """
         if not hasattr(self, "cv_fitted_values_"):
-            raise Exception("Please run `cross_validation` mehtod using `fitted=True`")
-        ds = pd.MultiIndex.from_arrays([self.og_unique_id, self.og_dates])
-        index = pd.MultiIndex.from_tuples(
-            np.tile(ds, self.n_cv_), names=["unique_id", "ds"]
+            raise Exception("Please run `cross_validation` method using `fitted=True`")
+        idxs = self.cv_fitted_values_["idxs"].flatten(order="F")
+        train_uids = ufp.repeat(self.uids, np.diff(self.ga.indptr))
+        cv_uids = ufp.vertical_concat([train_uids for _ in range(self.n_cv_)])
+        used_uids = ufp.take_rows(cv_uids, idxs)
+        dates = np.tile(self.og_dates, self.n_cv_)[idxs]
+        cutoffs_mask = self.cv_fitted_values_["last_idxs"].flatten(order="F")[idxs]
+        cutoffs_sizes = np.diff(np.append(0, np.where(cutoffs_mask)[0] + 1))
+        cutoffs = np.repeat(dates[cutoffs_mask], cutoffs_sizes)
+        df = self.df_constructor(
+            {
+                "unique_id": used_uids,
+                "ds": dates,
+                "cutoff": cutoffs,
+            }
         )
-        df = pd.DataFrame(index=index)
-        df["cutoff"] = self.cv_fitted_values_["last_idxs"].flatten(order="F")
-        df[self.cv_fitted_values_["cols"]] = np.reshape(
-            self.cv_fitted_values_["values"], (-1, len(self.models) + 1), order="F"
+        fitted_vals = np.reshape(
+            self.cv_fitted_values_["values"],
+            (-1, len(self.models) + 1),
+            order="F",
        )
-        idxs = self.cv_fitted_values_["idxs"].flatten(order="F")
-        df = df.iloc[idxs].reset_index()
-        if self.df_constructor is pd.DataFrame:
+        df = ufp.assign_columns(df, self.cv_fitted_values_["cols"], fitted_vals[idxs])
+        df = ufp.drop_index_if_pandas(df)
+        if isinstance(df, pd.DataFrame):
             if sf_config.id_as_index:
                 _warn_id_as_idx()
                 df = df.set_index("unique_id")
             else:
                 df = df.reset_index(drop=True)
-            df["cutoff"] = df["ds"].where(df["cutoff"]).bfill()
-
-        if self.df_constructor is pl_DataFrame:
-            df = pl.from_pandas(df)
         return df
 
     def _get_pool(self):
@@ -1373,7 +1323,7 @@ def load(path: Union[Path, str]):
     def __repr__(self):
         return f"StatsForecast(models=[{','.join(map(repr, self.models))}])"
 
-# %% ../nbs/src/core/core.ipynb 33
+# %% ../nbs/src/core/core.ipynb 29
 class ParallelBackend:
     def forecast(self, df, models, freq, fallback_model=None, **kwargs: Any) -> Any:
         model = _StatsForecast(
@@ -1394,7 +1344,7 @@ def cross_validation(
 def make_backend(obj: Any, *args: Any, **kwargs: Any) -> ParallelBackend:
     return ParallelBackend()
 
-# %% ../nbs/src/core/core.ipynb 34
+# %% ../nbs/src/core/core.ipynb 30
 class StatsForecast(_StatsForecast):
     """Train statistical models.
 
From 10b5f2544979d682ae40ba02901f30d8134f4ff9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jos=C3=A9=20Morales?=
Date: Thu, 23 Nov 2023 16:31:48 -0600
Subject: [PATCH 2/2] add error messages for bad frequencies

---
 .../3_Getting_Started_complete_polars.ipynb}  |  2 +-
 nbs/src/core/core.ipynb                       | 47 ++++++++++++++-----
 statsforecast/core.py                         | 45 +++++++++++++-----
 3 files changed, 68 insertions(+), 26 deletions(-)
 rename nbs/docs/{how-to-guides/Getting_Started_complete_polars.ipynb => getting-started/3_Getting_Started_complete_polars.ipynb} (99%)

diff --git a/nbs/docs/how-to-guides/Getting_Started_complete_polars.ipynb b/nbs/docs/getting-started/3_Getting_Started_complete_polars.ipynb
similarity index 99%
rename from nbs/docs/how-to-guides/Getting_Started_complete_polars.ipynb
rename to nbs/docs/getting-started/3_Getting_Started_complete_polars.ipynb
index 17c4fe1fa..2bfe3c04e 100644
--- a/nbs/docs/how-to-guides/Getting_Started_complete_polars.ipynb
+++ b/nbs/docs/getting-started/3_Getting_Started_complete_polars.ipynb
@@ -343,7 +343,7 @@
     "sf = StatsForecast(\n",
     "    df=Y_df, \n",
     "    models=models,\n",
-    "    freq='H', \n",
+    "    freq=1, \n",
     "    n_jobs=-1,\n",
     "    fallback_model = SeasonalNaive(season_length=7),\n",
     "    verbose=True\n",
diff --git a/nbs/src/core/core.ipynb b/nbs/src/core/core.ipynb
index 3514ad15e..5748b84ff 100644
--- a/nbs/src/core/core.ipynb
+++ b/nbs/src/core/core.ipynb
@@ -99,7 +99,7 @@
     "from fugue.execution.factory import make_execution_engine, try_get_context_execution_engine\n",
     "from tqdm.autonotebook import tqdm\n",
     "from triad import conditional_dispatcher\n",
-    "from utilsforecast.compat import DataFrame, pl, pl_DataFrame, pl_Series\n",
+    "from utilsforecast.compat import DataFrame, pl_DataFrame, pl_Series\n",
     "from utilsforecast.grouped_array import GroupedArray as BaseGroupedArray\n",
     "from utilsforecast.validation import ensure_time_dtype\n",
     "\n",
@@ -874,7 +874,7 @@
     "    def __init__(\n",
     "        self, \n",
     "        models: List[Any],\n",
-    "        freq: str,\n",
+    "        freq: Union[str, int],\n",
     "        n_jobs: int = 1,\n",
     "        df: Optional[DataFrame] = None,\n",
     "        sort_df: bool = True,\n",
@@ -899,9 +899,8 @@
     "        ----------\n",
     "        models : List[Any]\n",
     "            List of instantiated objects models.StatsForecast.\n",
-    "        freq : str\n",
-    "            Frequency of the data.\n",
-    "            See [pandas' available frequencies](https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases).\n",
+    "        freq : str or int\n",
+    "            Frequency of the data. Must be a valid pandas or polars offset alias, or an integer.\n",
     "        n_jobs : int (default=1)\n",
     "            Number of jobs used in the parallel processing, use -1 for all cores.\n",
     "        df : pandas.DataFrame or pl.DataFrame, optional (default=None)\n",
@@ -944,15 +943,37 @@
     "        if df is None:\n",
     "            _warn_df_constructor()\n",
     "            return\n",
-    "        df = ensure_time_dtype(df, 'ds') \n",
-    "        if isinstance(df, pd.DataFrame):\n",
-    "            if df.index.name == 'unique_id':\n",
-    "                warnings.warn(\n",
-    "                    \"Passing unique_id as the index is deprecated. \"\n",
-    "                    \"Please provide it as a column instead.\",\n",
-    "                    category=DeprecationWarning\n",
+    "        df = ensure_time_dtype(df, 'ds')\n",
+    "        time_dtype = df['ds'].head(1).to_numpy().dtype\n",
+    "        time_is_int = np.issubdtype(time_dtype, np.integer)\n",
+    "        if time_is_int and not isinstance(self.freq, int):\n",
+    "            raise ValueError(\n",
+    "                'Time column contains integers but the specified frequency is not an integer. '\n",
+    "                'Please provide a valid integer, like `freq=1`'\n",
+    "            )\n",
+    "        elif not time_is_int and not isinstance(self.freq, str):\n",
+    "            # the ensure_time_dtype function makes sure that ds is either int or timestamp\n",
+    "            raise ValueError(\n",
+    "                'Time column contains timestamps but the specified frequency is not a string. '\n",
+    "                'Please provide a valid pandas or polars offset.'\n",
+    "            )\n",
+    "        # try to catch pandas frequency in polars dataframe\n",
+    "        if isinstance(df, pl_DataFrame) and isinstance(self.freq, str):\n",
+    "            missing_n = re.search(r'\d+', self.freq) is None\n",
+    "            uppercase = re.sub(r'\d+', '', self.freq).isupper()\n",
+    "            if missing_n or uppercase:\n",
+    "                raise ValueError(\n",
+    "                    'You must specify a valid polars offset when using polars dataframes. '\n",
+    "                    'You can find the available offsets in '\n",
+    "                    'https://pola-rs.github.io/polars/py-polars/html/reference/expressions/api/polars.Expr.dt.offset_by.html'\n",
     "                )\n",
-    "            df = df.reset_index()\n",
+    "        elif isinstance(df, pd.DataFrame) and df.index.name == 'unique_id':\n",
+    "            warnings.warn(\n",
+    "                \"Passing unique_id as the index is deprecated. \"\n",
+    "                \"Please provide it as a column instead.\",\n",
+    "                category=DeprecationWarning\n",
+    "            )\n",
+    "            df = df.reset_index()\n",
     "        _maybe_warn_sort_df(sort_df)\n",
     "        self.uids, last_times, data, indptr, sort_idxs = ufp.process_df(\n",
     "            df, 'unique_id', 'ds', 'y'\n",
diff --git a/statsforecast/core.py b/statsforecast/core.py
index 6d596bb6b..05a93aa40 100644
--- a/statsforecast/core.py
+++ b/statsforecast/core.py
@@ -25,7 +25,7 @@
 )
 from tqdm.autonotebook import tqdm
 from triad import conditional_dispatcher
-from utilsforecast.compat import DataFrame, pl, pl_DataFrame, pl_Series
+from utilsforecast.compat import DataFrame, pl_DataFrame, pl_Series
 from utilsforecast.grouped_array import GroupedArray as BaseGroupedArray
 from utilsforecast.validation import ensure_time_dtype
 
@@ -443,7 +443,7 @@ class _StatsForecast:
     def __init__(
         self,
         models: List[Any],
-        freq: str,
+        freq: Union[str, int],
         n_jobs: int = 1,
         df: Optional[DataFrame] = None,
         sort_df: bool = True,
@@ -468,9 +468,8 @@ def __init__(
         ----------
         models : List[Any]
             List of instantiated objects models.StatsForecast.
-        freq : str
-            Frequency of the data.
-            See [pandas' available frequencies](https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases).
+        freq : str or int
+            Frequency of the data. Must be a valid pandas or polars offset alias, or an integer.
         n_jobs : int (default=1)
             Number of jobs used in the parallel processing, use -1 for all cores.
         df : pandas.DataFrame or pl.DataFrame, optional (default=None)
@@ -516,14 +515,36 @@ def _prepare_fit(
             _warn_df_constructor()
             return
         df = ensure_time_dtype(df, "ds")
-        if isinstance(df, pd.DataFrame):
-            if df.index.name == "unique_id":
-                warnings.warn(
-                    "Passing unique_id as the index is deprecated. "
-                    "Please provide it as a column instead.",
-                    category=DeprecationWarning,
+        time_dtype = df["ds"].head(1).to_numpy().dtype
+        time_is_int = np.issubdtype(time_dtype, np.integer)
+        if time_is_int and not isinstance(self.freq, int):
+            raise ValueError(
+                "Time column contains integers but the specified frequency is not an integer. "
+                "Please provide a valid integer, like `freq=1`"
+            )
+        elif not time_is_int and not isinstance(self.freq, str):
+            # the ensure_time_dtype function makes sure that ds is either int or timestamp
+            raise ValueError(
+                "Time column contains timestamps but the specified frequency is not a string. "
+                "Please provide a valid pandas or polars offset."
+            )
+        # try to catch pandas frequency in polars dataframe
+        if isinstance(df, pl_DataFrame) and isinstance(self.freq, str):
+            missing_n = re.search(r"\d+", self.freq) is None
+            uppercase = re.sub(r"\d+", "", self.freq).isupper()
+            if missing_n or uppercase:
+                raise ValueError(
+                    "You must specify a valid polars offset when using polars dataframes. "
+                    "You can find the available offsets in "
+                    "https://pola-rs.github.io/polars/py-polars/html/reference/expressions/api/polars.Expr.dt.offset_by.html"
                 )
-            df = df.reset_index()
+        elif isinstance(df, pd.DataFrame) and df.index.name == "unique_id":
+            warnings.warn(
+                "Passing unique_id as the index is deprecated. "
+                "Please provide it as a column instead.",
+                category=DeprecationWarning,
+            )
+            df = df.reset_index()
         _maybe_warn_sort_df(sort_df)
         self.uids, last_times, data, indptr, sort_idxs = ufp.process_df(
             df, "unique_id", "ds", "y"