Skip to content

Commit

Permalink
Use a dataclass to specify cores and memory
Browse files Browse the repository at this point in the history
This gives more flexibility for the future and makes it easier
for this information to be added to ctrl_mpexec APIs.
  • Loading branch information
timj committed Jun 21, 2023
1 parent cf0b2db commit be91805
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 26 deletions.
52 changes: 35 additions & 17 deletions python/lsst/pipe/base/_quantumContext.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
"""Module defining a butler like object specialized to a specific quantum.
"""

__all__ = ("ButlerQuantumContext", "QuantumContext")
__all__ = ("ButlerQuantumContext", "ExecutionResources", "QuantumContext")

import numbers
from collections.abc import Sequence
from dataclasses import dataclass
from typing import Any

import astropy.units as u
Expand All @@ -41,6 +43,30 @@
_LOG = getLogger(__name__)


@dataclass(frozen=True, kw_only=True)
class ExecutionResources:
"""A description of the resources available to a running quantum."""

num_cores: int = 1
"""The maximum number of cores that the task can use."""

max_mem: u.Quantity | numbers.Real | None = None
"""If defined, the amount of memory allocated to the task. If a plain
integer is given it is assumed to be the number of bytes and converted
to a `~astropy.units.Quantity`.
"""

def __post_init__(self) -> None:
# Normalize max_mem to bytes.
max_mem = self.max_mem
if max_mem is not None:
if isinstance(max_mem, numbers.Real):
max_mem *= u.B
else:
max_mem = max_mem.to(u.B)
object.__setattr__(self, "max_mem", max_mem)


class QuantumContext:
"""A Butler-like class specialized for a single quantum along with
context information that can influence how the task is executed.
Expand All @@ -52,11 +78,8 @@ class QuantumContext:
quantum : `lsst.daf.butler.core.Quantum`
Quantum object that describes the datasets which will be get/put by a
single execution of this node in the pipeline graph.
num_cores : `int`, optional
The maximum number of cores that the task can use.
max_mem : `astropy.units.Quantity`, `int`, or `None`, optional
If defined, the amount of memory allocated to the task. If a plain
integer is given it is assumed to be the number of bytes.
resources : `ExecutionResources`
The resources allocated for executing quanta.
Notes
-----
Expand All @@ -71,20 +94,15 @@ class QuantumContext:
execution.
"""

resources: ExecutionResources

def __init__(
self, butler: LimitedButler, quantum: Quantum, num_cores: int = 1, max_mem: u.Quantity | None = None
self, butler: LimitedButler, quantum: Quantum, *, resources: ExecutionResources | None = None
):
self.quantum = quantum
self.num_cores = num_cores

# Internally store as bytes. This will also ensure that the quantity
# given is convertible to bytes.
if max_mem is not None:
if isinstance(max_mem, int):
max_mem *= u.B
else:
max_mem = max_mem.to(u.B)
self.max_mem = max_mem
if resources is None:
resources = ExecutionResources()
self.resources = resources

self.allInputs = set()
self.allOutputs = set()
Expand Down
20 changes: 11 additions & 9 deletions tests/test_pipelineTask.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ def testButlerQC(self):
butler.put(100, ref)

butlerQC = pipeBase.QuantumContext(butler, quantum)
self.assertEqual(butlerQC.num_cores, 1)
self.assertIsNone(butlerQC.max_mem)
self.assertEqual(butlerQC.resources.num_cores, 1)
self.assertIsNone(butlerQC.resources.max_mem)

# Pass ref as single argument or a list.
obj = butlerQC.get(ref)
Expand Down Expand Up @@ -310,16 +310,18 @@ def testButlerQC(self):
self.assertEqual(obj, {"input": [None, 100], "input2": None})

# Set additional context.
butlerQC = pipeBase.QuantumContext(butler, quantum, num_cores=4, max_mem=5 * u.MB)
self.assertEqual(butlerQC.num_cores, 4)
self.assertEqual(butlerQC.max_mem, 5_000_000 * u.B)
resources = pipeBase.ExecutionResources(num_cores=4, max_mem=5 * u.MB)
butlerQC = pipeBase.QuantumContext(butler, quantum, resources=resources)
self.assertEqual(butlerQC.resources.num_cores, 4)
self.assertEqual(butlerQC.resources.max_mem, 5_000_000 * u.B)

butlerQC = pipeBase.QuantumContext(butler, quantum, max_mem=5)
self.assertEqual(butlerQC.num_cores, 1)
self.assertEqual(butlerQC.max_mem, 5 * u.B)
resources = pipeBase.ExecutionResources(max_mem=5)
butlerQC = pipeBase.QuantumContext(butler, quantum, resources=resources)
self.assertEqual(butlerQC.resources.num_cores, 1)
self.assertEqual(butlerQC.resources.max_mem, 5 * u.B)

with self.assertRaises(u.UnitConversionError):
pipeBase.QuantumContext(butler, quantum, max_mem=1 * u.m)
pipeBase.ExecutionResources(max_mem=1 * u.m)


class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):
Expand Down

0 comments on commit be91805

Please sign in to comment.