From 999d597625d5beacbb98a8da58f560bdc4ccac18 Mon Sep 17 00:00:00 2001
From: Wolfgang Preimesberger
Date: Thu, 2 May 2024 22:28:28 +0200
Subject: [PATCH] Update wrapper for parallel processing to handle log files

---
 src/repurpose/process.py | 66 +++++++++++++++++++++++++---------------
 1 file changed, 41 insertions(+), 25 deletions(-)

diff --git a/src/repurpose/process.py b/src/repurpose/process.py
index f703193..6473303 100644
--- a/src/repurpose/process.py
+++ b/src/repurpose/process.py
@@ -15,6 +15,7 @@
 # os.environ['MKL_DYNAMIC'] = 'FALSE'
 # os.environ['OPENBLAS_NUM_THREADS'] = '1'
 
+import traceback
 import numpy as np
 from tqdm import tqdm
 import logging
@@ -154,8 +155,8 @@ def print_progress(self):
         self._pbar.n = self.n_completed_tasks
         self._pbar.refresh()
 
-def configure_worker_logger(log_queue, log_level):
-    worker_logger = logging.getLogger('worker')
+def configure_worker_logger(log_queue, log_level, name):
+    worker_logger = logging.getLogger(name)
     if not worker_logger.hasHandlers():
         h = QueueHandler(log_queue)
         worker_logger.addHandler(h)
@@ -163,15 +164,17 @@ def run_with_error_handling(FUNC,
-                            ignore_errors=False, log_queue=None, log_level="WARNING",
-                            **kwargs) -> Any:
+                            ignore_errors=False,
+                            log_queue=None,
+                            log_level="WARNING",
+                            logger_name=None,
+                            **kwargs) -> Any:
 
     if log_queue is not None:
-        logger = configure_worker_logger(log_queue, log_level)
-        logger_name = logger.name
-        kwargs['logger_name'] = logger_name
+        logger = configure_worker_logger(log_queue, log_level, logger_name)
     else:
-        logger = logging.getLogger()
+        # normal logger
+        logger = logging.getLogger(logger_name)
 
     r = None
@@ -179,7 +182,10 @@ def run_with_error_handling(FUNC,
         r = FUNC(**kwargs)
     except Exception as e:
         if ignore_errors:
-            logger.error(f"Error: {e}")
+            logger.error(f"The following ERROR was raised in the parallelized "
+                         f"function `{FUNC.__name__}` but was ignored due to "
+                         f"the chosen settings: "
+                         f"{traceback.format_exc()}")
         else:
             raise e
     return r
@@ -195,6 +201,7 @@ def parallel_process_async(
         log_path=None,
         log_filename=None,
         loglevel="WARNING",
+        logger_name=None,
         verbose=False,
         progress_bar_label="Processed",
         backend="loky",
@@ -239,8 +246,15 @@
         Name of the logfile in `log_path to create. If None is chosen, a
         name is created automatically. If `log_path is None, this has no effect.
     loglevel: str, optional (default: "WARNING")
-        Log level to use for logging. Must be one of
+        Which level should be logged. Must be one of
         ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"].
+    logger_name: str, optional (default: None)
+        The name to assign to the logger that can be accessed in FUNC to
+        log to. If not given, then the root logger is used. e.g.
+        ```
+        logger = logging.getLogger()
+        logger.error("Some error message")
+        ```
     verbose: bool, optional (default: False)
         Print all logging messages to stdout, useful for debugging.
     progress_bar_label: str, optional (default: "Processed")
@@ -258,16 +272,15 @@
         values are found.
     """
     if activate_logging:
-        logger = logging.getLogger()
+        logger = logging.getLogger(logger_name)
+        logger.setLevel(loglevel.upper())
 
     if STATIC_KWARGS is None:
         STATIC_KWARGS = dict()
 
     if verbose:
+        # in this case we also print ALL log messages
         streamHandler = logging.StreamHandler(sys.stdout)
-        formatter = logging.Formatter(
-            '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-        streamHandler.setFormatter(formatter)
         logger.setLevel('DEBUG')
         logger.addHandler(streamHandler)
@@ -280,14 +293,14 @@
         log_file = None
 
     if log_file:
+        # in this case the logger should write to file
         os.makedirs(os.path.dirname(log_file), exist_ok=True)
-        logging.basicConfig(
-            filename=str(log_file),
-            level=loglevel.upper(),
-            format="%(levelname)s %(asctime)s %(message)s",
+        filehandler = logging.FileHandler(log_file)
+        filehandler.setFormatter(logging.Formatter(
+            "%(levelname)s %(asctime)s %(message)s",
             datefmt="%Y-%m-%d %H:%M:%S",
-            force=True,
-        )
+        ))
+        logger.addHandler(filehandler)
     else:
         logger = None
@@ -316,26 +329,28 @@
             process_kwargs.append(kws)
 
     if n_proc == 1:
-        logging.info("Processing metadata with {} process.".format(n_proc))
         results = []
         if show_progress_bars:
             pbar = tqdm(total=len(process_kwargs), desc=progress_bar_label)
         else:
             pbar = None
+
         for kwargs in process_kwargs:
-            r = run_with_error_handling(FUNC, ignore_errors, **kwargs)
+            r = run_with_error_handling(FUNC, ignore_errors,
+                                        logger_name=logger_name,
+                                        **kwargs)
             if r is not None:
                 results.append(r)
             if pbar is not None:
                 pbar.update()
     else:
-        logging.info(f"Processing metadata with {n_proc} processes.")
         if logger is not None:
+            log_level = logger.getEffectiveLevel()
             m = Manager()
             q = m.Queue()
-            listener = QueueListener(q, *logger.handlers)
+            listener = QueueListener(q, *logger.handlers,
+                                     respect_handler_level=True)
             listener.start()
-            log_level = logger.getEffectiveLevel()
         else:
            q = None
            log_level = None
@@ -355,6 +370,7 @@
                 FUNC, ignore_errors,
                 log_queue=q,
                 log_level=log_level,
+                logger_name=logger_name,
                 **kwargs)
             for kwargs in process_kwargs)