From 40161042caf1d9373c66c5f7431e2af5f45df85d Mon Sep 17 00:00:00 2001 From: Wolfgang Preimesberger Date: Wed, 17 Jan 2024 17:23:05 +0100 Subject: [PATCH 1/2] Fix logging issue --- src/repurpose/img2ts.py | 17 +++++++---- src/repurpose/process.py | 65 +++++++++++++++++++++++++--------------- tests/test_img2ts.py | 2 -- 3 files changed, 53 insertions(+), 31 deletions(-) diff --git a/src/repurpose/img2ts.py b/src/repurpose/img2ts.py index 6f52891..6bfdfd9 100644 --- a/src/repurpose/img2ts.py +++ b/src/repurpose/img2ts.py @@ -118,7 +118,10 @@ class that uses the read_img iterator of the input_data dataset Default: True n_proc: int, optional (default: 1) Number of parallel processes. Multiprocessing is only used when - `n_proc` > 1. Applies to data reading and writing. + `n_proc` > 1. Applies to data reading and writing. Should be chosen + according to the file connection. A slow connection might be overloaded + by too many processes trying to read data (e.g. network). + If unsure, better leave this at 1. """ def __init__(self, @@ -193,6 +196,9 @@ def __init__(self, self.n_proc = n_proc + self.log_filename = \ + f"img2ts_{datetime.now().strftime('%Y%m%d%H%M')}.log" + def _read_image(self, date, target_grid): """ Function to parallelize reading image data from dataset. @@ -214,7 +220,6 @@ def _read_image(self, date, target_grid): orthogonal: bool Whether the image fits the orthogonal time series format or not. 
""" - # optional on-the-fly spatial resampling resample_kwargs = { 'methods': self.r_methods, @@ -383,8 +388,8 @@ def _write_non_orthogonal(self, dataout.add_global_attr( 'geospatial_lon_max', np.max(cell_lons)) - # for this dataset we have to loop through the gpis since each time series - # can be different in length + # for this dataset we have to loop through the gpis since each + # time series can be different in length for i, (gpi, gpi_lon, gpi_lat) in enumerate( zip(cell_gpis, cell_lons, cell_lats)): gpi_data = {} @@ -433,7 +438,7 @@ def calc(self): os.path.join(self.outputpath, self.gridname), self.target_grid) for img_stack_dict, timestamps in self.img_bulk(): - # ================================================================== + # ================================================================= start_time = datetime.now() # temporally drop grids, due to issue when pickling them... @@ -500,6 +505,7 @@ def calc(self): ITER_KWARGS=ITER_KWARGS, STATIC_KWARGS=STATIC_KWARGS, log_path=os.path.join(self.outputpath, '000_log'), + log_filename=self.log_filename, loglevel="INFO", ignore_errors=True, n_proc=self.n_proc, @@ -557,6 +563,7 @@ def img_bulk(self): STATIC_KWARGS={'target_grid': target_grid}, show_progress_bars=False, log_path=os.path.join(self.outputpath, '000_log'), + log_filename=self.log_filename, loglevel="INFO", ignore_errors=True, n_proc=self.n_proc, diff --git a/src/repurpose/process.py b/src/repurpose/process.py index 97fff06..a2ded01 100644 --- a/src/repurpose/process.py +++ b/src/repurpose/process.py @@ -61,36 +61,47 @@ def grid(self): def tstamps_for_daterange(self, *args, **kwargs): return self.reader.tstamps_for_daterange(*args, **kwargs) - def _gen_filelist(self): - return glob(os.path.join(self.reader.path, '**'), recursive=True) + def _gen_filelist(self) -> list: + flist = glob(os.path.join(self.reader.path, '**'), recursive=True) + return flist def read(self, timestamp, **kwargs): - retries = 0 + retry = 0 img = None error = None - while 
img is None and retries <= self.max_retries: - filename = None + filename = None + + while (img is None) and (retry <= self.max_retries): try: - filename = self.reader._build_filename(timestamp) + if filename is None: + filename = self.reader._build_filename(timestamp) img = self.reader.read(timestamp, **kwargs) + # except IOError as e: + # error = e + # break except Exception as e: + logging.error(f"Error reading file (try {retry+1}) " + f"at {timestamp}: {e}. " + f"Trying again.") if filename is not None: if filename not in self.filelist: + logging.error( + f"File at {timestamp} does not exist.") break - else: - img = None - error = e - time.sleep(self.retry_delay_s) + # else: + img = None + error = e + time.sleep(self.retry_delay_s) - retries += 1 + retry += 1 if img is None: - raise IOError(f"Reading file (unknown) failed even after " - f"{retries} retries: {error}") + logging.error(f"Reading file at {timestamp} failed after " - f"{retry} retries: {error}") else: - logging.info(f"Success reading (unknown) after {retries} " - f"retries") + logging.info(f"Success reading (unknown) after {retry} " + f"tries.") - return img + return img def rootdir() -> Path: @@ -125,6 +136,7 @@ def parallel_process_async( ignore_errors=False, activate_logging=True, log_path=None, + log_filename=None, loglevel="WARNING", verbose=False, progress_bar_label="Processed" @@ -163,6 +175,9 @@ def parallel_process_async( If False, no logging is done at all (neither to file nor to stdout). log_path: str, optional (default: None) If provided, a log file is created in the passed directory. + log_filename: str, optional (default: None) + Name of the logfile in `log_path` to create. If None is chosen, a name + is created automatically. If `log_path` is None, this has no effect. loglevel: str, optional (default: "WARNING") Log level to use for logging. Must be one of ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"].
@@ -178,22 +193,23 @@ def parallel_process_async( """ if activate_logging: logger = logging.getLogger() - streamHandler = logging.StreamHandler(sys.stdout) - formatter = logging.Formatter( - '%(asctime)s - %(name)s - %(levelname)s - %(message)s') - streamHandler.setFormatter(formatter) if STATIC_KWARGS is None: STATIC_KWARGS = dict() if verbose: + streamHandler = logging.StreamHandler(sys.stdout) + formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s') + streamHandler.setFormatter(formatter) logger.setLevel('DEBUG') logger.addHandler(streamHandler) if log_path is not None: - log_file = os.path.join( - log_path, - f"{FUNC.__name__}_{datetime.now().strftime('%Y%m%d%H%M')}.log") + if log_filename is None: + d = datetime.now().strftime('%Y%m%d%H%M') + log_filename = f"{FUNC.__name__}_{d}.log" + log_file = os.path.join(log_path, log_filename) else: log_file = None @@ -204,6 +220,7 @@ def parallel_process_async( level=loglevel.upper(), format="%(levelname)s %(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", + force=True, ) else: logger = None @@ -280,9 +297,9 @@ def error(e) -> None: logger.handlers.clear() handlers = logger.handlers[:] - handlers.clear() for handler in handlers: logger.removeHandler(handler) handler.close() + handlers.clear() return results diff --git a/tests/test_img2ts.py b/tests/test_img2ts.py index 7d83b3f..64ac2d3 100644 --- a/tests/test_img2ts.py +++ b/tests/test_img2ts.py @@ -39,9 +39,7 @@ from pygeogrids import BasicGrid from pygeogrids.netcdf import load_grid from pynetcf.time_series import OrthoMultiTs, GriddedNcIndexedRaggedTs, GriddedNcOrthoMultiTs -from glob import glob import xarray as xr -import pytest import tempfile import numpy.testing as nptest From 503cc2b890fe3fe142f1004490b25cbaa06d5360 Mon Sep 17 00:00:00 2001 From: Wolfgang Preimesberger Date: Wed, 17 Jan 2024 17:30:26 +0100 Subject: [PATCH 2/2] clean up --- CHANGELOG.rst | 2 +- src/repurpose/process.py | 9 +++------ 2 files changed, 4 
insertions(+), 7 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index b99828a..b1906d9 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -5,7 +5,7 @@ Changelog Unreleased changes in master branch =================================== -- +- Added option to parallelize Img2Ts process Version 0.10 ============ diff --git a/src/repurpose/process.py b/src/repurpose/process.py index a2ded01..029e435 100644 --- a/src/repurpose/process.py +++ b/src/repurpose/process.py @@ -36,16 +36,16 @@ class ImageBaseConnection: This protects against processing gaps due to e.g. temporary network issues. """ - def __init__(self, reader, max_retries=20, retry_delay_s=10): + def __init__(self, reader, max_retries=99, retry_delay_s=1): """ Parameters ---------- reader: MultiTemporalImageBase Reader object for which the filelist is created - max_retries: int + max_retries: int, optional (default: 99) Number of retries when a file is in the filelist but reading fails. - retry_delay_s: int + retry_delay_s: int, optional (default: 1) Number of seconds to wait after each failed retry. """ self.reader = reader @@ -76,9 +76,6 @@ def read(self, timestamp, **kwargs): if filename is None: filename = self.reader._build_filename(timestamp) img = self.reader.read(timestamp, **kwargs) - # except IOError as e: - # error = e - # break except Exception as e: logging.error(f"Error reading file (try {retry+1}) " f"at {timestamp}: {e}. "