Skip to content

Commit

Permalink
Add number of cores and max memory to QuantumContext
Browse files Browse the repository at this point in the history
  • Loading branch information
timj committed Jun 15, 2023
1 parent 193d725 commit 9733e75
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 2 deletions.
24 changes: 22 additions & 2 deletions python/lsst/pipe/base/_quantumContext.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from collections.abc import Sequence
from typing import Any

import astropy.units as u
from deprecated.sphinx import deprecated
from lsst.daf.butler import DatasetRef, DimensionUniverse, LimitedButler, Quantum
from lsst.utils.introspection import get_full_type_name
Expand All @@ -41,7 +42,8 @@


class QuantumContext:
"""A Butler-like class specialized for a single quantum.
"""A Butler-like class specialized for a single quantum along with
context information that can influence how the task is executed.
Parameters
----------
Expand All @@ -50,6 +52,11 @@ class QuantumContext:
quantum : `lsst.daf.butler.core.Quantum`
Quantum object that describes the datasets which will be get/put by a
single execution of this node in the pipeline graph.
num_cores : `int`, optional
The maximum number of cores that the task can use.
max_mem : `astropy.units.Quantity`, `int`, or `None`, optional
If defined, the amount of memory allocated to the task. If a plain
integer is given it is assumed to be the number of bytes.
Notes
-----
Expand All @@ -64,8 +71,21 @@ class QuantumContext:
execution.
"""

def __init__(self, butler: LimitedButler, quantum: Quantum):
def __init__(
self, butler: LimitedButler, quantum: Quantum, num_cores: int = 1, max_mem: u.Quantity | None = None
):
self.quantum = quantum
self.num_cores = num_cores

# Internally store as bytes. This will also ensure that the quantity
# given is convertible to bytes.
if max_mem is not None:
if isinstance(max_mem, int):
max_mem *= u.B
else:
max_mem = max_mem.to(u.B)
self.max_mem = max_mem

self.allInputs = set()
self.allOutputs = set()
for refs in quantum.inputs.values():
Expand Down
15 changes: 15 additions & 0 deletions tests/test_pipelineTask.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import unittest
from typing import Any

import astropy.units as u
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
import lsst.utils.logging
Expand Down Expand Up @@ -278,6 +279,8 @@ def testButlerQC(self):
butler.put(100, ref)

butlerQC = pipeBase.QuantumContext(butler, quantum)
self.assertEqual(butlerQC.num_cores, 1)
self.assertIsNone(butlerQC.max_mem)

# Pass ref as single argument or a list.
obj = butlerQC.get(ref)
Expand Down Expand Up @@ -306,6 +309,18 @@ def testButlerQC(self):
obj = butlerQC.get(inputRefs)
self.assertEqual(obj, {"input": [None, 100], "input2": None})

# Set additional context.
butlerQC = pipeBase.QuantumContext(butler, quantum, num_cores=4, max_mem=5 * u.MB)
self.assertEqual(butlerQC.num_cores, 4)
self.assertEqual(butlerQC.max_mem, 5_000_000 * u.B)

butlerQC = pipeBase.QuantumContext(butler, quantum, max_mem=5)
self.assertEqual(butlerQC.num_cores, 1)
self.assertEqual(butlerQC.max_mem, 5 * u.B)

with self.assertRaises(u.UnitConversionError):
pipeBase.QuantumContext(butler, quantum, max_mem=1 * u.m)


class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):
pass
Expand Down

0 comments on commit 9733e75

Please sign in to comment.