From 84c3ac361e898cc4e59f6b1a03ab38e107e835ce Mon Sep 17 00:00:00 2001 From: Wolfgang Preimesberger Date: Tue, 29 Aug 2023 21:49:10 +0200 Subject: [PATCH 1/8] Fine tune logging options --- src/repurpose/process.py | 85 +++++++++++++++++++++++----------------- 1 file changed, 50 insertions(+), 35 deletions(-) diff --git a/src/repurpose/process.py b/src/repurpose/process.py index f93e24e..3e10eb9 100644 --- a/src/repurpose/process.py +++ b/src/repurpose/process.py @@ -52,9 +52,11 @@ def parallel_process_async( n_proc=1, show_progress_bars=True, ignore_errors=False, + activate_logging=True, log_path=None, loglevel="WARNING", verbose=False, + progress_bar_label="Processed" ): """ Applies the passed function to all elements of the passed iterables. @@ -83,6 +85,11 @@ def parallel_process_async( this case the return values are kept in order. show_progress_bars: bool, optional (default: True) Show how many iterables were processed already. + ignore_errors: bool, optional (default: False) + If True, exceptions are caught and logged. If False, exceptions are + raised. + activate_logging: bool, optional (default: True) + If False, no logging is done at all (neither to file nor to stdout). log_path: str, optional (default: None) If provided, a log file is created in the passed directory. loglevel: str, optional (default: "WARNING") @@ -90,40 +97,46 @@ def parallel_process_async( ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]. verbose: float, optional (default: False) Print all logging messages to stdout, useful for debugging. + progress_bar_label: str, optional (default: "Processed") + Label to use for the progress bar. Returns ------- results: list List of return values from each function call """ - logger = logging.getLogger() - streamHandler = logging.StreamHandler(sys.stdout) - formatter = logging.Formatter( - '%(asctime)s - %(name)s - %(levelname)s - %(message)s') - streamHandler.setFormatter(formatter) - - if STATIC_KWARGS is None: - STATIC_KWARGS = dict() - - if verbose: - logger.setLevel('DEBUG') - logger.addHandler(streamHandler) - - if log_path is not None: - log_file = os.path.join( - log_path, - f"{FUNC.__name__}_{datetime.now().strftime('%Y%m%d%H%M')}.log") + if activate_logging: + logger = logging.getLogger() + streamHandler = logging.StreamHandler(sys.stdout) + formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s') + streamHandler.setFormatter(formatter) + + if STATIC_KWARGS is None: + STATIC_KWARGS = dict() + + if verbose: + logger.setLevel('DEBUG') + logger.addHandler(streamHandler) + + if log_path is not None: + log_file = os.path.join( + log_path, + f"{FUNC.__name__}_{datetime.now().strftime('%Y%m%d%H%M')}.log") + else: + log_file = None + + + if log_file: + os.makedirs(os.path.dirname(log_file), exist_ok=True) + logging.basicConfig( + filename=log_file, + level=loglevel.upper(), + format="%(levelname)s %(asctime)s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) else: - log_file = None - - if log_file: - os.makedirs(os.path.dirname(log_file), exist_ok=True) - logging.basicConfig( - filename=log_file, - level=loglevel.upper(), - format="%(levelname)s %(asctime)s %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", - ) + logger = None n = np.array([len(v) for k, v in ITER_KWARGS.items()]) if len(n) == 0: @@ -150,7 +163,7 @@ def parallel_process_async( process_kwargs.append(kws) if show_progress_bars: - pbar = tqdm(total=len(process_kwargs), desc=f"Processed") + pbar = tqdm(total=len(process_kwargs), desc=progress_bar_label) else: pbar = None @@ -163,7 
+176,8 @@ def update(r) -> None: pbar.update() def error(e) -> None: - logging.error(e) + if logger is not None: + logging.error(e) if not ignore_errors: raise e if pbar is not None: @@ -191,12 +205,13 @@ def error(e) -> None: if pbar is not None: pbar.close() - if verbose: - logger.handlers.clear() + if logger is not None: + if verbose: + logger.handlers.clear() - handlers = logger.handlers[:] - for handler in handlers: - logger.removeHandler(handler) - handler.close() + handlers = logger.handlers[:] + for handler in handlers: + logger.removeHandler(handler) + handler.close() return results From 3ab34968764b29e83b30f1b632cb303168e23215 Mon Sep 17 00:00:00 2001 From: Wolfgang Preimesberger Date: Thu, 14 Dec 2023 10:35:57 +0100 Subject: [PATCH 2/8] Add pre and post process option to ts2img --- src/repurpose/ts2img.py | 74 +++++++++++++++++++++++++++++++++++------ tests/test_ts2img.py | 55 ++++++++++++++++++++++++++++-- 2 files changed, 116 insertions(+), 13 deletions(-) diff --git a/src/repurpose/ts2img.py b/src/repurpose/ts2img.py index 9259222..4e13462 100644 --- a/src/repurpose/ts2img.py +++ b/src/repurpose/ts2img.py @@ -21,8 +21,6 @@ - add possibility to use resampling methods other than nearest neighbour - integrate repurpose.resample module - allows weighting functions etc. -- add preprocessing and postprocessing keywords to change the input ts and - output stack before writing - similar to resample, use multiple neighbours when available for image pixel - further harmonisation with pynetcf interface - time ranges for images instead of time stamps @@ -32,7 +30,9 @@ def _convert(converter: 'Ts2Img', writer: Regular3dimImageStack, img_gpis: np.ndarray, lons: np.ndarray, - lats: np.ndarray) -> xr.Dataset: + lats: np.ndarray, + preprocess_func=None, + preprocess_kwargs=None) -> xr.Dataset: """ Wrapper to allow parallelization of the conversion process. This is kept outside the Ts2Img class for parallelization. @@ -41,6 +41,9 @@ def _convert(converter: 'Ts2Img', ts = converter._read_nn(lon, lat) if ts is None: continue + if preprocess_func is not None: + preprocess_kwargs = preprocess_kwargs or {} + ts = preprocess_func(ts, **preprocess_kwargs) if np.any(np.isin(ts.columns, Ts2Img._protected_vars)): raise ValueError( f"Time series contains protected variables. " @@ -199,7 +202,8 @@ def _read_nn(self, lon: float, lat: float) -> Union[pd.DataFrame, None]: ts = ts.rename(columns=self.variables)[self.variables.values()] return ts - def _calc_chunk(self, timestamps, log_path=None, n_proc=1): + def _calc_chunk(self, timestamps, preprocess_func=None, preprocess_kwargs=None, + log_path=None, n_proc=1): """ Create image stack from time series for the passed timestamps. 
See: self.calc @@ -209,7 +213,11 @@ def _calc_chunk(self, timestamps, log_path=None, n_proc=1): f"{timestamps[-1]}") # Transfer time series to images, parallel for cells - STATIC_KWARGS = {'converter': self} + STATIC_KWARGS = { + 'converter': self, + 'preprocess_func': preprocess_func, + 'preprocess_kwargs': preprocess_kwargs, + } ITER_KWARGS = {'writer': [], 'img_gpis': [], 'lons': [], 'lats': []} for cell in np.unique(self.img_grid.activearrcell): @@ -232,10 +240,12 @@ def _calc_chunk(self, timestamps, log_path=None, n_proc=1): lon=stack['lon'])) return stack - def calc(self, path_out, format_out='slice', - fn_template="{datetime}.nc", drop_empty=False, encoding=None, - zlib=True, glob_attrs=None, var_attrs=None, - var_fillvalues=None, var_dtypes=None, img_buffer=100, n_proc=1): + def calc(self, path_out, format_out='slice', preprocess=None, + preprocess_kwargs=None, postprocess=None, postprocess_kwargs=None, + fn_template="{datetime}.nc", + drop_empty=False, encoding=None, zlib=True, glob_attrs=None, + var_attrs=None, var_fillvalues=None, var_dtypes=None, + img_buffer=100, n_proc=1): """ Perform conversion of all time series to images. This will first split timestamps into processing chunks (img_buffer) and then - for each @@ -253,6 +263,43 @@ def calc(self, path_out, format_out='slice', - stack: write all time steps into one file. In this case if there is a {datetime} placeholder in the fn_template, then the time range is inserted. + preprocess: callable, optional (default: None) + Function that is applied to each time series before converting it. + The first argument is the data frame that the reader returns. + Additional keyword arguments can be passed via `preprocess_kwargs`. + The function must return a data frame of the same form as the input + data, i.e. with a datetime index and at least one column of data. + Note: As an alternative to a preprocessing function, consider + applying an adapter to the reader class. Adapters also perform + preprocessing, see `pytesmo.validation_framework.adapters`. + A simple example for a preprocessing function to compute the sum: + ``` + def preprocess_add(df: pd.DataFrame, **preprocess_kwargs) \ + -> pd.DataFrame: + df['var3'] = df['var1'] + df['var2'] + return df + ``` + preprocess_kwargs: dict, optional (default: None) + Keyword arguments for the preprocess function. If None are given, + then the preprocessing function is called with only the input + data frame and no additional arguments (see example above). + postprocess: Callable, optional (default: None) + Function that is applied to the image stack after loading the data + and before writing it to disk. The function must take an xarray + Dataset as the first argument and return an xarray Dataset of the + same form as the input data. + A simple example for a postprocessing function to add a new variable + from the sum of two existing variables: + ``` + def postprocess_add(stack: xr.Dataset, **postprocess_kwargs) \ + -> xr.Dataset: + stack = stack.assign(var3=lambda x: x['var0'] + x['var2']) + return stack + ``` + postprocess_kwargs: dict, optional (default: None) + Keyword arguments for the postprocess function. If None are given, + then the postprocess function is called with only the input + image stack and no additional arguments (see example above). fn_template: str, optional (default: "{datetime}.nc") Template for the output image file names. 
If format_out is 'slice', then a placeholder {datetime} must be @@ -298,6 +345,7 @@ def calc(self, path_out, format_out='slice', img_buffer: int, optional (default: 100) Size of the stack before writing to disk. Larger stacks need more memory but will lead to faster conversion. + Passing -1 means that the whole stack is loaded into memory at once. n_proc: int, optional (default: 1) Number of processes to use for parallel processing. We parallelize by 5 deg. grid cell. @@ -315,7 +363,9 @@ def calc(self, path_out, format_out='slice', dt_index_chunks = list(idx_chunks(self.timestamps, int(img_buffer))) for timestamps in dt_index_chunks: - self.stack = self._calc_chunk(timestamps, log_path, n_proc) + self.stack = self._calc_chunk(timestamps, + preprocess, preprocess_kwargs, + log_path, n_proc) if drop_empty: vars = [var for var in self.stack.data_vars if var not in @@ -331,6 +381,10 @@ def calc(self, path_out, format_out='slice', self.stack = self.stack.drop_isel(time=idx_empty) + if postprocess is not None: + postprocess_kwargs = postprocess_kwargs or {} + self.stack = postprocess(self.stack, **postprocess_kwargs) + if var_fillvalues is not None: for var, fillvalue in var_fillvalues.items(): self.stack[var].values = np.nan_to_num( diff --git a/tests/test_ts2img.py b/tests/test_ts2img.py index 7e87775..c801d23 100644 --- a/tests/test_ts2img.py +++ b/tests/test_ts2img.py @@ -71,6 +71,19 @@ def read(self, lon: float, lat: float): def test_ts2img_time_collocation_integration(): + def preprocess_func(df, mult=2): + # This dummy function just adds a new column to the dataframe after + # reading + df['var3'] = df['var1'] * mult + return df + + def postprocess_func(stack, vars, fillvalue=0): + # This dummy function just fills nans with an actual value before + # writing the stack + for var in vars: + stack[var].values = np.nan_to_num(stack[var].values, nan=fillvalue) + return stack + timestamps_image = pd.date_range('2020-07-01', '2020-07-31', freq='6H') timestamps_ts = timestamps_image[20:50] @@ -93,6 +106,8 @@ def test_ts2img_time_collocation_integration(): converter.calc( path_out, format_out='slice', fn_template="test_{datetime}.nc", drop_empty=True, + preprocess=preprocess_func, preprocess_kwargs={'mult': 2}, + postprocess=postprocess_func, postprocess_kwargs={'vars': ('var2',)}, encoding={'var1': {'dtype': 'int64', 'scale_factor': 0.0000001, '_FillValue': -9999}, }, var_attrs={'var1': {'long_name': 'test_var1', 'units': 'm'}}, @@ -116,16 +131,21 @@ def test_ts2img_time_collocation_integration(): ds = xr.open_dataset( os.path.join(path_out, '2020', 'test_20200708060000.nc')) assert list(ds.dims) == ['lon', 'lat', 'time'] - assert ds.data_vars.keys() == {'timedelta_seconds', 'var1', 'var2'} + assert ds.data_vars.keys() == {'timedelta_seconds', 'var1', 'var2', 'var3'} # var1 was stored as int, but is float64 after decoding assert ds['var1'].values.dtype == 'float64' assert ds['var1'].values.shape == (1, 10, 8) assert ds['var1'].encoding['scale_factor'] == 0.0000001 + # during preprocessing var3 was added as var1 * 2 + np.testing.assert_almost_equal(ds['var3'].values, + ds['var1'].values * 2) assert 1 > np.nanmin(ds['var1'].values) > 0 assert np.isnan(ds['var1'].values[-1, -1, -1]) np.testing.assert_almost_equal(ds['var1'].values[0, 0, 0], 0.7620138, 5) + # check if the postprocess function was applied + assert np.count_nonzero(np.isnan(ds['var2'].values)) == 0 t = pd.to_datetime(ds.time.values[0]).to_pydatetime() t = t + timedelta(seconds=int( @@ -148,6 +168,19 @@ def 
test_ts2img_time_collocation_integration(): def test_ts2img_no_collocation_integration(): + def preprocess_func(df, **kwargs): + df.replace(-9999, np.nan, inplace=True) + df = df.reindex(pd.date_range('2020-07-01', '2020-07-10', freq='1D')) + df = df.resample('1D').mean() + df['var3'] = np.nan + df.loc['2020-07-10', 'var3'] = 1 + df.loc['2020-07-09', 'var3'] = 2 + return df + + def postprocess_func(stack, **kwargs): + stack = stack.assign(var4=lambda x: x['var3'] ** 2) + return stack + timestamps_image = pd.date_range('2020-07-01', '2020-07-10', freq='1D') timestamps_ts = timestamps_image[1:] @@ -168,6 +201,7 @@ def test_ts2img_no_collocation_integration(): with tempfile.TemporaryDirectory() as path_out: converter.calc(path_out, format_out='slice', + preprocess=preprocess_func, postprocess=postprocess_func, fn_template="test_{datetime}.nc", drop_empty=False, encoding={'var2': {'dtype': 'int16'}, }, var_attrs={ @@ -183,16 +217,30 @@ def test_ts2img_no_collocation_integration(): os.path.join(path_out, '2020', 'test_20200701000000.nc')) ds2 = xr.open_dataset( os.path.join(path_out, '2020', 'test_20200704000000.nc')) - for var in ds.data_vars: + ds3 = xr.open_dataset( + os.path.join(path_out, '2020', 'test_20200710000000.nc')) + ds4 = xr.open_dataset( + os.path.join(path_out, '2020', 'test_20200709000000.nc')) + assert np.nanmax(ds3['var3'].values) == 1 + assert np.nanmax(ds4['var3'].values) == 2 + for var in ['var0', 'var2']: assert np.all(np.nan_to_num(ds[var].values, nan=-1) == np.nan_to_num(ds2[var].values, nan=-1)) + # check if the postprocessing function was applied + assert np.nanmax(ds3['var4'].values) == 1 + assert np.nanmax(ds4['var4'].values) == 4 + assert 4 in np.unique(ds4['var4'].values) + assert 1 in np.unique(ds3['var4'].values) + assert len(np.unique(ds3['var4'].values)) == \ + len(np.unique(ds4['var4'].values)) == 2 + ds2.close() # needed on windows! assert list(ds.dims) == ['lon', 'lat', 'time'] assert 'timedelta_seconds' not in ds.data_vars.keys() assert np.all(np.isnan(ds['var0'].values)) assert np.all(ds['var2'].values == -9999) - assert ds.data_vars.keys() == {'var0', 'var2'} + assert ds.data_vars.keys() == {'var0', 'var2', 'var3', 'var4'} ds = xr.open_dataset( os.path.join(path_out, '2020', 'test_20200702000000.nc')) @@ -224,3 +272,4 @@ def test_ts2img_no_collocation_integration(): assert val == int(val_ts['var2']) ds.close() # needed on Windows! 
+ From bafdcab216aabea46477fd1195f100929d4c6680 Mon Sep 17 00:00:00 2001 From: Wolfgang Preimesberger Date: Thu, 14 Dec 2023 11:47:33 +0100 Subject: [PATCH 3/8] Update CI build --- .github/workflows/build.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 191c392..a2b66ee 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -19,17 +19,17 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - python-version: ['3.8', '3.9', '3.10'] + python-version: ['3.8', '3.9', '3.10', '3.11'] os: ["ubuntu-latest"] include: - os: "windows-latest" - python-version: '3.10' + python-version: '3.11' steps: - uses: actions/checkout@v2 with: submodules: true fetch-depth: 0 - - uses: conda-incubator/setup-miniconda@v2 + - uses: conda-incubator/setup-miniconda@v3 with: miniconda-version: "latest" auto-update-conda: true From d2b4d635e7bf115a5751e4f20c9dd59c1eae8e00 Mon Sep 17 00:00:00 2001 From: Wolfgang Preimesberger Date: Thu, 14 Dec 2023 12:05:58 +0100 Subject: [PATCH 4/8] Update build --- .github/workflows/build.yml | 2 +- environment.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index a2b66ee..b5e7c95 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -25,7 +25,7 @@ jobs: - os: "windows-latest" python-version: '3.11' steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 with: submodules: true fetch-depth: 0 diff --git a/environment.yml b/environment.yml index aa73504..12c7f8e 100644 --- a/environment.yml +++ b/environment.yml @@ -16,7 +16,7 @@ dependencies: - xarray - pip # optional, for docs and testing - - nb_conda + #- nb_conda - matplotlib - ipykernel - pip: From 5f19e5adaa0daf44506df46d3a42c2141ededed0 Mon Sep 17 00:00:00 2001 From: Wolfgang Preimesberger Date: Thu, 14 Dec 2023 12:11:55 +0100 Subject: [PATCH 5/8] Update env --- environment.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/environment.yml b/environment.yml index 12c7f8e..23f0cfb 100644 --- a/environment.yml +++ b/environment.yml @@ -23,9 +23,11 @@ dependencies: - pygeogrids - pynetcf>=0.5.0 - more_itertools - - sphinx_rtd_theme - smecv_grid - tqdm + # Optional, for documentation and testing + - nbconvert + - sphinx_rtd_theme - yapf - pytest - pytest-cov From a27b485bda1b51f0914d4d015340aea4c5de3b5d Mon Sep 17 00:00:00 2001 From: Wolfgang Preimesberger Date: Thu, 14 Dec 2023 12:54:05 +0100 Subject: [PATCH 6/8] Fix test --- tests/test_image.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_image.py b/tests/test_image.py index 5e525e9..c734f4e 100644 --- a/tests/test_image.py +++ b/tests/test_image.py @@ -148,7 +148,7 @@ def test_add_3d_data_via_ts(self): warnings.simplefilter("ignore", category=pd.errors.PerformanceWarning) ts = pd.DataFrame( - index=self.img_timestamps + self.timeoffsets, + index=pd.DatetimeIndex(self.img_timestamps + self.timeoffsets), data={'var1': np.arange(1.1, 6.1).astype('float32'), 'var2': np.arange(11, 16).astype('int8')} ) From e69e9a2ad63e9aaca5301fd1a21ca1b74d6f60ec Mon Sep 17 00:00:00 2001 From: Wolfgang Preimesberger Date: Thu, 14 Dec 2023 13:05:53 +0100 Subject: [PATCH 7/8] Update readme --- CHANGELOG.rst | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index a98af35..b99828a 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -7,6 +7,14 @@ Unreleased changes in master branch - 
+Version 0.10 +============ + +- Ts2Img module was rebuilt. Allows conversion of time series with NN lookup. +- Added example notebook for converting ASCAT time series into regularly gridded images. +- Added a simple parallelization framework, with logging and error handling. +- Added the option to pass custom pre- and post-processing functions to ts2img. + Version 0.9 =========== From 0c94d8ccbd815a7edc6d07f6f616ed32bcbc0df0 Mon Sep 17 00:00:00 2001 From: Wolfgang Preimesberger Date: Thu, 14 Dec 2023 13:09:09 +0100 Subject: [PATCH 8/8] Update test --- tests/test_ts2img.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/test_ts2img.py b/tests/test_ts2img.py index c801d23..214ac6e 100644 --- a/tests/test_ts2img.py +++ b/tests/test_ts2img.py @@ -236,12 +236,17 @@ def postprocess_func(stack, **kwargs): len(np.unique(ds4['var4'].values)) == 2 ds2.close() # needed on windows! + ds3.close() + ds4.close() + assert list(ds.dims) == ['lon', 'lat', 'time'] assert 'timedelta_seconds' not in ds.data_vars.keys() assert np.all(np.isnan(ds['var0'].values)) assert np.all(ds['var2'].values == -9999) assert ds.data_vars.keys() == {'var0', 'var2', 'var3', 'var4'} + ds.close() + ds = xr.open_dataset( os.path.join(path_out, '2020', 'test_20200702000000.nc')) assert not np.all(np.isnan(ds['var0'].values))