Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support setting direct_running_mode for LocalDirectRunner #65

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions pangeo_forge_runner/bakery/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""

from apache_beam.pipeline import PipelineOptions
from traitlets import Integer
from traitlets import Integer, Unicode

from .base import Bakery

Expand All @@ -29,6 +29,24 @@ class LocalDirectBakery(Bakery):
""",
)

direct_running_mode = Unicode(
"multi_threading",
config=True,
help="""
One of 'in_memory', 'multi_threading', 'multi_processing'.

in_memory: Runner and workers’ communication happens in memory (not through gRPC). This is a default mode.
multi_threading: Runner and workers communicate through gRPC and each worker runs in a thread.
multi_processing: Runner and workers communicate through gRPC and each worker runs in a subprocess.

multi_processing is closest to most production runners, as it enables real usage of multiple
CPUs on the host machine. **However**, it can mess up logging, so is not the default here.

https://beam.apache.org/documentation/runners/direct/#setting-parallelism has more
information.
""",
)

def get_pipeline_options(
self, job_name: str, container_image: str, extra_options: dict
) -> PipelineOptions:
Expand All @@ -40,7 +58,7 @@ def get_pipeline_options(
return PipelineOptions(
flags=[],
runner="DirectRunner",
direct_running_mode="multi_processing",
direct_running_mode=self.direct_running_mode,
direct_num_workers=self.num_workers,
save_main_session=True,
# this might solve serialization issues; cf. https://beam.apache.org/blog/beam-2.36.0/
Expand Down
Loading