diff --git a/pangeo_forge_runner/bakery/local.py b/pangeo_forge_runner/bakery/local.py index 55ffeeb..709368e 100644 --- a/pangeo_forge_runner/bakery/local.py +++ b/pangeo_forge_runner/bakery/local.py @@ -3,7 +3,7 @@ """ from apache_beam.pipeline import PipelineOptions -from traitlets import Integer +from traitlets import Integer, Unicode from .base import Bakery @@ -29,6 +29,24 @@ class LocalDirectBakery(Bakery): """, ) + direct_running_mode = Unicode( + "multi_threading", + config=True, + help=""" + One of 'in_memory', 'multi_threading', 'multi_processing'. + + in_memory: Runner and workers’ communication happens in memory (not through gRPC). This is a default mode. + multi_threading: Runner and workers communicate through gRPC and each worker runs in a thread. + multi_processing: Runner and workers communicate through gRPC and each worker runs in a subprocess. + + multi_processing is closest to most production runners, as it enables real usage of multiple + CPUs on the host machine. **However**, it can mess up logging, so is not the default here. + + https://beam.apache.org/documentation/runners/direct/#setting-parallelism has more + information. + """, + ) + def get_pipeline_options( self, job_name: str, container_image: str, extra_options: dict ) -> PipelineOptions: @@ -40,7 +58,7 @@ def get_pipeline_options( return PipelineOptions( flags=[], runner="DirectRunner", - direct_running_mode="multi_processing", + direct_running_mode=self.direct_running_mode, direct_num_workers=self.num_workers, save_main_session=True, # this might solve serialization issues; cf. https://beam.apache.org/blog/beam-2.36.0/