Skip to content

Commit

Permalink
Merge pull request #299 from lsst/tickets/DM-40443
Browse files Browse the repository at this point in the history
DM-40443: Remove interfaces deprecated on DM-40441
  • Loading branch information
timj authored Aug 11, 2024
2 parents 8eedeb4 + e68f505 commit ac6ec8c
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 235 deletions.
1 change: 1 addition & 0 deletions doc/changes/DM-40443.removal.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Removed the deprecated support for ``TaskDef`` in some APIs.
Binary file added python/.DS_Store
Binary file not shown.
31 changes: 7 additions & 24 deletions python/lsst/ctrl/mpexec/log_capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,14 @@
import os
import shutil
import tempfile
import warnings
from collections.abc import Iterator
from contextlib import contextmanager, suppress
from logging import FileHandler

from lsst.daf.butler import Butler, FileDataset, LimitedButler, Quantum
from lsst.daf.butler.logging import ButlerLogRecordHandler, ButlerLogRecords, ButlerMDC, JsonLogFormatter
from lsst.pipe.base import InvalidQuantumError, TaskDef
from lsst.pipe.base import InvalidQuantumError
from lsst.pipe.base.pipeline_graph import TaskNode
from lsst.utils.introspection import find_outside_stacklevel

_LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -88,17 +86,13 @@ def from_full(cls, butler: Butler) -> LogCapture:
return cls(butler, butler)

@contextmanager
def capture_logging(
self, task_node: TaskDef | TaskNode, /, quantum: Quantum
) -> Iterator[_LogCaptureFlag]:
def capture_logging(self, task_node: TaskNode, /, quantum: Quantum) -> Iterator[_LogCaptureFlag]:
"""Configure logging system to capture logs for execution of this task.
Parameters
----------
task_node : `lsst.pipe.base.TaskDef` or \
`~lsst.pipe.base.pipeline_graph.TaskNode`
The task definition. Support for `~lsst.pipe.base.TaskDef` is
deprecated and will be removed after v27.
task_node : `~lsst.pipe.base.pipeline_graph.TaskNode`
The task definition.
quantum : `~lsst.daf.butler.Quantum`
Single Quantum instance.
Expand All @@ -124,20 +118,9 @@ def capture_logging(
if self.full_butler is not None:
mdc["RUN"] = self.full_butler.run or ""
ctx = _LogCaptureFlag()

if isinstance(task_node, TaskDef):
# TODO: remove this block and associated docs and annotations on
# DM-40443.
log_dataset_name = task_node.logOutputDatasetName
warnings.warn(
"Passing TaskDef instances to LogCapture is deprecated and will not be supported after v27.",
FutureWarning,
find_outside_stacklevel("lsst.ctrl.mpexec"),
)
else:
log_dataset_name = (
task_node.log_output.dataset_type_name if task_node.log_output is not None else None
)
log_dataset_name = (
task_node.log_output.dataset_type_name if task_node.log_output is not None else None
)

# Add a handler to the root logger to capture execution log output.
if log_dataset_name is not None:
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/ctrl/mpexec/mpGraphExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def _executeJob(
quantum = pickle.loads(quantum_pickle)
report: QuantumReport | None = None
try:
_, report = quantumExecutor.execute(task_node, quantum)
quantum, report = quantumExecutor.execute(task_node, quantum)
except Exception as exc:
_LOG.debug("exception from task %s dataId %s: %s", task_node.label, quantum.dataId, exc)
report = QuantumReport.from_exception(
Expand Down
12 changes: 4 additions & 8 deletions python/lsst/ctrl/mpexec/quantumGraphExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

if TYPE_CHECKING:
from lsst.daf.butler import Quantum
from lsst.pipe.base import QuantumGraph, TaskDef
from lsst.pipe.base import QuantumGraph
from lsst.pipe.base.pipeline_graph import TaskNode


Expand All @@ -50,17 +50,13 @@ class QuantumExecutor(ABC):
"""

@abstractmethod
def execute(
self, task_node: TaskNode | TaskDef, /, quantum: Quantum
) -> tuple[Quantum, QuantumReport | None]:
def execute(self, task_node: TaskNode, /, quantum: Quantum) -> tuple[Quantum, QuantumReport | None]:
"""Execute single quantum.
Parameters
----------
task_node : `~lsst.pipe.base.TaskDef` or \
`~lsst.pipe.base.pipeline_graph.TaskNode`
Task definition structure. `~lsst.pipe.base.TaskDef` support is
deprecated and will be removed after v27.
task_node : `~lsst.pipe.base.pipeline_graph.TaskNode`
Task definition structure.
quantum : `~lsst.daf.butler.Quantum`
Quantum for this execution.
Expand Down
71 changes: 15 additions & 56 deletions python/lsst/ctrl/mpexec/separablePipelineExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,14 @@
import datetime
import getpass
import logging
import warnings
from collections.abc import Iterable, Mapping
from typing import Any, Protocol
from collections.abc import Iterable
from typing import Any

import lsst.pipe.base
import lsst.resources
from lsst.daf.butler import Butler
from lsst.pipe.base.all_dimensions_quantum_graph_builder import (
AllDimensionsQuantumGraphBuilder,
DatasetQueryConstraintVariant,
)
from lsst.pipe.base.all_dimensions_quantum_graph_builder import AllDimensionsQuantumGraphBuilder
from lsst.pipe.base.quantum_graph_builder import QuantumGraphBuilder
from lsst.utils.introspection import find_outside_stacklevel

from .mpGraphExecutor import MPGraphExecutor
from .preExecInit import PreExecInit
Expand All @@ -59,20 +54,6 @@
_LOG = logging.getLogger(__name__)


class _GraphBuilderLike(Protocol):
def makeGraph(
self,
pipeline: lsst.pipe.base.Pipeline | Iterable[lsst.pipe.base.pipeline.TaskDef],
collections: Any,
run: str,
userQuery: str | None,
datasetQueryConstraint: DatasetQueryConstraintVariant = DatasetQueryConstraintVariant.ALL,
metadata: Mapping[str, Any] | None = None,
bind: Mapping[str, Any] | None = None,
) -> lsst.pipe.base.QuantumGraph:
pass


class SeparablePipelineExecutor:
"""An executor that allows each step of pipeline execution to be
run independently.
Expand Down Expand Up @@ -182,7 +163,6 @@ def make_quantum_graph(
self,
pipeline: lsst.pipe.base.Pipeline,
where: str = "",
builder: _GraphBuilderLike | None = None,
*,
builder_class: type[QuantumGraphBuilder] = AllDimensionsQuantumGraphBuilder,
attach_datastore_records: bool = False,
Expand All @@ -198,11 +178,6 @@ def make_quantum_graph(
A data ID query that constrains the quanta generated. Must not be
provided if a custom ``builder_class`` is given and that class does
not accept ``where`` as a construction argument.
builder : `lsst.pipe.base.GraphBuilder`-like, optional
A graph builder that implements a
`~lsst.pipe.base.GraphBuilder.makeGraph` method. By default, a new
instance of `lsst.pipe.base.GraphBuilder` is used. Deprecated in
favor of ``builder_class`` and will be removed after v27.
builder_class : `type` [ \
`lsst.pipe.base.quantum_graph_builder.QuantumGraphBuilder` ], \
optional
Expand Down Expand Up @@ -241,34 +216,18 @@ class are provided automatically (from explicit arguments to this
"user": getpass.getuser(),
"time": str(datetime.datetime.now()),
}
if builder:
warnings.warn(
"The 'builder' argument to SeparablePipelineBuilder.make_quantum_graph "
"is deprecated in favor of 'builder_class', and will be removed after v27.",
FutureWarning,
find_outside_stacklevel("lsst.ctrl.mpexec"),
)
assert self._butler.run is not None, "Butler output run collection must be defined"
graph = builder.makeGraph(
pipeline,
self._butler.collections,
self._butler.run,
userQuery=where,
metadata=metadata,
)
else:
if where:
# Only pass 'where' if it's actually provided, since some
# QuantumGraphBuilder subclasses may not accept it.
kwargs["where"] = where
qg_builder = builder_class(
pipeline.to_graph(),
self._butler,
skip_existing_in=self._skip_existing_in,
clobber=self._clobber_output,
**kwargs,
)
graph = qg_builder.build(metadata=metadata, attach_datastore_records=attach_datastore_records)
if where:
# Only pass 'where' if it's actually provided, since some
# QuantumGraphBuilder subclasses may not accept it.
kwargs["where"] = where
qg_builder = builder_class(
pipeline.to_graph(),
self._butler,
skip_existing_in=self._skip_existing_in,
clobber=self._clobber_output,
**kwargs,
)
graph = qg_builder.build(metadata=metadata, attach_datastore_records=attach_datastore_records)
_LOG.info(
"QuantumGraph contains %d quanta for %d tasks, graph ID: %r",
len(graph),
Expand Down
34 changes: 4 additions & 30 deletions python/lsst/ctrl/mpexec/simple_pipeline_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,13 @@

__all__ = ("SimplePipelineExecutor",)

import warnings
from collections.abc import Iterable, Iterator, Mapping
from typing import Any

from lsst.daf.butler import Butler, CollectionType, Quantum
from lsst.pex.config import Config
from lsst.pipe.base import (
ExecutionResources,
Instrument,
Pipeline,
PipelineGraph,
PipelineTask,
QuantumGraph,
TaskDef,
)
from lsst.pipe.base import ExecutionResources, Instrument, Pipeline, PipelineGraph, PipelineTask, QuantumGraph
from lsst.pipe.base.all_dimensions_quantum_graph_builder import AllDimensionsQuantumGraphBuilder
from lsst.utils.introspection import find_outside_stacklevel

from .preExecInit import PreExecInit
from .singleQuantumExecutor import SingleQuantumExecutor
Expand Down Expand Up @@ -241,7 +231,7 @@ def from_task_class(
@classmethod
def from_pipeline(
cls,
pipeline: Pipeline | Iterable[TaskDef],
pipeline: Pipeline,
*,
where: str = "",
bind: Mapping[str, Any] | None = None,
Expand All @@ -256,8 +246,7 @@ def from_pipeline(
pipeline : `~lsst.pipe.base.Pipeline` or \
`~collections.abc.Iterable` [ `~lsst.pipe.base.TaskDef` ]
A Python object describing the tasks to run, along with their
labels and configuration. Passing `~lsst.pipe.base.TaskDef`
objects is deprecated and will not be supported after v27.
labels and configuration.
where : `str`, optional
Data ID query expression that constraints the quanta generated.
bind : `~collections.abc.Mapping`, optional
Expand All @@ -276,22 +265,7 @@ def from_pipeline(
`~lsst.pipe.base.QuantumGraph` and `~lsst.daf.butler.Butler`,
ready for `run` to be called.
"""
if isinstance(pipeline, Pipeline):
pipeline_graph = pipeline.to_graph()
else:
# TODO: disable this block and adjust docs and annotations
# on DM-40443.
warnings.warn(
"Passing TaskDefs to SimplePipelineExecutor.from_pipeline is deprecated "
"and will be removed after v27.",
category=FutureWarning,
stacklevel=find_outside_stacklevel("lsst.ctrl.mpexec"),
)
pipeline_graph = PipelineGraph()
for task_def in pipeline:
pipeline_graph.add_task(
task_def.label, task_def.taskClass, task_def.config, connections=task_def.connections
)
pipeline_graph = pipeline.to_graph()
return cls.from_pipeline_graph(
pipeline_graph, where=where, bind=bind, butler=butler, resources=resources
)
Expand Down
Loading

0 comments on commit ac6ec8c

Please sign in to comment.