diff --git a/docs/examples/004_parallel_processing.nblink b/docs/examples/004_parallel_processing.nblink
new file mode 100644
index 00000000..2f3df943
--- /dev/null
+++ b/docs/examples/004_parallel_processing.nblink
@@ -0,0 +1,3 @@
+{
+    "path": "../../examples/notebooks/ex04_parallel_processing.ipynb"
+}
diff --git a/docs/index.rst b/docs/index.rst
index 33b1378d..65dc50fc 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -14,7 +14,7 @@ A small example using pastastore::
     import pandas as pd
 
     # initialize a connector and a pastastore
-    pstore = pst.PastaStore(pst.PasConnector("./path_to_folder", name="my_dbase"))
+    pstore = pst.PastaStore(pst.PasConnector("my_dbase", "./path_to_folder"))
 
     # read some data
     series = pd.read_csv("some.csv", index_col=[0], parse_dates=True)
diff --git a/examples/notebooks/ex04_parallel_processing.ipynb b/examples/notebooks/ex04_parallel_processing.ipynb
new file mode 100644
index 00000000..a30c765a
--- /dev/null
+++ b/examples/notebooks/ex04_parallel_processing.ipynb
@@ -0,0 +1,1168 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Parallel processing with Pastastore\n",
+    "\n",
+    "This notebook shows the parallel processing capabilities of `PastaStore`.\n",
+    "\n",
+    "\n",
+    "<div class=\"alert alert-warning\">\n",
\n", + "\n", + "Note \n", + "\n", + "Parallel processing is platform dependent and may not\n", + "always work. The current implementation works well for Linux users, though this\n", + "will likely change with Python 3.13 and higher. For Windows users, parallel\n", + "solving does not work when called directly from Jupyter Notebooks or IPython.\n", + "To use parallel solving on Windows, the following code should be used in a\n", + "Python file. \n", + "\n", + "
\n", + "\n", + "```python\n", + "from multiprocessing import freeze_support\n", + "\n", + "if __name__ == \"__main__\":\n", + " freeze_support()\n", + " pstore.apply(\"models\", some_func, parallel=True)\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Pastastore version : 1.7.2\n", + "\n", + "Python version : 3.11.10\n", + "Pandas version : 2.2.2\n", + "Matplotlib version : 3.9.2\n", + "Pastas version : 1.7.0\n", + "PyYAML version : 6.0.2\n", + "\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "DeprecationWarning: As of Pastas 1.5, no noisemodel is added to the pastas Model class by default anymore. To solve your model using a noisemodel, you have to explicitly add a noisemodel to your model before solving. For more information, and how to adapt your code, please see this issue on GitHub: https://github.com/pastas/pastas/issues/735" + ] + } + ], + "source": [ + "import pastas as ps\n", + "\n", + "import pastastore as pst\n", + "from pastastore.datasets import example_pastastore\n", + "\n", + "ps.logger.setLevel(\"ERROR\") # silence Pastas logger for this notebook\n", + "pst.show_versions()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Example pastastore\n", + "\n", + "Load some example data, create models and solve them to showcase parallel processing." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "PasConnector: library 'oseries' created in '/home/david/github/pastastore/examples/notebooks/temp/my_connector/oseries'\n", + "PasConnector: library 'stresses' created in '/home/david/github/pastastore/examples/notebooks/temp/my_connector/stresses'\n", + "PasConnector: library 'models' created in '/home/david/github/pastastore/examples/notebooks/temp/my_connector/models'\n", + "PasConnector: library 'oseries_models' created in '/home/david/github/pastastore/examples/notebooks/temp/my_connector/oseries_models'\n" + ] + }, + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "b7f54a693c964594b341959f50d660a5", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "Bulk creation models: 0%| | 0/5 [00:00 float:\n", + " \"\"\"Compute the R-squared value of a Pastas model.\"\"\"\n", + " ml = pstore.get_models(model_name)\n", + " return ml.stats.rsq()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We can apply this function to all models in the pastastore using `pstore.apply()`. \n", + "By default this function is run sequentially. " + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "a5b74aa7f5434dbd9ba84add1b086779", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "Applying rsq: 0%| | 0/5 [00:00\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
rsqmae
head_mw0.1593520.631499
head_nb50.4381290.318361
oseries20.9318830.087070
oseries10.9044800.091339
oseries30.0304680.106254
\n", + "" + ], + "text/plain": [ + " rsq mae\n", + "head_mw 0.159352 0.631499\n", + "head_nb5 0.438129 0.318361\n", + "oseries2 0.931883 0.087070\n", + "oseries1 0.904480 0.091339\n", + "oseries3 0.030468 0.106254" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "pstore.get_statistics([\"rsq\", \"mae\"])" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
rsqmae
_get_statistics
head_mw0.1593520.631499
head_nb50.4381290.318361
oseries20.9318830.087070
oseries10.9044800.091339
oseries30.0304680.106254
\n", + "
" + ], + "text/plain": [ + " rsq mae\n", + "_get_statistics \n", + "head_mw 0.159352 0.631499\n", + "head_nb5 0.438129 0.318361\n", + "oseries2 0.931883 0.087070\n", + "oseries1 0.904480 0.091339\n", + "oseries3 0.030468 0.106254" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "pstore.get_statistics([\"rsq\", \"mae\"], parallel=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Compute prediction intervals\n", + "\n", + "Let's try using a more complex function and passing that to apply to use\n", + "parallel processing. In this case we want to compute the prediction interval,\n", + "and pass along the $\\alpha$ value via the keyword arguments." + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "def prediction_interval(model_name, **kwargs):\n", + " \"\"\"Compute the prediction interval for a Pastas model.\"\"\"\n", + " ml = pstore.get_models(model_name)\n", + " return ml.solver.prediction_interval(**kwargs)" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "60510e22549e46f5843177172e58899c", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "Applying prediction_interval: 0%| | 0/5 [00:00\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
head_mwhead_nb5oseries2oseries1oseries3
0.0250.9750.0250.9750.0250.9750.0250.9750.0250.975
1960-04-296.2551359.433007NaNNaNNaNNaNNaNNaNNaNNaN
1960-04-306.2696789.418478NaNNaNNaNNaNNaNNaNNaNNaN
1960-05-016.2690939.446798NaNNaNNaNNaNNaNNaNNaNNaN
1960-05-026.3004219.496691NaNNaNNaNNaNNaNNaNNaNNaN
1960-05-036.2381759.458558NaNNaNNaNNaNNaNNaNNaNNaN
.................................
2020-01-17NaNNaN7.9587859.637916NaNNaNNaNNaNNaNNaN
2020-01-18NaNNaN7.9458459.633597NaNNaNNaNNaNNaNNaN
2020-01-19NaNNaN7.9604079.672532NaNNaNNaNNaNNaNNaN
2020-01-20NaNNaN7.9562329.653112NaNNaNNaNNaNNaNNaN
2020-01-21NaNNaN7.9670729.639533NaNNaNNaNNaNNaNNaN
\n", + "

21817 rows × 10 columns

\n", + "" + ], + "text/plain": [ + " head_mw head_nb5 oseries2 oseries1 \\\n", + " 0.025 0.975 0.025 0.975 0.025 0.975 0.025 \n", + "1960-04-29 6.255135 9.433007 NaN NaN NaN NaN NaN \n", + "1960-04-30 6.269678 9.418478 NaN NaN NaN NaN NaN \n", + "1960-05-01 6.269093 9.446798 NaN NaN NaN NaN NaN \n", + "1960-05-02 6.300421 9.496691 NaN NaN NaN NaN NaN \n", + "1960-05-03 6.238175 9.458558 NaN NaN NaN NaN NaN \n", + "... ... ... ... ... ... ... ... \n", + "2020-01-17 NaN NaN 7.958785 9.637916 NaN NaN NaN \n", + "2020-01-18 NaN NaN 7.945845 9.633597 NaN NaN NaN \n", + "2020-01-19 NaN NaN 7.960407 9.672532 NaN NaN NaN \n", + "2020-01-20 NaN NaN 7.956232 9.653112 NaN NaN NaN \n", + "2020-01-21 NaN NaN 7.967072 9.639533 NaN NaN NaN \n", + "\n", + " oseries3 \n", + " 0.975 0.025 0.975 \n", + "1960-04-29 NaN NaN NaN \n", + "1960-04-30 NaN NaN NaN \n", + "1960-05-01 NaN NaN NaN \n", + "1960-05-02 NaN NaN NaN \n", + "1960-05-03 NaN NaN NaN \n", + "... ... ... ... \n", + "2020-01-17 NaN NaN NaN \n", + "2020-01-18 NaN NaN NaN \n", + "2020-01-19 NaN NaN NaN \n", + "2020-01-20 NaN NaN NaN \n", + "2020-01-21 NaN NaN NaN \n", + "\n", + "[21817 rows x 10 columns]" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "pstore.apply(\"models\", prediction_interval, kwargs={\"alpha\": 0.05})" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "ee105e5deb0b463eb6d6cf95b8d714fa", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "Applying prediction_interval (parallel): 0%| | 0/5 [00:00\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
head_mwhead_nb5oseries2oseries1oseries3
0.0250.9750.0250.9750.0250.9750.0250.9750.0250.975
1960-04-296.2406449.460150NaNNaNNaNNaNNaNNaNNaNNaN
1960-04-306.3493299.506166NaNNaNNaNNaNNaNNaNNaNNaN
1960-05-016.2472669.401046NaNNaNNaNNaNNaNNaNNaNNaN
1960-05-026.1752209.274749NaNNaNNaNNaNNaNNaNNaNNaN
1960-05-036.1276929.413533NaNNaNNaNNaNNaNNaNNaNNaN
.................................
2020-01-17NaNNaN7.9201019.642716NaNNaNNaNNaNNaNNaN
2020-01-18NaNNaN7.9094669.597625NaNNaNNaNNaNNaNNaN
2020-01-19NaNNaN7.9627329.637139NaNNaNNaNNaNNaNNaN
2020-01-20NaNNaN7.8701529.619891NaNNaNNaNNaNNaNNaN
2020-01-21NaNNaN7.9879679.652788NaNNaNNaNNaNNaNNaN
\n", + "

21817 rows × 10 columns

\n", + "" + ], + "text/plain": [ + " head_mw head_nb5 oseries2 oseries1 \\\n", + " 0.025 0.975 0.025 0.975 0.025 0.975 0.025 \n", + "1960-04-29 6.240644 9.460150 NaN NaN NaN NaN NaN \n", + "1960-04-30 6.349329 9.506166 NaN NaN NaN NaN NaN \n", + "1960-05-01 6.247266 9.401046 NaN NaN NaN NaN NaN \n", + "1960-05-02 6.175220 9.274749 NaN NaN NaN NaN NaN \n", + "1960-05-03 6.127692 9.413533 NaN NaN NaN NaN NaN \n", + "... ... ... ... ... ... ... ... \n", + "2020-01-17 NaN NaN 7.920101 9.642716 NaN NaN NaN \n", + "2020-01-18 NaN NaN 7.909466 9.597625 NaN NaN NaN \n", + "2020-01-19 NaN NaN 7.962732 9.637139 NaN NaN NaN \n", + "2020-01-20 NaN NaN 7.870152 9.619891 NaN NaN NaN \n", + "2020-01-21 NaN NaN 7.987967 9.652788 NaN NaN NaN \n", + "\n", + " oseries3 \n", + " 0.975 0.025 0.975 \n", + "1960-04-29 NaN NaN NaN \n", + "1960-04-30 NaN NaN NaN \n", + "1960-05-01 NaN NaN NaN \n", + "1960-05-02 NaN NaN NaN \n", + "1960-05-03 NaN NaN NaN \n", + "... ... ... ... \n", + "2020-01-17 NaN NaN NaN \n", + "2020-01-18 NaN NaN NaN \n", + "2020-01-19 NaN NaN NaN \n", + "2020-01-20 NaN NaN NaN \n", + "2020-01-21 NaN NaN NaN \n", + "\n", + "[21817 rows x 10 columns]" + ] + }, + "execution_count": 11, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "pstore.apply(\"models\", prediction_interval, kwargs={\"alpha\": 0.05}, parallel=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Load models\n", + "\n", + "Load models in parallel." + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "13e43dcc88b44f78b924f084ef301819", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "Applying get_models: 0%| | 0/5 [00:00 None: """Add item for both time series and pastas.Models (internal method). - Must be overriden by subclass. + Must be overridden by subclass. Parameters ---------- @@ -112,7 +106,7 @@ def _add_item( def _get_item(self, libname: str, name: str) -> Union[FrameorSeriesUnion, Dict]: """Get item (series or pastas.Models) (internal method). - Must be overriden by subclass. + Must be overridden by subclass. Parameters ---------- @@ -131,7 +125,7 @@ def _get_item(self, libname: str, name: str) -> Union[FrameorSeriesUnion, Dict]: def _del_item(self, libname: str, name: str) -> None: """Delete items (series or models) (internal method). - Must be overriden by subclass. + Must be overridden by subclass. Parameters ---------- @@ -145,7 +139,7 @@ def _del_item(self, libname: str, name: str) -> None: def _get_metadata(self, libname: str, name: str) -> Dict: """Get metadata (internal method). - Must be overriden by subclass. + Must be overridden by subclass. Parameters ---------- @@ -165,7 +159,7 @@ def _get_metadata(self, libname: str, name: str) -> Dict: def oseries_names(self): """List of oseries names. - Property must be overriden by subclass. + Property must be overridden by subclass. """ @property @@ -173,7 +167,7 @@ def oseries_names(self): def stresses_names(self): """List of stresses names. - Property must be overriden by subclass. + Property must be overridden by subclass. """ @property @@ -181,7 +175,37 @@ def stresses_names(self): def model_names(self): """List of model names. - Property must be overriden by subclass. + Property must be overridden by subclass. 
+ """ + + @abstractmethod + def _parallel( + self, + func: Callable, + names: List[str], + progressbar: Optional[bool] = True, + max_workers: Optional[int] = None, + chunksize: Optional[int] = None, + desc: str = "", + ) -> None: + """Parallel processing of function. + + Must be overridden by subclass. + + Parameters + ---------- + func : function + function to apply in parallel + names : list + list of names to apply function to + progressbar : bool, optional + show progressbar, by default True + max_workers : int, optional + maximum number of workers, by default None + chunksize : int, optional + chunksize for parallel processing, by default None + desc : str, optional + description for progressbar, by default "" """ def set_check_model_series_values(self, b: bool): @@ -1111,8 +1135,8 @@ def _iter_series(self, libname: str, names: Optional[List[str]] = None): time series contained in library """ names = self._parse_names(names, libname) - for nam in names: - yield self._get_series(libname, nam, progressbar=False) + for name in names: + yield self._get_series(libname, name, progressbar=False) def iter_oseries(self, names: Optional[List[str]] = None): """Iterate over oseries in library. @@ -1338,626 +1362,6 @@ def oseries_models(self): return d -class ConnectorUtil: - """Mix-in class for general Connector helper functions. - - Only for internal methods, and not methods that are related to CRUD operations on - database. - """ - - def _parse_names( - self, - names: Optional[Union[list, str]] = None, - libname: Optional[str] = "oseries", - ) -> list: - """Parse names kwarg, returns iterable with name(s) (internal method). - - Parameters - ---------- - names : Union[list, str], optional - str or list of str or None or 'all' (last two options - retrieves all names) - libname : str, optional - name of library, default is 'oseries' - - Returns - ------- - list - list of names - """ - if not isinstance(names, str) and isinstance(names, Iterable): - return names - elif isinstance(names, str) and names != "all": - return [names] - elif names is None or names == "all": - if libname == "oseries": - return self.oseries_names - elif libname == "stresses": - return self.stresses_names - elif libname == "models": - return self.model_names - elif libname == "oseries_models": - return self.oseries_with_models - else: - raise ValueError(f"No library '{libname}'!") - else: - raise NotImplementedError(f"Cannot parse 'names': {names}") - - @staticmethod - def _meta_list_to_frame(metalist: list, names: list): - """Convert list of metadata dictionaries to DataFrame. - - Parameters - ---------- - metalist : list - list of metadata dictionaries - names : list - list of names corresponding to data in metalist - - Returns - ------- - pandas.DataFrame - DataFrame containing overview of metadata - """ - # convert to dataframe - if len(metalist) > 1: - meta = pd.DataFrame(metalist) - if len({"x", "y"}.difference(meta.columns)) == 0: - meta["x"] = meta["x"].astype(float) - meta["y"] = meta["y"].astype(float) - elif len(metalist) == 1: - meta = pd.DataFrame(metalist) - elif len(metalist) == 0: - meta = pd.DataFrame() - - meta.index = names - meta.index.name = "name" - return meta - - def _parse_model_dict(self, mdict: dict, update_ts_settings: bool = False): - """Parse dictionary describing pastas models (internal method). 
- - Parameters - ---------- - mdict : dict - dictionary describing pastas.Model - update_ts_settings : bool, optional - update stored tmin and tmax in time series settings - based on time series loaded from store. - - Returns - ------- - ml : pastas.Model - time series analysis model - """ - PASFILE_LEQ_022 = parse_version( - mdict["file_info"]["pastas_version"] - ) <= parse_version("0.22.0") - - # oseries - if "series" not in mdict["oseries"]: - name = str(mdict["oseries"]["name"]) - if name not in self.oseries.index: - msg = "oseries '{}' not present in library".format(name) - raise LookupError(msg) - mdict["oseries"]["series"] = self.get_oseries(name).squeeze() - # update tmin/tmax from time series - if update_ts_settings: - mdict["oseries"]["settings"]["tmin"] = mdict["oseries"]["series"].index[ - 0 - ] - mdict["oseries"]["settings"]["tmax"] = mdict["oseries"]["series"].index[ - -1 - ] - - # StressModel, WellModel - for ts in mdict["stressmodels"].values(): - if "stress" in ts.keys(): - # WellModel - classkey = "stressmodel" if PASFILE_LEQ_022 else "class" - if ts[classkey] == "WellModel": - for stress in ts["stress"]: - if "series" not in stress: - name = str(stress["name"]) - if name in self.stresses.index: - stress["series"] = self.get_stresses(name).squeeze() - # update tmin/tmax from time series - if update_ts_settings: - stress["settings"]["tmin"] = stress["series"].index[ - 0 - ] - stress["settings"]["tmax"] = stress["series"].index[ - -1 - ] - # StressModel - else: - for stress in ts["stress"] if PASFILE_LEQ_022 else [ts["stress"]]: - if "series" not in stress: - name = str(stress["name"]) - if name in self.stresses.index: - stress["series"] = self.get_stresses(name).squeeze() - # update tmin/tmax from time series - if update_ts_settings: - stress["settings"]["tmin"] = stress["series"].index[ - 0 - ] - stress["settings"]["tmax"] = stress["series"].index[ - -1 - ] - - # RechargeModel, TarsoModel - if ("prec" in ts.keys()) and ("evap" in ts.keys()): - for stress in [ts["prec"], ts["evap"]]: - if "series" not in stress: - name = str(stress["name"]) - if name in self.stresses.index: - stress["series"] = self.get_stresses(name).squeeze() - # update tmin/tmax from time series - if update_ts_settings: - stress["settings"]["tmin"] = stress["series"].index[0] - stress["settings"]["tmax"] = stress["series"].index[-1] - else: - msg = "stress '{}' not present in library".format(name) - raise KeyError(msg) - - # hack for pcov w dtype object (when filled with NaNs on store?) - if "fit" in mdict: - if "pcov" in mdict["fit"]: - pcov = mdict["fit"]["pcov"] - if pcov.dtypes.apply(lambda dtyp: isinstance(dtyp, object)).any(): - mdict["fit"]["pcov"] = pcov.astype(float) - - # check pastas version vs pas-file version - file_version = mdict["file_info"]["pastas_version"] - - # check file version and pastas version - # if file<0.23 and pastas>=1.0 --> error - PASTAS_GT_023 = parse_version(ps.__version__) > parse_version("0.23.1") - if PASFILE_LEQ_022 and PASTAS_GT_023: - raise UserWarning( - f"This file was created with Pastas v{file_version} " - f"and cannot be loaded with Pastas v{ps.__version__} Please load and " - "save the file with Pastas 0.23 first to update the file " - "format." - ) - - try: - # pastas>=0.15.0 - ml = ps.io.base._load_model(mdict) - except AttributeError: - # pastas<0.15.0 - ml = ps.io.base.load_model(mdict) - return ml - - @staticmethod - def _validate_input_series(series): - """Check if series is pandas.DataFrame or pandas.Series. 
- - Parameters - ---------- - series : object - object to validate - - Raises - ------ - TypeError - if object is not of type pandas.DataFrame or pandas.Series - """ - if not (isinstance(series, pd.DataFrame) or isinstance(series, pd.Series)): - raise TypeError("Please provide pandas.DataFrame or pandas.Series!") - if isinstance(series, pd.DataFrame): - if series.columns.size > 1: - raise ValueError("Only DataFrames with one column are supported!") - - @staticmethod - def _set_series_name(series, name): - """Set series name to match user defined name in store. - - Parameters - ---------- - series : pandas.Series or pandas.DataFrame - set name for this time series - name : str - name of the time series (used in the pastastore) - """ - if isinstance(series, pd.Series): - series.name = name - # empty string on index name causes trouble when reading - # data from ArcticDB: TODO: check if still an issue? - if series.index.name == "": - series.index.name = None - - if isinstance(series, pd.DataFrame): - series.columns = [name] - # check for hydropandas objects which are instances of DataFrame but - # do have a name attribute - if hasattr(series, "name"): - series.name = name - return series - - @staticmethod - def _check_stressmodels_supported(ml): - supported_stressmodels = [ - "StressModel", - "StressModel2", - "RechargeModel", - "WellModel", - "TarsoModel", - "Constant", - "LinearTrend", - "StepModel", - ] - if isinstance(ml, ps.Model): - smtyps = [sm._name for sm in ml.stressmodels.values()] - elif isinstance(ml, dict): - classkey = "stressmodel" if PASTAS_LEQ_022 else "class" - smtyps = [sm[classkey] for sm in ml["stressmodels"].values()] - check = isin(smtyps, supported_stressmodels) - if not all(check): - unsupported = set(smtyps) - set(supported_stressmodels) - raise NotImplementedError( - "PastaStore does not support storing models with the " - f"following stressmodels: {unsupported}" - ) - - @staticmethod - def _check_model_series_names_for_store(ml): - prec_evap_model = ["RechargeModel", "TarsoModel"] - - if isinstance(ml, ps.Model): - series_names = [ - istress.series.name - for sm in ml.stressmodels.values() - for istress in sm.stress - ] - - elif isinstance(ml, dict): - # non RechargeModel, Tarsomodel, WellModel stressmodels - classkey = "stressmodel" if PASTAS_LEQ_022 else "class" - if PASTAS_LEQ_022: - series_names = [ - istress["name"] - for sm in ml["stressmodels"].values() - if sm[classkey] not in (prec_evap_model + ["WellModel"]) - for istress in sm["stress"] - ] - else: - series_names = [ - sm["stress"]["name"] - for sm in ml["stressmodels"].values() - if sm[classkey] not in (prec_evap_model + ["WellModel"]) - ] - - # WellModel - if isin( - ["WellModel"], - [i[classkey] for i in ml["stressmodels"].values()], - ).any(): - series_names += [ - istress["name"] - for sm in ml["stressmodels"].values() - if sm[classkey] in ["WellModel"] - for istress in sm["stress"] - ] - - # RechargeModel, TarsoModel - if isin( - prec_evap_model, - [i[classkey] for i in ml["stressmodels"].values()], - ).any(): - series_names += [ - istress["name"] - for sm in ml["stressmodels"].values() - if sm[classkey] in prec_evap_model - for istress in [sm["prec"], sm["evap"]] - ] - - else: - raise TypeError("Expected pastas.Model or dict!") - if len(series_names) - len(set(series_names)) > 0: - msg = ( - "There are multiple stresses series with the same name! " - "Each series name must be unique for the PastaStore!" 
- ) - raise ValueError(msg) - - def _check_oseries_in_store(self, ml: Union[ps.Model, dict]): - """Check if Model oseries are contained in PastaStore (internal method). - - Parameters - ---------- - ml : Union[ps.Model, dict] - pastas Model - """ - if isinstance(ml, ps.Model): - name = ml.oseries.name - elif isinstance(ml, dict): - name = str(ml["oseries"]["name"]) - else: - raise TypeError("Expected pastas.Model or dict!") - if name not in self.oseries.index: - msg = ( - f"Cannot add model because oseries '{name}' " - "is not contained in store." - ) - raise LookupError(msg) - # expensive check - if self.CHECK_MODEL_SERIES_VALUES and isinstance(ml, ps.Model): - s_org = self.get_oseries(name).squeeze().dropna() - if PASTAS_LEQ_022: - so = ml.oseries.series_original - else: - so = ml.oseries._series_original - try: - assert_series_equal( - so.dropna(), - s_org, - atol=self.SERIES_EQUALITY_ABSOLUTE_TOLERANCE, - rtol=self.SERIES_EQUALITY_RELATIVE_TOLERANCE, - ) - except AssertionError as e: - raise ValueError( - f"Cannot add model because model oseries '{name}'" - " is different from stored oseries! See stacktrace for differences." - ) from e - - def _check_stresses_in_store(self, ml: Union[ps.Model, dict]): - """Check if stresses time series are contained in PastaStore (internal method). - - Parameters - ---------- - ml : Union[ps.Model, dict] - pastas Model - """ - prec_evap_model = ["RechargeModel", "TarsoModel"] - if isinstance(ml, ps.Model): - for sm in ml.stressmodels.values(): - if sm._name in prec_evap_model: - stresses = [sm.prec, sm.evap] - else: - stresses = sm.stress - for s in stresses: - if str(s.name) not in self.stresses.index: - msg = ( - f"Cannot add model because stress '{s.name}' " - "is not contained in store." - ) - raise LookupError(msg) - if self.CHECK_MODEL_SERIES_VALUES: - s_org = self.get_stresses(s.name).squeeze() - if PASTAS_LEQ_022: - so = s.series_original - else: - so = s._series_original - try: - assert_series_equal( - so, - s_org, - atol=self.SERIES_EQUALITY_ABSOLUTE_TOLERANCE, - rtol=self.SERIES_EQUALITY_RELATIVE_TOLERANCE, - ) - except AssertionError as e: - raise ValueError( - f"Cannot add model because model stress " - f"'{s.name}' is different from stored stress! " - "See stacktrace for differences." - ) from e - elif isinstance(ml, dict): - for sm in ml["stressmodels"].values(): - classkey = "stressmodel" if PASTAS_LEQ_022 else "class" - if sm[classkey] in prec_evap_model: - stresses = [sm["prec"], sm["evap"]] - elif sm[classkey] in ["WellModel"]: - stresses = sm["stress"] - else: - stresses = sm["stress"] if PASTAS_LEQ_022 else [sm["stress"]] - for s in stresses: - if str(s["name"]) not in self.stresses.index: - msg = ( - f"Cannot add model because stress '{s['name']}' " - "is not contained in store." - ) - raise LookupError(msg) - else: - raise TypeError("Expected pastas.Model or dict!") - - def _stored_series_to_json( - self, - libname: str, - names: Optional[Union[list, str]] = None, - squeeze: bool = True, - progressbar: bool = False, - ): - """Write stored series to JSON. 
- - Parameters - ---------- - libname : str - library name - names : Optional[Union[list, str]], optional - names of series, by default None - squeeze : bool, optional - return single entry as json string instead - of list, by default True - progressbar : bool, optional - show progressbar, by default False - - Returns - ------- - files : list or str - list of series converted to JSON string or single string - if single entry is returned and squeeze is True - """ - names = self._parse_names(names, libname=libname) - files = [] - for n in tqdm(names, desc=libname) if progressbar else names: - s = self._get_series(libname, n, progressbar=False) - if isinstance(s, pd.Series): - s = s.to_frame() - try: - sjson = s.to_json(orient="columns") - except ValueError as e: - msg = ( - f"DatetimeIndex of '{n}' probably contains NaT " - "or duplicate timestamps!" - ) - raise ValueError(msg) from e - files.append(sjson) - if len(files) == 1 and squeeze: - return files[0] - else: - return files - - def _stored_metadata_to_json( - self, - libname: str, - names: Optional[Union[list, str]] = None, - squeeze: bool = True, - progressbar: bool = False, - ): - """Write metadata from stored series to JSON. - - Parameters - ---------- - libname : str - library containing series - names : Optional[Union[list, str]], optional - names to parse, by default None - squeeze : bool, optional - return single entry as json string instead of list, by default True - progressbar : bool, optional - show progressbar, by default False - - Returns - ------- - files : list or str - list of json string - """ - names = self._parse_names(names, libname=libname) - files = [] - for n in tqdm(names, desc=libname) if progressbar else names: - meta = self.get_metadata(libname, n, as_frame=False) - meta_json = json.dumps(meta, cls=PastasEncoder, indent=4) - files.append(meta_json) - if len(files) == 1 and squeeze: - return files[0] - else: - return files - - def _series_to_archive( - self, - archive, - libname: str, - names: Optional[Union[list, str]] = None, - progressbar: bool = True, - ): - """Write DataFrame or Series to zipfile (internal method). - - Parameters - ---------- - archive : zipfile.ZipFile - reference to an archive to write data to - libname : str - name of the library to write to zipfile - names : str or list of str, optional - names of the time series to write to archive, by default None, - which writes all time series to archive - progressbar : bool, optional - show progressbar, by default True - """ - names = self._parse_names(names, libname=libname) - for n in tqdm(names, desc=libname) if progressbar else names: - sjson = self._stored_series_to_json( - libname, names=n, progressbar=False, squeeze=True - ) - meta_json = self._stored_metadata_to_json( - libname, names=n, progressbar=False, squeeze=True - ) - archive.writestr(f"{libname}/{n}.json", sjson) - archive.writestr(f"{libname}/{n}_meta.json", meta_json) - - def _models_to_archive(self, archive, names=None, progressbar=True): - """Write pastas.Model to zipfile (internal method). 
- - Parameters - ---------- - archive : zipfile.ZipFile - reference to an archive to write data to - names : str or list of str, optional - names of the models to write to archive, by default None, - which writes all models to archive - progressbar : bool, optional - show progressbar, by default True - """ - names = self._parse_names(names, libname="models") - for n in tqdm(names, desc="models") if progressbar else names: - m = self.get_models(n, return_dict=True) - jsondict = json.dumps(m, cls=PastasEncoder, indent=4) - archive.writestr(f"models/{n}.pas", jsondict) - - @staticmethod - def _series_from_json(fjson: str, squeeze: bool = True): - """Load time series from JSON. - - Parameters - ---------- - fjson : str - path to file - squeeze : bool, optional - squeeze time series object to obtain pandas Series - - Returns - ------- - s : pd.DataFrame - DataFrame containing time series - """ - s = pd.read_json(fjson, orient="columns", precise_float=True, dtype=False) - if not isinstance(s.index, pd.DatetimeIndex): - s.index = pd.to_datetime(s.index, unit="ms") - s = s.sort_index() # needed for some reason ... - if squeeze: - return s.squeeze() - return s - - @staticmethod - def _metadata_from_json(fjson: str): - """Load metadata dictionary from JSON. - - Parameters - ---------- - fjson : str - path to file - - Returns - ------- - meta : dict - dictionary containing metadata - """ - with open(fjson, "r") as f: - meta = json.load(f) - return meta - - def _get_model_orphans(self): - """Get models whose oseries no longer exist in database. - - Returns - ------- - dict - dictionary with oseries names as keys and lists of model names - as values - """ - d = {} - for mlnam in tqdm(self.model_names, desc="Identifying model orphans"): - mdict = self.get_models(mlnam, return_dict=True) - onam = mdict["oseries"]["name"] - if onam not in self.oseries_names: - if onam in d: - d[onam] = d[onam].append(mlnam) - else: - d[onam] = [mlnam] - return d - - class ModelAccessor: """Object for managing access to stored models. diff --git a/pastastore/connectors.py b/pastastore/connectors.py index 99ca25d8..61db3e14 100644 --- a/pastastore/connectors.py +++ b/pastastore/connectors.py @@ -1,27 +1,756 @@ """Module containing classes for connecting to different data stores.""" import json +import logging import os import warnings +from collections.abc import Iterable +from concurrent.futures import ProcessPoolExecutor from copy import deepcopy -from typing import Dict, Optional, Union +from functools import partial + +# import weakref +from typing import Callable, Dict, List, Optional, Tuple, Union import pandas as pd +import pastas as ps +from numpy import isin +from packaging.version import parse as parse_version +from pandas.testing import assert_series_equal from pastas.io.pas import PastasEncoder, pastas_hook +from tqdm.auto import tqdm +from tqdm.contrib.concurrent import process_map -from pastastore.base import BaseConnector, ConnectorUtil, ModelAccessor +from pastastore.base import BaseConnector, ModelAccessor from pastastore.util import _custom_warning +from pastastore.version import PASTAS_LEQ_022 FrameorSeriesUnion = Union[pd.DataFrame, pd.Series] warnings.showwarning = _custom_warning +logger = logging.getLogger(__name__) + + +class ConnectorUtil: + """Mix-in class for general Connector helper functions. + + Only for internal methods, and not methods that are related to CRUD operations on + database. 
+ """ + + def _parse_names( + self, + names: Optional[Union[list, str]] = None, + libname: Optional[str] = "oseries", + ) -> list: + """Parse names kwarg, returns iterable with name(s) (internal method). + + Parameters + ---------- + names : Union[list, str], optional + str or list of str or None or 'all' (last two options + retrieves all names) + libname : str, optional + name of library, default is 'oseries' + + Returns + ------- + list + list of names + """ + if not isinstance(names, str) and isinstance(names, Iterable): + return names + elif isinstance(names, str) and names != "all": + return [names] + elif names is None or names == "all": + if libname == "oseries": + return self.oseries_names + elif libname == "stresses": + return self.stresses_names + elif libname == "models": + return self.model_names + elif libname == "oseries_models": + return self.oseries_with_models + else: + raise ValueError(f"No library '{libname}'!") + else: + raise NotImplementedError(f"Cannot parse 'names': {names}") + + @staticmethod + def _meta_list_to_frame(metalist: list, names: list): + """Convert list of metadata dictionaries to DataFrame. + + Parameters + ---------- + metalist : list + list of metadata dictionaries + names : list + list of names corresponding to data in metalist + + Returns + ------- + pandas.DataFrame + DataFrame containing overview of metadata + """ + # convert to dataframe + if len(metalist) > 1: + meta = pd.DataFrame(metalist) + if len({"x", "y"}.difference(meta.columns)) == 0: + meta["x"] = meta["x"].astype(float) + meta["y"] = meta["y"].astype(float) + elif len(metalist) == 1: + meta = pd.DataFrame(metalist) + elif len(metalist) == 0: + meta = pd.DataFrame() + + meta.index = names + meta.index.name = "name" + return meta + + def _parse_model_dict(self, mdict: dict, update_ts_settings: bool = False): + """Parse dictionary describing pastas models (internal method). + + Parameters + ---------- + mdict : dict + dictionary describing pastas.Model + update_ts_settings : bool, optional + update stored tmin and tmax in time series settings + based on time series loaded from store. 
+ + Returns + ------- + ml : pastas.Model + time series analysis model + """ + PASFILE_LEQ_022 = parse_version( + mdict["file_info"]["pastas_version"] + ) <= parse_version("0.22.0") + + # oseries + if "series" not in mdict["oseries"]: + name = str(mdict["oseries"]["name"]) + if name not in self.oseries.index: + msg = "oseries '{}' not present in library".format(name) + raise LookupError(msg) + mdict["oseries"]["series"] = self.get_oseries(name).squeeze() + # update tmin/tmax from time series + if update_ts_settings: + mdict["oseries"]["settings"]["tmin"] = mdict["oseries"]["series"].index[ + 0 + ] + mdict["oseries"]["settings"]["tmax"] = mdict["oseries"]["series"].index[ + -1 + ] + + # StressModel, WellModel + for ts in mdict["stressmodels"].values(): + if "stress" in ts.keys(): + # WellModel + classkey = "stressmodel" if PASFILE_LEQ_022 else "class" + if ts[classkey] == "WellModel": + for stress in ts["stress"]: + if "series" not in stress: + name = str(stress["name"]) + if name in self.stresses.index: + stress["series"] = self.get_stresses(name).squeeze() + # update tmin/tmax from time series + if update_ts_settings: + stress["settings"]["tmin"] = stress["series"].index[ + 0 + ] + stress["settings"]["tmax"] = stress["series"].index[ + -1 + ] + # StressModel + else: + for stress in ts["stress"] if PASFILE_LEQ_022 else [ts["stress"]]: + if "series" not in stress: + name = str(stress["name"]) + if name in self.stresses.index: + stress["series"] = self.get_stresses(name).squeeze() + # update tmin/tmax from time series + if update_ts_settings: + stress["settings"]["tmin"] = stress["series"].index[ + 0 + ] + stress["settings"]["tmax"] = stress["series"].index[ + -1 + ] + + # RechargeModel, TarsoModel + if ("prec" in ts.keys()) and ("evap" in ts.keys()): + for stress in [ts["prec"], ts["evap"]]: + if "series" not in stress: + name = str(stress["name"]) + if name in self.stresses.index: + stress["series"] = self.get_stresses(name).squeeze() + # update tmin/tmax from time series + if update_ts_settings: + stress["settings"]["tmin"] = stress["series"].index[0] + stress["settings"]["tmax"] = stress["series"].index[-1] + else: + msg = "stress '{}' not present in library".format(name) + raise KeyError(msg) + + # hack for pcov w dtype object (when filled with NaNs on store?) + if "fit" in mdict: + if "pcov" in mdict["fit"]: + pcov = mdict["fit"]["pcov"] + if pcov.dtypes.apply(lambda dtyp: isinstance(dtyp, object)).any(): + mdict["fit"]["pcov"] = pcov.astype(float) + + # check pastas version vs pas-file version + file_version = mdict["file_info"]["pastas_version"] + + # check file version and pastas version + # if file<0.23 and pastas>=1.0 --> error + PASTAS_GT_023 = parse_version(ps.__version__) > parse_version("0.23.1") + if PASFILE_LEQ_022 and PASTAS_GT_023: + raise UserWarning( + f"This file was created with Pastas v{file_version} " + f"and cannot be loaded with Pastas v{ps.__version__} Please load and " + "save the file with Pastas 0.23 first to update the file " + "format." + ) + + try: + # pastas>=0.15.0 + ml = ps.io.base._load_model(mdict) + except AttributeError: + # pastas<0.15.0 + ml = ps.io.base.load_model(mdict) + return ml + + @staticmethod + def _validate_input_series(series): + """Check if series is pandas.DataFrame or pandas.Series. 
+ + Parameters + ---------- + series : object + object to validate + + Raises + ------ + TypeError + if object is not of type pandas.DataFrame or pandas.Series + """ + if not (isinstance(series, pd.DataFrame) or isinstance(series, pd.Series)): + raise TypeError("Please provide pandas.DataFrame or pandas.Series!") + if isinstance(series, pd.DataFrame): + if series.columns.size > 1: + raise ValueError("Only DataFrames with one column are supported!") + + @staticmethod + def _set_series_name(series, name): + """Set series name to match user defined name in store. + + Parameters + ---------- + series : pandas.Series or pandas.DataFrame + set name for this time series + name : str + name of the time series (used in the pastastore) + """ + if isinstance(series, pd.Series): + series.name = name + # empty string on index name causes trouble when reading + # data from ArcticDB: TODO: check if still an issue? + if series.index.name == "": + series.index.name = None + + if isinstance(series, pd.DataFrame): + series.columns = [name] + # check for hydropandas objects which are instances of DataFrame but + # do have a name attribute + if hasattr(series, "name"): + series.name = name + return series + + @staticmethod + def _check_stressmodels_supported(ml): + supported_stressmodels = [ + "StressModel", + "StressModel2", + "RechargeModel", + "WellModel", + "TarsoModel", + "Constant", + "LinearTrend", + "StepModel", + ] + if isinstance(ml, ps.Model): + smtyps = [sm._name for sm in ml.stressmodels.values()] + elif isinstance(ml, dict): + classkey = "stressmodel" if PASTAS_LEQ_022 else "class" + smtyps = [sm[classkey] for sm in ml["stressmodels"].values()] + check = isin(smtyps, supported_stressmodels) + if not all(check): + unsupported = set(smtyps) - set(supported_stressmodels) + raise NotImplementedError( + "PastaStore does not support storing models with the " + f"following stressmodels: {unsupported}" + ) + + @staticmethod + def _check_model_series_names_for_store(ml): + prec_evap_model = ["RechargeModel", "TarsoModel"] + + if isinstance(ml, ps.Model): + series_names = [ + istress.series.name + for sm in ml.stressmodels.values() + for istress in sm.stress + ] + + elif isinstance(ml, dict): + # non RechargeModel, Tarsomodel, WellModel stressmodels + classkey = "stressmodel" if PASTAS_LEQ_022 else "class" + if PASTAS_LEQ_022: + series_names = [ + istress["name"] + for sm in ml["stressmodels"].values() + if sm[classkey] not in (prec_evap_model + ["WellModel"]) + for istress in sm["stress"] + ] + else: + series_names = [ + sm["stress"]["name"] + for sm in ml["stressmodels"].values() + if sm[classkey] not in (prec_evap_model + ["WellModel"]) + ] + + # WellModel + if isin( + ["WellModel"], + [i[classkey] for i in ml["stressmodels"].values()], + ).any(): + series_names += [ + istress["name"] + for sm in ml["stressmodels"].values() + if sm[classkey] in ["WellModel"] + for istress in sm["stress"] + ] + + # RechargeModel, TarsoModel + if isin( + prec_evap_model, + [i[classkey] for i in ml["stressmodels"].values()], + ).any(): + series_names += [ + istress["name"] + for sm in ml["stressmodels"].values() + if sm[classkey] in prec_evap_model + for istress in [sm["prec"], sm["evap"]] + ] + + else: + raise TypeError("Expected pastas.Model or dict!") + if len(series_names) - len(set(series_names)) > 0: + msg = ( + "There are multiple stresses series with the same name! " + "Each series name must be unique for the PastaStore!" 
+ ) + raise ValueError(msg) + + def _check_oseries_in_store(self, ml: Union[ps.Model, dict]): + """Check if Model oseries are contained in PastaStore (internal method). + + Parameters + ---------- + ml : Union[ps.Model, dict] + pastas Model + """ + if isinstance(ml, ps.Model): + name = ml.oseries.name + elif isinstance(ml, dict): + name = str(ml["oseries"]["name"]) + else: + raise TypeError("Expected pastas.Model or dict!") + if name not in self.oseries.index: + msg = ( + f"Cannot add model because oseries '{name}' " + "is not contained in store." + ) + raise LookupError(msg) + # expensive check + if self.CHECK_MODEL_SERIES_VALUES and isinstance(ml, ps.Model): + s_org = self.get_oseries(name).squeeze().dropna() + if PASTAS_LEQ_022: + so = ml.oseries.series_original + else: + so = ml.oseries._series_original + try: + assert_series_equal( + so.dropna(), + s_org, + atol=self.SERIES_EQUALITY_ABSOLUTE_TOLERANCE, + rtol=self.SERIES_EQUALITY_RELATIVE_TOLERANCE, + ) + except AssertionError as e: + raise ValueError( + f"Cannot add model because model oseries '{name}'" + " is different from stored oseries! See stacktrace for differences." + ) from e + + def _check_stresses_in_store(self, ml: Union[ps.Model, dict]): + """Check if stresses time series are contained in PastaStore (internal method). + + Parameters + ---------- + ml : Union[ps.Model, dict] + pastas Model + """ + prec_evap_model = ["RechargeModel", "TarsoModel"] + if isinstance(ml, ps.Model): + for sm in ml.stressmodels.values(): + if sm._name in prec_evap_model: + stresses = [sm.prec, sm.evap] + else: + stresses = sm.stress + for s in stresses: + if str(s.name) not in self.stresses.index: + msg = ( + f"Cannot add model because stress '{s.name}' " + "is not contained in store." + ) + raise LookupError(msg) + if self.CHECK_MODEL_SERIES_VALUES: + s_org = self.get_stresses(s.name).squeeze() + if PASTAS_LEQ_022: + so = s.series_original + else: + so = s._series_original + try: + assert_series_equal( + so, + s_org, + atol=self.SERIES_EQUALITY_ABSOLUTE_TOLERANCE, + rtol=self.SERIES_EQUALITY_RELATIVE_TOLERANCE, + ) + except AssertionError as e: + raise ValueError( + f"Cannot add model because model stress " + f"'{s.name}' is different from stored stress! " + "See stacktrace for differences." + ) from e + elif isinstance(ml, dict): + for sm in ml["stressmodels"].values(): + classkey = "stressmodel" if PASTAS_LEQ_022 else "class" + if sm[classkey] in prec_evap_model: + stresses = [sm["prec"], sm["evap"]] + elif sm[classkey] in ["WellModel"]: + stresses = sm["stress"] + else: + stresses = sm["stress"] if PASTAS_LEQ_022 else [sm["stress"]] + for s in stresses: + if str(s["name"]) not in self.stresses.index: + msg = ( + f"Cannot add model because stress '{s['name']}' " + "is not contained in store." + ) + raise LookupError(msg) + else: + raise TypeError("Expected pastas.Model or dict!") + + def _stored_series_to_json( + self, + libname: str, + names: Optional[Union[list, str]] = None, + squeeze: bool = True, + progressbar: bool = False, + ): + """Write stored series to JSON. 
+ + Parameters + ---------- + libname : str + library name + names : Optional[Union[list, str]], optional + names of series, by default None + squeeze : bool, optional + return single entry as json string instead + of list, by default True + progressbar : bool, optional + show progressbar, by default False + + Returns + ------- + files : list or str + list of series converted to JSON string or single string + if single entry is returned and squeeze is True + """ + names = self._parse_names(names, libname=libname) + files = [] + for n in tqdm(names, desc=libname) if progressbar else names: + s = self._get_series(libname, n, progressbar=False) + if isinstance(s, pd.Series): + s = s.to_frame() + try: + sjson = s.to_json(orient="columns") + except ValueError as e: + msg = ( + f"DatetimeIndex of '{n}' probably contains NaT " + "or duplicate timestamps!" + ) + raise ValueError(msg) from e + files.append(sjson) + if len(files) == 1 and squeeze: + return files[0] + else: + return files + + def _stored_metadata_to_json( + self, + libname: str, + names: Optional[Union[list, str]] = None, + squeeze: bool = True, + progressbar: bool = False, + ): + """Write metadata from stored series to JSON. + + Parameters + ---------- + libname : str + library containing series + names : Optional[Union[list, str]], optional + names to parse, by default None + squeeze : bool, optional + return single entry as json string instead of list, by default True + progressbar : bool, optional + show progressbar, by default False + + Returns + ------- + files : list or str + list of json string + """ + names = self._parse_names(names, libname=libname) + files = [] + for n in tqdm(names, desc=libname) if progressbar else names: + meta = self.get_metadata(libname, n, as_frame=False) + meta_json = json.dumps(meta, cls=PastasEncoder, indent=4) + files.append(meta_json) + if len(files) == 1 and squeeze: + return files[0] + else: + return files + + def _series_to_archive( + self, + archive, + libname: str, + names: Optional[Union[list, str]] = None, + progressbar: bool = True, + ): + """Write DataFrame or Series to zipfile (internal method). + + Parameters + ---------- + archive : zipfile.ZipFile + reference to an archive to write data to + libname : str + name of the library to write to zipfile + names : str or list of str, optional + names of the time series to write to archive, by default None, + which writes all time series to archive + progressbar : bool, optional + show progressbar, by default True + """ + names = self._parse_names(names, libname=libname) + for n in tqdm(names, desc=libname) if progressbar else names: + sjson = self._stored_series_to_json( + libname, names=n, progressbar=False, squeeze=True + ) + meta_json = self._stored_metadata_to_json( + libname, names=n, progressbar=False, squeeze=True + ) + archive.writestr(f"{libname}/{n}.pas", sjson) + archive.writestr(f"{libname}/{n}_meta.pas", meta_json) + + def _models_to_archive(self, archive, names=None, progressbar=True): + """Write pastas.Model to zipfile (internal method). 
+
+        Parameters
+        ----------
+        archive : zipfile.ZipFile
+            reference to an archive to write data to
+        names : str or list of str, optional
+            names of the models to write to archive, by default None,
+            which writes all models to archive
+        progressbar : bool, optional
+            show progressbar, by default True
+        """
+        names = self._parse_names(names, libname="models")
+        for n in tqdm(names, desc="models") if progressbar else names:
+            m = self.get_models(n, return_dict=True)
+            jsondict = json.dumps(m, cls=PastasEncoder, indent=4)
+            archive.writestr(f"models/{n}.pas", jsondict)
+
+    @staticmethod
+    def _series_from_json(fjson: str, squeeze: bool = True):
+        """Load time series from JSON.
+
+        Parameters
+        ----------
+        fjson : str
+            path to file
+        squeeze : bool, optional
+            squeeze time series object to obtain pandas Series
+
+        Returns
+        -------
+        s : pd.DataFrame or pd.Series
+            DataFrame containing time series (or Series if squeeze=True)
+        """
+        s = pd.read_json(fjson, orient="columns", precise_float=True, dtype=False)
+        if not isinstance(s.index, pd.DatetimeIndex):
+            s.index = pd.to_datetime(s.index, unit="ms")
+        s = s.sort_index()  # ensure the index is sorted
+        if squeeze:
+            return s.squeeze()
+        return s
+
+    @staticmethod
+    def _metadata_from_json(fjson: str):
+        """Load metadata dictionary from JSON.
+
+        Parameters
+        ----------
+        fjson : str
+            path to file
+
+        Returns
+        -------
+        meta : dict
+            dictionary containing metadata
+        """
+        with open(fjson, "r") as f:
+            meta = json.load(f)
+        return meta
+
+    def _get_model_orphans(self):
+        """Get models whose oseries no longer exist in database.
+
+        Returns
+        -------
+        dict
+            dictionary with oseries names as keys and lists of model names
+            as values
+        """
+        d = {}
+        for mlnam in tqdm(self.model_names, desc="Identifying model orphans"):
+            mdict = self.get_models(mlnam, return_dict=True)
+            onam = mdict["oseries"]["name"]
+            if onam not in self.oseries_names:
+                if onam in d:
+                    d[onam].append(mlnam)
+                else:
+                    d[onam] = [mlnam]
+        return d
+
+    @staticmethod
+    def _solve_model(
+        ml_name: str,
+        connector: Optional[BaseConnector] = None,
+        report: bool = False,
+        ignore_solve_errors: bool = False,
+        **kwargs,
+    ) -> None:
+        """Solve a model in the store (internal method).
+
+        Parameters
+        ----------
+        ml_name : str
+            name of a model in the pastastore
+        connector : BaseConnector, optional
+            connector to use, by default None, which retrieves the global
+            connector set by the parallel worker initializer. Otherwise pass
+            a connector instance.
+        report : bool, optional
+            determines if a report is printed when the model is solved,
+            default is False
+        ignore_solve_errors : bool, optional
+            if True, errors emerging from the solve method are ignored,
+            default is False, which will raise an exception when a model
+            cannot be optimized
+        **kwargs : dict
+            arguments are passed to the solve method.
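+
+        Examples
+        --------
+        A minimal sketch of direct, sequential usage; the model name is
+        hypothetical and ``conn`` is assumed to be an existing connector
+        holding that model::
+
+            conn._solve_model("my_model", connector=conn, tmin="1990")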
+ """ + if connector is not None: + conn = connector + else: + conn = globals()["conn"] + + ml = conn.get_models(ml_name) + m_kwargs = {} + for key, value in kwargs.items(): + if isinstance(value, pd.Series): + m_kwargs[key] = value.loc[ml.name] + else: + m_kwargs[key] = value + # Convert timestamps + for tstamp in ["tmin", "tmax"]: + if tstamp in m_kwargs: + m_kwargs[tstamp] = pd.Timestamp(m_kwargs[tstamp]) + + try: + ml.solve(report=report, **m_kwargs) + except Exception as e: + if ignore_solve_errors: + warning = "Solve error ignored for '%s': %s " % (ml.name, e) + logger.warning(warning) + else: + raise e + + conn.add_model(ml, overwrite=True) + + @staticmethod + def _get_statistics( + name: str, + statistics: List[str], + connector: Union[None, BaseConnector] = None, + **kwargs, + ) -> pd.Series: + """Get statistics for a model in the store (internal method). + + This function was made to be run in parallel mode. For the odd user + that wants to run this function directly in sequential model using + an ArcticDBDConnector the connector argument must be passed in the kwargs + of the apply method. + """ + if connector is not None: + conn = connector + else: + conn = globals()["conn"] + + ml = conn.get_model(name) + series = pd.Series(index=statistics, dtype=float) + for stat in statistics: + series.loc[stat] = getattr(ml.stats, stat)(**kwargs) + return series + + @staticmethod + def _get_max_workers_and_chunksize( + max_workers: int, njobs: int, chunksize: int = None + ) -> Tuple[int, int]: + """Get the maximum workers and chunksize for parallel processing. + + From: https://stackoverflow.com/a/42096963/10596229 + """ + max_workers = ( + min(32, os.cpu_count() + 4) if max_workers is None else max_workers + ) + if chunksize is None: + num_chunks = max_workers * 14 + chunksize = max(njobs // num_chunks, 1) + return max_workers, chunksize + class ArcticDBConnector(BaseConnector, ConnectorUtil): """ArcticDBConnector object using ArcticDB to store data.""" conn_type = "arcticdb" - def __init__(self, name: str, uri: str): + def __init__(self, name: str, uri: str, verbose: bool = True): """Create an ArcticDBConnector object using ArcticDB to store data. Parameters @@ -30,6 +759,8 @@ def __init__(self, name: str, uri: str): name of the database uri : str URI connection string (e.g. 'lmdb://') + verbose : bool, optional + whether to print message when database is initialized, by default True """ try: import arcticdb @@ -41,23 +772,24 @@ def __init__(self, name: str, uri: str): self.libs: dict = {} self.arc = arcticdb.Arctic(uri) - self._initialize() + self._initialize(verbose=verbose) self.models = ModelAccessor(self) # for older versions of PastaStore, if oseries_models library is empty # populate oseries - models database self._update_all_oseries_model_links() - def _initialize(self) -> None: + def _initialize(self, verbose: bool = True) -> None: """Initialize the libraries (internal method).""" for libname in self._default_library_names: if self._library_name(libname) not in self.arc.list_libraries(): self.arc.create_library(self._library_name(libname)) else: - print( - f"ArcticDBConnector: library " - f"'{self._library_name(libname)}'" - " already exists. Linking to existing library." - ) + if verbose: + print( + f"ArcticDBConnector: library " + f"'{self._library_name(libname)}'" + " already exists. Linking to existing library." 
+        """
+        max_workers = (
+            min(32, os.cpu_count() + 4) if max_workers is None else max_workers
+        )
+        if chunksize is None:
+            num_chunks = max_workers * 14
+            chunksize = max(njobs // num_chunks, 1)
+        return max_workers, chunksize


 class ArcticDBConnector(BaseConnector, ConnectorUtil):
     """ArcticDBConnector object using ArcticDB to store data."""

     conn_type = "arcticdb"

-    def __init__(self, name: str, uri: str):
+    def __init__(self, name: str, uri: str, verbose: bool = True):
         """Create an ArcticDBConnector object using ArcticDB to store data.

         Parameters
@@ -30,6 +759,8 @@ def __init__(self, name: str, uri: str):
             name of the database
         uri : str
             URI connection string (e.g. 'lmdb://')
+        verbose : bool, optional
+            whether to print message when database is initialized, by default True
         """
         try:
             import arcticdb
@@ -41,23 +772,24 @@ def __init__(self, name: str, uri: str):

         self.libs: dict = {}
         self.arc = arcticdb.Arctic(uri)
-        self._initialize()
+        self._initialize(verbose=verbose)
         self.models = ModelAccessor(self)
         # for older versions of PastaStore, if oseries_models library is empty
         # populate oseries - models database
         self._update_all_oseries_model_links()

-    def _initialize(self) -> None:
+    def _initialize(self, verbose: bool = True) -> None:
         """Initialize the libraries (internal method)."""
         for libname in self._default_library_names:
             if self._library_name(libname) not in self.arc.list_libraries():
                 self.arc.create_library(self._library_name(libname))
             else:
-                print(
-                    f"ArcticDBConnector: library "
-                    f"'{self._library_name(libname)}'"
-                    " already exists. Linking to existing library."
-                )
+                if verbose:
+                    print(
+                        f"ArcticDBConnector: library "
+                        f"'{self._library_name(libname)}'"
+                        " already exists. Linking to existing library."
+                    )
             self.libs[libname] = self._get_library(libname)

     def _library_name(self, libname: str) -> str:
@@ -159,6 +891,70 @@ def _get_metadata(self, libname: str, name: str) -> dict:
         lib = self._get_library(libname)
         return lib.read_metadata(name).metadata

+    def _parallel(
+        self,
+        func: Callable,
+        names: List[str],
+        kwargs: Optional[Dict] = None,
+        progressbar: Optional[bool] = True,
+        max_workers: Optional[int] = None,
+        chunksize: Optional[int] = None,
+        desc: str = "",
+    ):
+        """Parallel processing of function.
+
+        Results of the function calls are collected and returned; functions
+        that write their output to the database directly may return None.
+
+        Parameters
+        ----------
+        func : function
+            function to apply in parallel
+        names : list
+            list of names to apply function to
+        kwargs : dict, optional
+            keyword arguments to pass to function
+        progressbar : bool, optional
+            show progressbar, by default True
+        max_workers : int, optional
+            maximum number of workers, by default None
+        chunksize : int, optional
+            chunksize for parallel processing, by default None
+        desc : str, optional
+            description for progressbar, by default ""
+        """
+        max_workers, chunksize = ConnectorUtil._get_max_workers_and_chunksize(
+            max_workers, len(names), chunksize
+        )
+
+        def initializer(*args):
+            global conn
+            conn = ArcticDBConnector(*args)
+
+        initargs = (self.name, self.uri, False)
+
+        if kwargs is None:
+            kwargs = {}
+
+        if progressbar:
+            result = []
+            with tqdm(total=len(names), desc=desc) as pbar:
+                with ProcessPoolExecutor(
+                    max_workers=max_workers, initializer=initializer, initargs=initargs
+                ) as executor:
+                    for item in executor.map(
+                        partial(func, **kwargs), names, chunksize=chunksize
+                    ):
+                        result.append(item)
+                        pbar.update()
+        else:
+            with ProcessPoolExecutor(
+                max_workers=max_workers, initializer=initializer, initargs=initargs
+            ) as executor:
+                result = executor.map(
+                    partial(func, **kwargs), names, chunksize=chunksize
+                )
+        return result
+
     @property
     def oseries_names(self):
         """List of oseries names.
@@ -317,6 +1113,12 @@ def _get_metadata(self, libname: str, name: str) -> dict:
         imeta = deepcopy(lib[name][0])
         return imeta

+    def _parallel(self, *args, **kwargs) -> None:
+        raise NotImplementedError(
+            "DictConnector does not support parallel processing,"
+            " use PasConnector or ArcticDBConnector."
+        )
+
     @property
     def oseries_names(self):
         """List of oseries names."""
@@ -347,7 +1149,7 @@ class PasConnector(BaseConnector, ConnectorUtil):

     conn_type = "pas"

-    def __init__(self, name: str, path: str):
+    def __init__(self, name: str, path: str, verbose: bool = True):
         """Create PasConnector object that stores data as JSON files on disk.

         Uses Pastas export format (pas-files) to store files.
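+
+        A minimal sketch of suppressing the initialization messages, e.g.
+        when connectors are re-created inside parallel workers (the path is
+        hypothetical)::
+
+            conn = PasConnector("my_db", "./pastas_db", verbose=False)  # hypothetical path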
@@ -359,28 +1161,32 @@ def __init__(self, name: str, path: str):
             directory in which the data will be stored
         path : str
             path to directory for storing the data
+        verbose : bool, optional
+            whether to print message when database is initialized, by default True
         """
         self.name = name
         self.path = os.path.abspath(os.path.join(path, self.name))
         self.relpath = os.path.relpath(self.path)
-        self._initialize()
+        self._initialize(verbose=verbose)
         self.models = ModelAccessor(self)
         # for older versions of PastaStore, if oseries_models library is empty
         # populate oseries_models library
         self._update_all_oseries_model_links()

-    def _initialize(self) -> None:
+    def _initialize(self, verbose: bool = True) -> None:
         """Initialize the libraries (internal method)."""
         for val in self._default_library_names:
             libdir = os.path.join(self.path, val)
             if not os.path.exists(libdir):
-                print(f"PasConnector: library '{val}' created in '{libdir}'")
+                if verbose:
+                    print(f"PasConnector: library '{val}' created in '{libdir}'")
                 os.makedirs(libdir)
             else:
-                print(
-                    f"PasConnector: library '{val}' already exists. "
-                    f"Linking to existing directory: '{libdir}'"
-                )
+                if verbose:
+                    print(
+                        f"PasConnector: library '{val}' already exists. "
+                        f"Linking to existing directory: '{libdir}'"
+                    )
             setattr(self, f"lib_{val}", os.path.join(self.path, val))

     def _get_library(self, libname: str):
@@ -523,6 +1329,58 @@ def _get_metadata(self, libname: str, name: str) -> dict:
             imeta = {}
         return imeta

+    def _parallel(
+        self,
+        func: Callable,
+        names: List[str],
+        kwargs: Optional[dict] = None,
+        progressbar: Optional[bool] = True,
+        max_workers: Optional[int] = None,
+        chunksize: Optional[int] = None,
+        desc: str = "",
+    ):
+        """Parallel processing of function.
+
+        Results of the function calls are collected and returned; functions
+        that write their output to the database directly may return None.
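+
+        A minimal sketch of applying a function to all models in parallel via
+        this method (``some_func`` is a hypothetical function accepting a
+        model name as its first argument)::
+
+            # some_func is hypothetical; it must take an item name first
+            conn._parallel(some_func, names=conn.model_names, desc="apply")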
+
+        Parameters
+        ----------
+        func : function
+            function to apply in parallel
+        names : list
+            list of names to apply function to
+        kwargs : dict, optional
+            keyword arguments to pass to function
+        progressbar : bool, optional
+            show progressbar, by default True
+        max_workers : int, optional
+            maximum number of workers, by default None
+        chunksize : int, optional
+            chunksize for parallel processing, by default None
+        desc : str, optional
+            description for progressbar, by default ""
+        """
+        max_workers, chunksize = ConnectorUtil._get_max_workers_and_chunksize(
+            max_workers, len(names), chunksize
+        )
+
+        if kwargs is None:
+            kwargs = {}
+
+        if progressbar:
+            return process_map(
+                partial(func, **kwargs),
+                names,
+                max_workers=max_workers,
+                chunksize=chunksize,
+                desc=desc,
+                total=len(names),
+            )
+        else:
+            with ProcessPoolExecutor(max_workers=max_workers) as executor:
+                result = executor.map(
+                    partial(func, **kwargs), names, chunksize=chunksize
+                )
+            return result
+
     @property
     def oseries_names(self):
         """List of oseries names."""
diff --git a/pastastore/extensions/hpd.py b/pastastore/extensions/hpd.py
index ddd71bdd..d926463d 100644
--- a/pastastore/extensions/hpd.py
+++ b/pastastore/extensions/hpd.py
@@ -259,7 +259,7 @@ def download_knmi_precipitation(
     meteo_var: str = "RD",
     tmin: TimeType = None,
     tmax: TimeType = None,
-    unit_multiplier: float = 1e3,
+    unit_multiplier: float = 1e-3,
     fill_missing_obs: bool = True,
     normalize_datetime_index: bool = True,
     **kwargs,
@@ -298,7 +298,7 @@ def download_knmi_evaporation(
     meteo_var: str = "EV24",
     tmin: TimeType = None,
     tmax: TimeType = None,
-    unit_multiplier: float = 1e3,
+    unit_multiplier: float = 1e-3,
     fill_missing_obs: bool = True,
     normalize_datetime_index: bool = True,
     **kwargs,
diff --git a/pastastore/store.py b/pastastore/store.py
index b2a29547..5fbd1448 100644
--- a/pastastore/store.py
+++ b/pastastore/store.py
@@ -4,9 +4,8 @@
 import logging
 import os
 import warnings
-from concurrent.futures import ProcessPoolExecutor
 from functools import partial
-from typing import Dict, List, Literal, Optional, Tuple, Union
+from typing import Dict, Iterable, List, Literal, Optional, Tuple, Union

 import numpy as np
 import pandas as pd
@@ -14,7 +13,6 @@
 from packaging.version import parse as parse_version
 from pastas.io.pas import pastas_hook
 from tqdm.auto import tqdm
-from tqdm.contrib.concurrent import process_map

 from pastastore.base import BaseConnector
 from pastastore.connectors import DictConnector
@@ -624,8 +622,10 @@ def get_statistics(
         self,
         statistics: Union[str, List[str]],
         modelnames: Optional[List[str]] = None,
+        parallel: bool = False,
         progressbar: Optional[bool] = False,
         ignore_errors: Optional[bool] = False,
+        fancy_output: bool = True,
         **kwargs,
     ) -> FrameorSeriesUnion:
         """Get model statistics.
@@ -643,6 +643,11 @@ def get_statistics(
         ignore_errors : bool, optional
             ignore errors when True, i.e. when trying to calculate statistics
             for non-existent model in modelnames, default is False
+        parallel : bool, optional
+            use parallel processing, by default False
+        fancy_output : bool, optional
+            only used if parallel=True; if True, return a DataFrame with
+            statistics, otherwise return a list of results
         **kwargs
             any arguments that can be passed to the methods for calculating
             statistics
@@ -657,25 +662,39 @@ def get_statistics(
         if isinstance(statistics, str):
             statistics = [statistics]

-        # create dataframe for results
-        s = pd.DataFrame(index=modelnames, columns=statistics, data=np.nan)
-
-        # loop through model names
-        desc = "Get model statistics"
-        for mlname in tqdm(modelnames, desc=desc) if progressbar else modelnames:
-            try:
-                ml = self.get_models(mlname, progressbar=False)
-            except Exception as e:
-                if ignore_errors:
-                    continue
-                else:
-                    raise e
-            for stat in statistics:
-                value = ml.stats.__getattribute__(stat)(**kwargs)
-                s.loc[mlname, stat] = value
+        if parallel:
+            kwargs["statistics"] = statistics
+            if self.conn.conn_type == "pas":
+                kwargs["connector"] = self.conn
+            return self.apply(
+                "models",
+                self.conn._get_statistics,
+                modelnames,
+                kwargs=kwargs,
+                parallel=parallel,
+                progressbar=progressbar,
+                fancy_output=fancy_output,
+            ).T  # transpose to match serial output
+        else:
+            # create dataframe for results
+            s = pd.DataFrame(index=modelnames, columns=statistics, data=np.nan)
+
+            # loop through model names
+            desc = "Get model statistics"
+            for mlname in tqdm(modelnames, desc=desc) if progressbar else modelnames:
+                try:
+                    ml = self.get_models(mlname, progressbar=False)
+                except Exception as e:
+                    if ignore_errors:
+                        continue
+                    else:
+                        raise e
+                for stat in statistics:
+                    value = getattr(ml.stats, stat)(**kwargs)
+                    s.loc[mlname, stat] = value

-        s = s.squeeze()
-        return s.astype(float)
+            s = s.squeeze()
+            return s.astype(float)

     def create_model(
         self,
@@ -1235,74 +1254,58 @@ def solve_models(

         modelnames = self.conn._parse_names(modelnames, libname="models")

-        solve_model = partial(
-            self._solve_model,
-            report=report,
-            ignore_solve_errors=ignore_solve_errors,
-            **kwargs,
-        )
-        if self.conn.conn_type != "pas":
+        # prepare parallel processing
+        if parallel and self.conn.conn_type == "dict":
             parallel = False
             logger.error(
-                "Parallel solving only supported for PasConnector databases."
-                "Setting parallel to `False`"
+                "Parallel solving only supported for PasConnector and "
+                "ArcticDBConnector databases. Setting parallel to `False`"
             )
+        if parallel:
+            if self.conn.conn_type == "arcticdb":
+                solve_model = partial(
+                    self.conn._solve_model,
+                    report=report,
+                    ignore_solve_errors=ignore_solve_errors,
+                    **kwargs,
+                )
+                self.conn._parallel(
+                    solve_model,
+                    modelnames,
+                    max_workers=max_workers,
+                    chunksize=None,
+                    progressbar=progressbar,
+                    desc="Solving models (parallel)",
+                )
+            elif self.conn.conn_type == "pas":
+                solve_model = partial(
+                    self.conn._solve_model,
+                    connector=self.conn,
+                    report=report,
+                    ignore_solve_errors=ignore_solve_errors,
+                    **kwargs,
+                )
+                self.conn._parallel(
+                    solve_model,
+                    modelnames,
+                    max_workers=max_workers,
+                    chunksize=None,
+                    progressbar=progressbar,
+                    desc="Solving models (parallel)",
+                )
         else:
+            solve_model = partial(
+                self.conn._solve_model,
+                connector=self.conn,
+                report=report,
+                ignore_solve_errors=ignore_solve_errors,
+                **kwargs,
+            )
             for ml_name in (
                 tqdm(modelnames, desc="Solving models") if progressbar else modelnames
            ):
                 solve_model(ml_name=ml_name)

-    def _solve_model(
-        self,
-        ml_name: str,
-        report: bool = False,
-        ignore_solve_errors: bool = False,
-        **kwargs,
-    ) -> None:
-        """Solve a model in the store (internal method).
-
-        ml_name : list of str, optional
-            name of a model in the pastastore
-        report : boolean, optional
-            determines if a report is printed when the model is solved,
-            default is False
-        ignore_solve_errors : boolean, optional
-            if True, errors emerging from the solve method are ignored,
-            default is False which will raise an exception when a model
-            cannot be optimized
-        **kwargs : dictionary
-            arguments are passed to the solve method.
-        """
-        ml = self.conn.get_models(ml_name)
-        m_kwargs = {}
-        for key, value in kwargs.items():
-            if isinstance(value, pd.Series):
-                m_kwargs[key] = value.loc[ml.name]
-            else:
-                m_kwargs[key] = value
-        # Convert timestamps
-        for tstamp in ["tmin", "tmax"]:
-            if tstamp in m_kwargs:
-                m_kwargs[tstamp] = pd.Timestamp(m_kwargs[tstamp])
-
-        try:
-            ml.solve(report=report, **m_kwargs)
-        except Exception as e:
-            if ignore_solve_errors:
-                warning = "Solve error ignored for '%s': %s " % (ml.name, e)
-                logger.warning(warning)
-            else:
-                raise e
-
-        self.conn.add_model(ml, overwrite=True)
-
     def model_results(
         self,
         mls: Optional[Union[ps.Model, list, str]] = None,
@@ -1443,6 +1446,7 @@ def from_zip(
         conn: Optional[BaseConnector] = None,
         storename: Optional[str] = None,
         progressbar: bool = True,
+        series_ext_json: bool = False,
     ):
         """Load PastaStore from zipfile.

@@ -1458,6 +1462,10 @@ def from_zip(
             defaults to the name of the Connector.
         progressbar : bool, optional
             show progressbar, by default True
+        series_ext_json : bool, optional
+            if True, series are expected to have a .json extension, by default
+            False, which assumes a .pas extension. Set this option to True for
+            reading zipfiles created with pastastore versions older than 1.8.0.
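+
+            For example, a store exported with pastastore 1.7.x can be read
+            with (the filename is hypothetical)::
+
+                pstore = pst.PastaStore.from_zip(
+                    "old_export.zip",  # hypothetical filename
+                    series_ext_json=True,
+                )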

         Returns
         -------
@@ -1469,9 +1477,22 @@ def from_zip(
         if conn is None:
             conn = DictConnector("pastas_db")

+        if series_ext_json:
+            ext = "json"
+        else:
+            ext = "pas"
+
+        # short circuit for PasConnector when zipfile was written using pas-files
+        if conn.conn_type == "pas" and not series_ext_json:
+            with ZipFile(fname, "r") as archive:
+                archive.extractall(conn.path)
+            if storename is None:
+                storename = conn.name
+            return cls(conn, storename)
+
         with ZipFile(fname, "r") as archive:
             namelist = [
-                fi for fi in archive.namelist() if not fi.endswith("_meta.json")
+                fi for fi in archive.namelist() if not fi.endswith(f"_meta.{ext}")
             ]
             for f in tqdm(namelist, desc="Reading zip") if progressbar else namelist:
                 libname, fjson = os.path.split(f)
@@ -1480,7 +1501,7 @@ def from_zip(
                 if not isinstance(s.index, pd.DatetimeIndex):
                     s.index = pd.to_datetime(s.index, unit="ms")
                 s = s.sort_index()
-                meta = json.load(archive.open(f.replace(".json", "_meta.json")))
+                meta = json.load(archive.open(f.replace(f".{ext}", f"_meta.{ext}")))
                 conn._add_series(libname, s, fjson.split(".")[0], metadata=meta)
             elif libname in ["models"]:
                 ml = json.load(archive.open(f), object_hook=pastas_hook)
@@ -1496,7 +1517,7 @@ def search(
         case_sensitive: bool = True,
         sort=True,
     ):
-        """Search for names of time series or models starting with `s`.
+        """Search for names of time series or models containing string `s`.

         Parameters
         ----------
@@ -1515,30 +1536,45 @@ def search(
             list of names that match search result
         """
         if libname == "models":
-            lib_names = self.model_names
+            lib_names = {"models": self.model_names}
         elif libname == "stresses":
-            lib_names = self.stresses_names
+            lib_names = {"stresses": self.stresses_names}
         elif libname == "oseries":
-            lib_names = self.oseries_names
+            lib_names = {"oseries": self.oseries_names}
+        elif libname is None:
+            lib_names = {
+                "oseries": self.oseries_names,
+                "stresses": self.stresses_names,
+                "models": self.model_names,
+            }
         else:
             raise ValueError("Provide valid libname: 'models', 'stresses' or 'oseries'")

-        if isinstance(s, str):
-            if case_sensitive:
-                matches = [n for n in lib_names if s in n]
-            else:
-                matches = [n for n in lib_names if s.lower() in n.lower()]
-        if isinstance(s, list):
-            m = np.array([])
-            for sub in s:
-                if case_sensitive:
-                    m = np.append(m, [n for n in lib_names if sub in n])
-                else:
-                    m = np.append(m, [n for n in lib_names if sub.lower() in n.lower()])
-            matches = list(np.unique(m))
-        if sort:
-            matches.sort()
-        return matches
+        result = {}
+        for lib, names in lib_names.items():
+            if isinstance(s, str):
+                if case_sensitive:
+                    matches = [n for n in names if s in n]
+                else:
+                    matches = [n for n in names if s.lower() in n.lower()]
+            elif isinstance(s, list):
+                m = np.array([])
+                for sub in s:
+                    if case_sensitive:
+                        m = np.append(m, [n for n in names if sub in n])
+                    else:
+                        m = np.append(m, [n for n in names if sub.lower() in n.lower()])
+                matches = list(np.unique(m))
+            else:
+                raise TypeError("s must be str or list of str!")
+            if sort:
+                matches.sort()
+            result[lib] = matches
+
+        if len(result) == 1:
+            return next(iter(result.values()))
+        else:
+            return result

     def get_model_timeseries_names(
         self,
@@ -1603,7 +1639,17 @@ def get_model_timeseries_names(
         else:
             return structure

-    def apply(self, libname, func, names=None, progressbar=True):
+    def apply(
+        self,
+        libname: str,
+        func: callable,
+        names: Optional[Union[str, List[str]]] = None,
+        kwargs: Optional[dict] = None,
+        progressbar: bool = True,
+        parallel: bool = False,
+        max_workers: Optional[int] = None,
+        fancy_output: bool = True,
+    ) -> Union[dict, pd.Series, pd.DataFrame]:
         """Apply function to items in library.

         Supported libraries are oseries, stresses, and models.

@@ -1613,32 +1659,114 @@ def apply(
         libname : str
             library name, supports "oseries", "stresses" and "models"
         func : callable
-            function that accepts items from one of the supported libraries as input
+            function that accepts a string corresponding to the name of an item
+            in the library as its first argument. Additional keyword arguments
+            can be specified. The function can return any result, or update an
+            item in the database without returning anything.
         names : str, list of str, optional
             apply function to these names, by default None which loops over all
             stored items in library
+        kwargs : dict, optional
+            keyword arguments to pass to func, by default None
         progressbar : bool, optional
             show progressbar, by default True
+        parallel : bool, optional
+            run apply in parallel, default is False.
+        max_workers : int, optional
+            max no. of workers, only used if parallel is True
+        fancy_output : bool, optional
+            if True, try returning result as pandas Series or DataFrame, by
+            default True

         Returns
         -------
         dict
             dict of results of func, with names as keys and results as values
+
+        Notes
+        -----
+        Users should be aware that parallel solving is platform dependent
+        and may not always work. The current implementation works well for
+        Linux users. For Windows users, parallel solving does not work when
+        called directly from Jupyter Notebooks or IPython. To use parallel
+        solving on Windows, the following code should be used in a Python
+        file::
+
+            from multiprocessing import freeze_support
+
+            if __name__ == "__main__":
+                freeze_support()
+                pstore.apply("models", some_func, parallel=True)
         """
         names = self.conn._parse_names(names, libname)
-        result = {}
+        if kwargs is None:
+            kwargs = {}
         if libname not in ("oseries", "stresses", "models"):
             raise ValueError(
                 "'libname' must be one of ['oseries', 'stresses', 'models']!"
             )
-        getter = getattr(self.conn, f"get_{libname}")
-        for n in (
-            tqdm(names, desc=f"Applying {func.__name__}") if progressbar else names
-        ):
-            result[n] = func(getter(n))
-        return result
+        if parallel:
+            result = self.conn._parallel(
+                func,
+                kwargs=kwargs,
+                names=names,
+                progressbar=progressbar,
+                max_workers=max_workers,
+                chunksize=None,
+                desc=f"Applying {func.__name__} (parallel)",
+            )
+        else:
+            result = []
+            for n in tqdm(
+                names, desc=f"Applying {func.__name__}", disable=not progressbar
+            ):
+                result.append(func(n, **kwargs))
+        if fancy_output:
+            return PastaStore._fancy_output(result, names, func.__name__)
+        else:
+            return result
+
+    @staticmethod
+    def _fancy_output(
+        result: Iterable,
+        names: List[str],
+        label: Optional[str] = None,
+    ) -> Union[pd.Series, pd.DataFrame, dict]:
+        """Convert apply result to pandas Series, DataFrame or dict.
-    def within(self, extent, names=None, libname="oseries"):
+
+        Parameters
+        ----------
+        result : Iterable
+            result of apply function
+        names : list
+            list of names
+        label : str, optional
+            label for columns, by default None
+
+        Returns
+        -------
+        pd.Series, pd.DataFrame, dict
+            Series, DataFrame or dict with results
+        """
+        if not isinstance(result, list):
+            result = list(result)
+        if isinstance(result[0], (float, int, np.integer)):
+            return pd.Series(result, index=names)
+        elif isinstance(result[0], (pd.Series, pd.DataFrame)):
+            df = pd.concat(dict(zip(names, result, strict=True)), axis=1)
+            if label is not None:
+                df.columns.name = label
+            return df
+        elif result[0] is None:
+            # assume the function stored its results in the database and
+            # returned nothing
+            return None
+        else:
+            return dict(zip(names, result, strict=True))
+
+    def within(
+        self,
+        extent: list,
+        names: Optional[list[str]] = None,
+        libname: str = "oseries",
+    ):
         """Get names of items within extent.

         Parameters
         ----------
diff --git a/pastastore/util.py b/pastastore/util.py
index dda51c47..991c1002 100644
--- a/pastastore/util.py
+++ b/pastastore/util.py
@@ -385,7 +385,7 @@ def copy_database(
     conn2,
     libraries: Optional[List[str]] = None,
     overwrite: bool = False,
-    progressbar: bool = False,
+    progressbar: bool = True,
 ) -> None:
     """Copy libraries from one database to another.

diff --git a/pastastore/version.py b/pastastore/version.py
index 8c1822dc..908a26e9 100644
--- a/pastastore/version.py
+++ b/pastastore/version.py
@@ -9,7 +9,7 @@
 PASTAS_LEQ_022 = PASTAS_VERSION <= parse_version("0.22.0")
 PASTAS_GEQ_150 = PASTAS_VERSION >= parse_version("1.5.0")

-__version__ = "1.7.2"
+__version__ = "1.8.0"


 def show_versions(optional=False) -> None:
diff --git a/pyproject.toml b/pyproject.toml
index ec2b645b..94da899b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -44,7 +44,7 @@ repository = "https://github.com/pastas/pastastore"
 documentation = "https://pastastore.readthedocs.io/en/latest/"

 [project.optional-dependencies]
-full = ["pastastore[arcticdb,optional]"]
+full = ["pastastore[arcticdb,optional]", "hydropandas"]
 extensions = ["hydropandas"]
 optional = ["contextily", "pyproj", "adjustText"]
 arcticdb = ["arcticdb"]
@@ -61,7 +61,7 @@ test = [
     "codacy-coverage",
 ]
 test_py312 = [
-    "pastastore[lint,optional]", # no arcticdb
+    "pastastore[lint,optional]",  # no arcticdb
     "hydropandas[full]",
     "coverage",
     "codecov",
diff --git a/tests/data/test_hpd_update.zip b/tests/data/test_hpd_update.zip
index 587c2f9a..2a5a7c0f 100644
Binary files a/tests/data/test_hpd_update.zip and b/tests/data/test_hpd_update.zip differ
diff --git a/tests/test_003_pastastore.py b/tests/test_003_pastastore.py
index 8021b500..b2b2493d 100644
--- a/tests/test_003_pastastore.py
+++ b/tests/test_003_pastastore.py
@@ -210,7 +210,8 @@ def test_solve_models_parallel(request, pstore):
 def test_apply(request, pstore):
     depends(request, [f"test_solve_models_and_get_stats[{pstore.type}]"])

-    def func(ml):
+    def func(ml_name):
+        ml = pstore.conn.get_models(ml_name)
         return ml.parameters.loc["recharge_A", "optimal"]

     result = pstore.apply("models", func)
diff --git a/tests/test_007_hpdextension.py b/tests/test_007_hpdextension.py
index 843bdbb7..b72fd1a9 100644
--- a/tests/test_007_hpdextension.py
+++ b/tests/test_007_hpdextension.py
@@ -25,7 +25,7 @@ def test_hpd_download_precipitation_from_knmi():
     activate_hydropandas_extension()
     pstore = pst.PastaStore()
     pstore.hpd.download_knmi_precipitation(
-        stns=[260], tmin="2022-01-01", tmax="2022-01-31"
+        stns=[260], meteo_var="RH", tmin="2022-01-01", tmax="2022-01-31"
     )
     assert pstore.n_stresses == 1
@@ -50,10 +50,10 @@ def test_update_oseries():
     activate_hydropandas_extension()
     pstore = pst.PastaStore.from_zip("tests/data/test_hpd_update.zip")

-    pstore.hpd.update_bro_gmw(tmax="2024-01-31")
+    pstore.hpd.update_bro_gmw(tmax="2022-02-28")
     tmintmax = pstore.get_tmin_tmax("oseries")
-    assert tmintmax.loc["GMW000000036319_1", "tmax"] >= Timestamp("2024-01-30")
-    assert tmintmax.loc["GMW000000036327_1", "tmax"] >= Timestamp("2024-01-20")
+    assert tmintmax.loc["GMW000000036319_1", "tmax"] >= Timestamp("2022-02-27")
+    assert tmintmax.loc["GMW000000036327_1", "tmax"] >= Timestamp("2022-02-27")


 @pytest.mark.xfail(reason="KNMI is being flaky, so allow this test to xfail/xpass.")
@@ -64,9 +64,9 @@ def test_update_stresses():
     activate_hydropandas_extension()
     pstore = pst.PastaStore.from_zip("tests/data/test_hpd_update.zip")

-    pstore.hpd.update_knmi_meteo(tmax="2024-01-31", normalize_datetime_index=False)
+    pstore.hpd.update_knmi_meteo(tmax="2022-02-28", normalize_datetime_index=True)
     tmintmax = pstore.get_tmin_tmax("stresses")
-    assert (tmintmax["tmax"] >= Timestamp("2024-01-31")).all()
+    assert (tmintmax["tmax"] >= Timestamp("2022-02-27")).all()


 @pytest.mark.xfail(reason="KNMI is being flaky, so allow this test to xfail/xpass.")
@@ -78,8 +78,10 @@ def test_nearest_stresses():
     pstore = pst.PastaStore.from_zip("tests/data/test_hpd_update.zip")

     pstore.hpd.download_nearest_knmi_precipitation(
-        "GMW000000036319_1", tmin="2024-01-01"
+        "GMW000000036319_1", tmin="2024-01-01", tmax="2024-01-31"
     )
     assert "RD_GROOT-AMMERS" in pstore.stresses_names

-    pstore.hpd.download_nearest_knmi_evaporation("GMW000000036319_1", tmin="2024-01-01")
+    pstore.hpd.download_nearest_knmi_evaporation(
+        "GMW000000036319_1", tmin="2024-01-01", tmax="2024-01-31"
+    )
     assert "EV24_CABAUW-MAST" in pstore.stresses_names
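Taken together, the changes above enable the following end-to-end parallel workflow. This is a hedged sketch, not part of the diff itself: the database path is hypothetical, and on Windows the `freeze_support()` guard is required, as noted in the `apply` docstring.

```python
from multiprocessing import freeze_support

import pastastore as pst

if __name__ == "__main__":
    freeze_support()  # required on Windows, harmless elsewhere

    # hypothetical database name and path
    conn = pst.PasConnector("my_db", "./pastas_db", verbose=False)
    pstore = pst.PastaStore(conn)

    # solve all models in parallel; results are written to the database
    pstore.solve_models(parallel=True, ignore_solve_errors=True)

    # collect fit statistics in parallel; with fancy_output=True (the
    # default) this returns a DataFrame with one row per model
    stats = pstore.get_statistics(["rsq", "evp"], parallel=True)
    print(stats)
```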