Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-45894: Fix dimension universe instantiation in subprocesses #304

Merged
merged 4 commits into from
Aug 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ repos:
- id: trailing-whitespace
- id: check-toml
- repo: https://github.com/psf/black-pre-commit-mirror
rev: 24.4.2
rev: 24.8.0
hooks:
- id: black
# It is recommended to specify the latest version of Python
Expand All @@ -22,10 +22,10 @@ repos:
name: isort (python)
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.5.1
rev: v0.6.2
hooks:
- id: ruff
- repo: https://github.com/numpy/numpydoc
rev: "v1.7.0"
rev: "v1.8.0"
hooks:
- id: numpydoc-validation
1 change: 1 addition & 0 deletions doc/changes/DM-45894.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix for the bug in `pipetask run-qbb -j` which crashed if database dimension version is different from default version.
34 changes: 33 additions & 1 deletion python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import contextlib
import copy
import logging
import pickle
import shutil
from collections.abc import Mapping, Sequence
from types import SimpleNamespace
Expand All @@ -49,6 +50,7 @@
Config,
DatasetId,
DatasetType,
DimensionConfig,
DimensionUniverse,
LimitedButler,
Quantum,
Expand Down Expand Up @@ -500,7 +502,14 @@


class _QBBFactory:
"""Class which is a callable for making QBB instances."""
"""Class which is a callable for making QBB instances.

This class is also responsible for reconstructing correct dimension
universe after unpickling. When pickling multiple things that require
dimension universe, this class must be unpickled first. The logic in
MPGraphExecutor ensures that SingleQuantumExecutor is unpickled first in
the subprocess, which causes unpickling of this class.
"""

def __init__(
self, butler_config: Config, dimensions: DimensionUniverse, dataset_types: Mapping[str, DatasetType]
Expand All @@ -521,6 +530,29 @@
dataset_types=self.dataset_types,
)

@classmethod
def _unpickle(
cls, butler_config: Config, dimensions_config: DimensionConfig | None, dataset_types_pickle: bytes
) -> _QBBFactory:
universe = DimensionUniverse(dimensions_config)
dataset_types = pickle.loads(dataset_types_pickle)
return _QBBFactory(butler_config, universe, dataset_types)

Check warning on line 539 in python/lsst/ctrl/mpexec/cmdLineFwk.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/cmdLineFwk.py#L537-L539

Added lines #L537 - L539 were not covered by tests

def __reduce__(self) -> tuple:
# If dimension universe is not default one, we need to dump/restore
# its config.
config = self.dimensions.dimensionConfig
default = DimensionConfig()

Check warning on line 545 in python/lsst/ctrl/mpexec/cmdLineFwk.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/cmdLineFwk.py#L544-L545

Added lines #L544 - L545 were not covered by tests
# Only send configuration to other side if it is non-default, default
# will be instantiated from config=None.
if (config["namespace"], config["version"]) != (default["namespace"], default["version"]):
andy-slac marked this conversation as resolved.
Show resolved Hide resolved
dimension_config = config

Check warning on line 549 in python/lsst/ctrl/mpexec/cmdLineFwk.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/cmdLineFwk.py#L549

Added line #L549 was not covered by tests
else:
dimension_config = None

Check warning on line 551 in python/lsst/ctrl/mpexec/cmdLineFwk.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/cmdLineFwk.py#L551

Added line #L551 was not covered by tests
# Dataset types need to be unpickled only after universe is made.
dataset_types_pickle = pickle.dumps(self.dataset_types)
return (self._unpickle, (self.butler_config, dimension_config, dataset_types_pickle))

Check warning on line 554 in python/lsst/ctrl/mpexec/cmdLineFwk.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/cmdLineFwk.py#L553-L554

Added lines #L553 - L554 were not covered by tests


# ------------------------
# Exported definitions --
Expand Down
28 changes: 18 additions & 10 deletions python/lsst/ctrl/mpexec/mpGraphExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,17 +113,19 @@ def start(
startMethod : `str`, optional
Start method from `multiprocessing` module.
"""
# Unpickling of quantum has to happen after butler/executor, this is
# why it is pickled manually here.
# Unpickling of quantum has to happen after butler/executor, also we
# want to setup logging before unpickling anything that can generate
# messages, this is why things are pickled manually here.
qe_pickle = pickle.dumps(quantumExecutor)
task_node_pickle = pickle.dumps(self.qnode.task_node)
quantum_pickle = pickle.dumps(self.qnode.quantum)
task_node = self.qnode.task_node
self._rcv_conn, snd_conn = multiprocessing.Pipe(False)
logConfigState = CliLog.configState

mp_ctx = multiprocessing.get_context(startMethod)
self.process = mp_ctx.Process( # type: ignore[attr-defined]
target=_Job._executeJob,
args=(quantumExecutor, task_node, quantum_pickle, logConfigState, snd_conn, self._fail_fast),
args=(qe_pickle, task_node_pickle, quantum_pickle, logConfigState, snd_conn, self._fail_fast),
name=f"task-{self.qnode.quantum.dataId}",
)
# mypy is getting confused by multiprocessing.
Expand All @@ -134,8 +136,8 @@ def start(

@staticmethod
def _executeJob(
quantumExecutor: QuantumExecutor,
task_node: TaskNode,
quantumExecutor_pickle: bytes,
task_node_pickle: bytes,
quantum_pickle: bytes,
logConfigState: list,
snd_conn: multiprocessing.connection.Connection,
Expand All @@ -145,14 +147,18 @@ def _executeJob(

Parameters
----------
quantumExecutor : `QuantumExecutor`
Executor for single quantum.
task_node : `lsst.pipe.base.pipeline_graph.TaskNode`
Task definition structure.
quantumExecutor_pickle : `bytes`
Executor for single quantum, pickled.
task_node_pickle : `bytes`
Task definition structure, pickled.
quantum_pickle : `bytes`
Quantum for this task execution in pickled form.
logConfigState : `list`
Logging state from parent process.
snd_conn : `multiprocessing.Connection`
Connection to send job report to parent process.
fail_fast : `bool`
If `True` then kill subprocess on RepeatableQuantumError.
"""
# This terrible hack is a workaround for Python threading bug:
# https://github.com/python/cpython/issues/102512. Should be removed
Expand All @@ -168,6 +174,8 @@ def _executeJob(
# re-initialize logging
CliLog.replayConfigState(logConfigState)

quantumExecutor: QuantumExecutor = pickle.loads(quantumExecutor_pickle)
task_node: TaskNode = pickle.loads(task_node_pickle)
quantum = pickle.loads(quantum_pickle)
report: QuantumReport | None = None
# Catch a few known failure modes and stop the process immediately,
Expand Down
Loading