From ff36901ef9c2a3c74de310e28faa059a9f6b296e Mon Sep 17 00:00:00 2001 From: YuviPanda Date: Sat, 25 Feb 2023 19:49:53 -0800 Subject: [PATCH] Support setting direct_running_mode for LocalDirectBakery This also changes the default direct_running_mode for local runs from multi_processing to multi_threading. The primary use here is to make the logging config from the *runner* be inherited by the *workers* too, which helps with debugging quite a bit. --- pangeo_forge_runner/bakery/local.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/pangeo_forge_runner/bakery/local.py b/pangeo_forge_runner/bakery/local.py index 55ffeeb..709368e 100644 --- a/pangeo_forge_runner/bakery/local.py +++ b/pangeo_forge_runner/bakery/local.py @@ -3,7 +3,7 @@ """ from apache_beam.pipeline import PipelineOptions -from traitlets import Integer +from traitlets import Integer, Unicode from .base import Bakery @@ -29,6 +29,24 @@ class LocalDirectBakery(Bakery): """, ) + direct_running_mode = Unicode( + "multi_threading", + config=True, + help=""" + One of 'in_memory', 'multi_threading', 'multi_processing'. + + in_memory: Runner and workers’ communication happens in memory (not through gRPC). This is a default mode. + multi_threading: Runner and workers communicate through gRPC and each worker runs in a thread. + multi_processing: Runner and workers communicate through gRPC and each worker runs in a subprocess. + + multi_processing is closest to most production runners, as it enables real usage of multiple + CPUs on the host machine. **However**, it can mess up logging, so is not the default here. + + https://beam.apache.org/documentation/runners/direct/#setting-parallelism has more + information. 
+ """, + ) + def get_pipeline_options( self, job_name: str, container_image: str, extra_options: dict ) -> PipelineOptions: @@ -40,7 +58,7 @@ def get_pipeline_options( return PipelineOptions( flags=[], runner="DirectRunner", - direct_running_mode="multi_processing", + direct_running_mode=self.direct_running_mode, direct_num_workers=self.num_workers, save_main_session=True, # this might solve serialization issues; cf. https://beam.apache.org/blog/beam-2.36.0/