diff --git a/repo/spec b/repo/spec
index df2a07e3f..506b33936 160000
--- a/repo/spec
+++ b/repo/spec
@@ -1 +1 @@
-Subproject commit df2a07e3fda634b2eda5785afe67399b61a81173
+Subproject commit 506b33936d89080a683fa8a26837f2a23b23e5e2
diff --git a/src/ocrd/processor/base.py b/src/ocrd/processor/base.py
index 7ec77162e..d6348b40e 100644
--- a/src/ocrd/processor/base.py
+++ b/src/ocrd/processor/base.py
@@ -18,6 +18,8 @@
 from pathlib import Path
 from typing import Any, Dict, List, Optional, Tuple, Union, get_args
 import sys
+import logging
+import logging.handlers
 import inspect
 import tarfile
 import io
@@ -515,22 +517,31 @@ def process_workspace(self, workspace: Workspace) -> None:
 
                 if max_workers > 1:
                     executor_cls = ProcessPoolExecutor
+                    log_queue = mp.Queue()
+                    # forward messages from log queue (in subprocesses) to all root handlers
+                    log_listener = logging.handlers.QueueListener(log_queue, *logging.root.handlers, respect_handler_level=True)
                 else:
                     executor_cls = DummyExecutor
+                    log_queue = None
+                    log_listener = None
                 executor = executor_cls(
                     max_workers=max_workers or 1,
                     # only forking method avoids pickling
                     context=mp.get_context('fork'),
                     # share processor instance as global to avoid pickling
                     initializer=_page_worker_set_ctxt,
-                    initargs=(self,),
+                    initargs=(self, log_queue),
                 )
+                if max_workers > 1:
+                    log_listener.start()
                 try:
                     self._base_logger.debug("started executor %s with %d workers", str(executor), max_workers or 1)
                     tasks = self.process_workspace_submit_tasks(executor, max_seconds)
                     stats = self.process_workspace_handle_tasks(tasks)
                 finally:
                     executor.shutdown(kill_workers=True, wait=False)
+                    if max_workers > 1:
+                        log_listener.stop()
 
             except NotImplementedError:
                 # fall back to deprecated method
@@ -1110,13 +1121,16 @@ def zip_input_files(self, require_first=True, mimetype=None, on_error='skip'):
 objects, and with the METS Server we do not mutate the local processor
 instance anyway.
 """
-def _page_worker_set_ctxt(processor):
+def _page_worker_set_ctxt(processor, log_queue):
     """
     Overwrites `ocrd.processor.base._page_worker_processor` instance
     for sharing with subprocesses in ProcessPoolExecutor initializer.
     """
     global _page_worker_processor
     _page_worker_processor = processor
+    if log_queue:
+        # replace all log handlers with just one queue handler
+        logging.root.handlers = [logging.handlers.QueueHandler(log_queue)]
 
 def _page_worker(timeout, *input_files):
     """
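
Note for reviewers: the patch wires the standard queue-based multiprocess logging pattern from `logging.handlers` into the page-worker pool. The parent process runs a `QueueListener` that forwards records from a shared `multiprocessing.Queue` to the already-configured root handlers, while each forked worker (via the executor initializer `_page_worker_set_ctxt`) replaces its inherited handlers with a single `QueueHandler`, so every record crosses the process boundary once and is emitted once, in the parent. The sketch below shows the same pattern in isolation; it is illustrative only, not the patched code. It uses the stdlib `concurrent.futures.ProcessPoolExecutor` with its `mp_context=` argument (the executor in the patch accepts `context=` and `shutdown(kill_workers=...)`, which the stdlib one does not, so it is evidently a different implementation), and the names `init_worker` and `process_page` are hypothetical stand-ins.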
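```python
# Illustrative sketch only (not part of the patch): queue-based logging
# across a fork()ed process pool, mirroring the pattern the diff wires in.
import logging
import logging.handlers
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

def init_worker(log_queue):
    # hypothetical stand-in for _page_worker_set_ctxt:
    # replace all inherited handlers with a single QueueHandler, so worker
    # records travel to the parent instead of being emitted here
    logging.root.handlers = [logging.handlers.QueueHandler(log_queue)]

def process_page(page_id):
    # hypothetical stand-in for _page_worker
    logging.getLogger('sketch.worker').info("processing %s", page_id)
    return page_id

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO,
                        format='%(processName)s %(name)s: %(message)s')
    log_queue = mp.Queue()
    # forward records arriving on the queue to the parent's root handlers
    listener = logging.handlers.QueueListener(
        log_queue, *logging.root.handlers, respect_handler_level=True)
    listener.start()
    try:
        with ProcessPoolExecutor(max_workers=2,
                                 mp_context=mp.get_context('fork'),
                                 initializer=init_worker,
                                 initargs=(log_queue,)) as executor:
            for _ in executor.map(process_page, ['PHYS_0001', 'PHYS_0002']):
                pass
    finally:
        listener.stop()
```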
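Two details of the patch worth flagging: replacing `logging.root.handlers` wholesale in the initializer (rather than appending) prevents double emission through the handlers inherited across `fork()`, and `respect_handler_level=True` preserves per-handler level filtering in the parent, since `QueueHandler` forwards everything the worker-side loggers let through. The `if log_queue:` guard leaves logging untouched in the single-worker `DummyExecutor` case, where the "worker" runs in-process and the existing handlers still apply.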