Skip to content

Commit

Permalink
Processor parallel pages: log via QueueHandler in subprocess, QueueLi…
Browse files Browse the repository at this point in the history
…stener in main
  • Loading branch information
bertsky committed Nov 11, 2024
1 parent 5f2f602 commit 0446b82
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 3 deletions.
18 changes: 16 additions & 2 deletions src/ocrd/processor/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union, get_args
import sys
import logging
import logging.handlers
import inspect
import tarfile
import io
Expand Down Expand Up @@ -515,22 +517,31 @@ def process_workspace(self, workspace: Workspace) -> None:

if max_workers > 1:
executor_cls = ProcessPoolExecutor
log_queue = mp.Queue()
# forward messages from log queue (in subprocesses) to all root handlers
log_listener = logging.handlers.QueueListener(log_queue, *logging.root.handlers, respect_handler_level=True)
else:
executor_cls = DummyExecutor
log_queue = None
log_listener = None
executor = executor_cls(
max_workers=max_workers or 1,
# only forking method avoids pickling
context=mp.get_context('fork'),
# share processor instance as global to avoid pickling
initializer=_page_worker_set_ctxt,
initargs=(self,),
initargs=(self, log_queue),
)
if max_workers > 1:
log_listener.start()
try:
self._base_logger.debug("started executor %s with %d workers", str(executor), max_workers or 1)
tasks = self.process_workspace_submit_tasks(executor, max_seconds)
stats = self.process_workspace_handle_tasks(tasks)
finally:
executor.shutdown(kill_workers=True, wait=False)
if max_workers > 1:
log_listener.stop()

except NotImplementedError:
# fall back to deprecated method
Expand Down Expand Up @@ -1110,13 +1121,16 @@ def zip_input_files(self, require_first=True, mimetype=None, on_error='skip'):
objects, and with the METS Server we do not mutate the local
processor instance anyway.
"""
def _page_worker_set_ctxt(processor):
def _page_worker_set_ctxt(processor, log_queue=None):
    """
    Overwrites `ocrd.processor.base._page_worker_processor` instance
    for sharing with subprocesses in ProcessPoolExecutor initializer.

    Optionally also redirects all logging of the current process into
    a queue, so that records can be consumed elsewhere (for example by
    a `logging.handlers.QueueListener`).

    Args:
        processor: object to store in the module-level global
            `_page_worker_processor`.
        log_queue: queue to receive log records, or `None` (default)
            to leave the logging configuration untouched. When given,
            the root logger's handlers are replaced by a single
            `logging.handlers.QueueHandler` writing to this queue.
    """
    global _page_worker_processor
    _page_worker_processor = processor
    # Compare against None explicitly: the "no queue" sentinel is None,
    # and a queue-like object may be falsy while it is still empty.
    if log_queue is not None:
        # replace all log handlers with just one queue handler
        logging.root.handlers = [logging.handlers.QueueHandler(log_queue)]

def _page_worker(timeout, *input_files):
"""
Expand Down

0 comments on commit 0446b82

Please sign in to comment.