Skip to content

Commit

Permalink
Merge pull request #280 from Nixtla/feat/progress-bar
Browse files Browse the repository at this point in the history
[FEAT] Add progress bar for sequential tasks
  • Loading branch information
AzulGarza authored Oct 25, 2022
2 parents fecbfc3 + c2e915c commit fb545cb
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 20 deletions.
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ dependencies:
- tabulate
- pip:
- nbdev
- tqdm
44 changes: 32 additions & 12 deletions nbs/core.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@
"from typing import Any, List, Optional\n",
"\n",
"import numpy as np\n",
"import pandas as pd"
"import pandas as pd\n",
"from tqdm.autonotebook import tqdm"
]
},
{
Expand Down Expand Up @@ -206,7 +207,7 @@
" fcsts, cols = self.predict(fm=fm, h=h, X=X, level=level)\n",
" return fm, fcsts, cols\n",
" \n",
" def forecast(self, models, h, fallback_model=None, fitted=False, X=None, level=tuple()):\n",
" def forecast(self, models, h, fallback_model=None, fitted=False, X=None, level=tuple(), verbose=False):\n",
" fcsts, cuts, has_level_models = self._output_fcst(\n",
" models=models, attr='forecast', \n",
" h=h, X=X, level=level\n",
Expand All @@ -221,7 +222,11 @@
" fitted_vals[:, 0] = self.data\n",
" else:\n",
" fitted_vals[:, 0] = self.data[:, 0]\n",
" for i, grp in enumerate(self):\n",
" iterable = tqdm(enumerate(self), \n",
" disable=(not verbose), \n",
" total=len(self),\n",
" desc='Forecast')\n",
" for i, grp in iterable:\n",
" y_train = grp[:, 0] if grp.ndim == 2 else grp\n",
" X_train = grp[:, 1:] if (grp.ndim == 2 and grp.shape[1] > 1) else None\n",
" if X is not None:\n",
Expand Down Expand Up @@ -263,7 +268,8 @@
" result['fitted']['cols'] = ['y'] + cols_fitted\n",
" return result\n",
" \n",
" def cross_validation(self, models, h, test_size, step_size=1, input_size=None, fitted=False, level=tuple()):\n",
" def cross_validation(self, models, h, test_size, step_size=1, input_size=None, fitted=False, level=tuple(), \n",
" verbose=False):\n",
" # output of size: (ts, window, h)\n",
" if (test_size - h) % step_size:\n",
" raise Exception('`test_size - h` should be module `step_size`')\n",
Expand All @@ -277,8 +283,13 @@
" fitted_idxs = np.full((self.data.shape[0], n_windows), False, dtype=bool)\n",
" last_fitted_idxs = np.full_like(fitted_idxs, False, dtype=bool)\n",
" matches = ['mean', 'lo', 'hi']\n",
" steps = list(range(-test_size, -h + 1, step_size))\n",
" for i_ts, grp in enumerate(self):\n",
" for i_window, cutoff in enumerate(range(-test_size, -h + 1, step_size), start=0):\n",
" iterable = tqdm(enumerate(steps, start=0), \n",
" desc=f'Cross Validation Time Series {i_ts + 1}', \n",
" disable=(not verbose),\n",
" total=len(steps))\n",
" for i_window, cutoff in iterable:\n",
" end_cutoff = cutoff + h\n",
" in_size_disp = cutoff if input_size is None else input_size \n",
" y = grp[(cutoff - in_size_disp):cutoff]\n",
Expand Down Expand Up @@ -793,7 +804,8 @@
" ray_address: Optional[str] = None,\n",
" df: Optional[pd.DataFrame] = None,\n",
" sort_df: bool = True,\n",
" fallback_model: Any = None\n",
" fallback_model: Any = None,\n",
" verbose: bool = False\n",
" ):\n",
" \"\"\"core.StatsForecast.\n",
" [Source code](https://github.com/Nixtla/statsforecast/blob/main/statsforecast/core.py).\n",
Expand All @@ -814,6 +826,7 @@
" `n_jobs`: int, number of jobs used in the parallel processing, use -1 for all cores.<br>\n",
" `sort_df`: bool, if True, sort `df` by [`unique_id`,`ds`].<br>\n",
" `fallback_model`: Any, Model to be used if a model fails. Only works with the `forecast` method.<br>\n",
" `verbose`: bool, Prints TQDM progress bar when `n_jobs=1`.<br>\n",
"\n",
" **Notes:**<br>\n",
" The `core.StatsForecast` class offers parallelization utilities with Dask, Spark and Ray back-ends.<br>\n",
Expand All @@ -826,6 +839,7 @@
" self.ray_address = ray_address\n",
" self.fallback_model = fallback_model\n",
" self._prepare_fit(df=df, sort_df=sort_df)\n",
" self.verbose = verbose and self.n_jobs == 1\n",
" \n",
" def _prepare_fit(self, df, sort_df):\n",
" if df is not None:\n",
Expand Down Expand Up @@ -984,7 +998,8 @@
" if self.n_jobs == 1:\n",
" res_fcsts = self.ga.forecast(models=self.models, \n",
" h=h, fallback_model=self.fallback_model, \n",
" fitted=fitted, X=X, level=level)\n",
" fitted=fitted, X=X, level=level, \n",
" verbose=self.verbose)\n",
" else:\n",
" res_fcsts = self._forecast_parallel(h=h, fitted=fitted, X=X, level=level)\n",
" if fitted:\n",
Expand Down Expand Up @@ -1074,7 +1089,8 @@
" step_size=step_size, \n",
" input_size=input_size, \n",
" fitted=fitted,\n",
" level=level\n",
" level=level,\n",
" verbose=self.verbose\n",
" )\n",
" else:\n",
" res_fcsts = self._cross_validation_parallel(\n",
Expand Down Expand Up @@ -1315,7 +1331,9 @@
"# Instantiate StatsForecast class\n",
"fcst = StatsForecast(df=panel_df,\n",
" models=models,\n",
" freq='D', n_jobs=1)\n",
" freq='D', \n",
" n_jobs=1, \n",
" verbose=True)\n",
"\n",
"# Efficiently predict\n",
"fcsts_df = fcst.forecast(h=4, fitted=True)\n",
Expand Down Expand Up @@ -1390,7 +1408,8 @@
" df=series,\n",
" models=models,\n",
" freq='D',\n",
" n_jobs=1\n",
" n_jobs=1,\n",
" verbose=True\n",
")\n",
"\n",
"res = fcst.forecast(h=14)\n",
Expand Down Expand Up @@ -1582,7 +1601,7 @@
"# Instantiate StatsForecast class\n",
"fcst = StatsForecast(df=panel_df,\n",
" models=[Naive()],\n",
" freq='D', n_jobs=1)\n",
" freq='D', n_jobs=1, verbose=True)\n",
"\n",
"# Access insample predictions\n",
"rolled_fcsts_df = fcst.cross_validation(14, n_windows=2)\n",
Expand Down Expand Up @@ -1613,7 +1632,8 @@
"fcst = StatsForecast(\n",
" df=series_cv,\n",
" models=[SumAhead(), Naive()],\n",
" freq='D'\n",
" freq='D',\n",
" verbose=True\n",
")\n",
"res_cv = fcst.cross_validation(h=2, test_size=5, n_windows=None, level=(50, 60))\n",
"test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))\n",
Expand Down
2 changes: 1 addition & 1 deletion settings.ini
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ language = English
custom_sidebar = True
license = apache2
status = 2
requirements = numba>=0.55.0 numpy>=1.21.6 pandas>=1.3.5 scipy>=1.7.3 statsmodels>=0.13.2
requirements = numba>=0.55.0 numpy>=1.21.6 pandas>=1.3.5 scipy>=1.7.3 statsmodels>=0.13.2 tqdm
ray_requirements = ray protobuf>=3.15.3,<4.0.0
fugue_requirements = fugue[ray]>=0.7.0
dev_requirements = nbdev black mypy flake8 ray protobuf>=3.15.3,<4.0.0 fugue>=0.7.0 matplotlib neuralforecast pmdarima prophet sklearn dask[distributed]
Expand Down
30 changes: 23 additions & 7 deletions statsforecast/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import numpy as np
import pandas as pd
from tqdm.autonotebook import tqdm

# %% ../nbs/core.ipynb 5
logging.basicConfig(
Expand Down Expand Up @@ -116,7 +117,7 @@ def fit_predict(self, models, h, X=None, level=tuple()):
fcsts, cols = self.predict(fm=fm, h=h, X=X, level=level)
return fm, fcsts, cols

def forecast(self, models, h, fallback_model=None, fitted=False, X=None, level=tuple()):
def forecast(self, models, h, fallback_model=None, fitted=False, X=None, level=tuple(), verbose=False):
fcsts, cuts, has_level_models = self._output_fcst(
models=models, attr='forecast',
h=h, X=X, level=level
Expand All @@ -131,7 +132,11 @@ def forecast(self, models, h, fallback_model=None, fitted=False, X=None, level=t
fitted_vals[:, 0] = self.data
else:
fitted_vals[:, 0] = self.data[:, 0]
for i, grp in enumerate(self):
iterable = tqdm(enumerate(self),
disable=(not verbose),
total=len(self),
desc='Forecast')
for i, grp in iterable:
y_train = grp[:, 0] if grp.ndim == 2 else grp
X_train = grp[:, 1:] if (grp.ndim == 2 and grp.shape[1] > 1) else None
if X is not None:
Expand Down Expand Up @@ -173,7 +178,8 @@ def forecast(self, models, h, fallback_model=None, fitted=False, X=None, level=t
result['fitted']['cols'] = ['y'] + cols_fitted
return result

def cross_validation(self, models, h, test_size, step_size=1, input_size=None, fitted=False, level=tuple()):
def cross_validation(self, models, h, test_size, step_size=1, input_size=None, fitted=False, level=tuple(),
verbose=False):
# output of size: (ts, window, h)
if (test_size - h) % step_size:
raise Exception('`test_size - h` should be module `step_size`')
Expand All @@ -187,8 +193,13 @@ def cross_validation(self, models, h, test_size, step_size=1, input_size=None, f
fitted_idxs = np.full((self.data.shape[0], n_windows), False, dtype=bool)
last_fitted_idxs = np.full_like(fitted_idxs, False, dtype=bool)
matches = ['mean', 'lo', 'hi']
steps = list(range(-test_size, -h + 1, step_size))
for i_ts, grp in enumerate(self):
for i_window, cutoff in enumerate(range(-test_size, -h + 1, step_size), start=0):
iterable = tqdm(enumerate(steps, start=0),
desc=f'Cross Validation Time Series {i_ts + 1}',
disable=(not verbose),
total=len(steps))
for i_window, cutoff in iterable:
end_cutoff = cutoff + h
in_size_disp = cutoff if input_size is None else input_size
y = grp[(cutoff - in_size_disp):cutoff]
Expand Down Expand Up @@ -313,7 +324,8 @@ def __init__(
ray_address: Optional[str] = None,
df: Optional[pd.DataFrame] = None,
sort_df: bool = True,
fallback_model: Any = None
fallback_model: Any = None,
verbose: bool = False
):
"""core.StatsForecast.
[Source code](https://github.com/Nixtla/statsforecast/blob/main/statsforecast/core.py).
Expand All @@ -334,6 +346,7 @@ def __init__(
`n_jobs`: int, number of jobs used in the parallel processing, use -1 for all cores.<br>
`sort_df`: bool, if True, sort `df` by [`unique_id`,`ds`].<br>
`fallback_model`: Any, Model to be used if a model fails. Only works with the `forecast` method.<br>
`verbose`: bool, Prints TQDM progress bar when `n_jobs=1`.<br>
**Notes:**<br>
The `core.StatsForecast` class offers parallelization utilities with Dask, Spark and Ray back-ends.<br>
Expand All @@ -346,6 +359,7 @@ def __init__(
self.ray_address = ray_address
self.fallback_model = fallback_model
self._prepare_fit(df=df, sort_df=sort_df)
self.verbose = verbose and self.n_jobs == 1

def _prepare_fit(self, df, sort_df):
if df is not None:
Expand Down Expand Up @@ -504,7 +518,8 @@ def forecast(
if self.n_jobs == 1:
res_fcsts = self.ga.forecast(models=self.models,
h=h, fallback_model=self.fallback_model,
fitted=fitted, X=X, level=level)
fitted=fitted, X=X, level=level,
verbose=self.verbose)
else:
res_fcsts = self._forecast_parallel(h=h, fitted=fitted, X=X, level=level)
if fitted:
Expand Down Expand Up @@ -594,7 +609,8 @@ def cross_validation(
step_size=step_size,
input_size=input_size,
fitted=fitted,
level=level
level=level,
verbose=self.verbose
)
else:
res_fcsts = self._cross_validation_parallel(
Expand Down

0 comments on commit fb545cb

Please sign in to comment.