Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Flow.metadata attribute and Flow.update_metadata method #679

Merged
merged 11 commits into from
Nov 22, 2024
44 changes: 41 additions & 3 deletions src/jobflow/core/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ def __init__(
order: JobOrder = JobOrder.AUTO,
uuid: str = None,
hosts: list[str] = None,
metadata: dict[str, Any] = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please can you add docstrings for these options in the class docstring? E.g. see the corresponding docs for Job.

metadata_updates: list[dict[str, Any]] = None,
):
from jobflow.core.job import Job

Expand All @@ -141,6 +143,8 @@ def __init__(
self.order = order
self.uuid = uuid
self.hosts = hosts or []
self.metadata = metadata or {}
self.metadata_updates = metadata_updates or []

self._jobs: tuple[Flow | Job, ...] = ()
self.add_jobs(jobs)
Expand Down Expand Up @@ -608,9 +612,10 @@ def update_metadata(
function_filter: Callable = None,
dict_mod: bool = False,
dynamic: bool = True,
callback_filter: Callable[[Flow | Job], bool] = lambda _: True,
):
"""
Update the metadata of all Jobs in the Flow.
Update the metadata of the Flow and/or its Jobs.

Note that updates will be applied to jobs in nested Flow.

Expand All @@ -630,6 +635,10 @@ def update_metadata(
dynamic
The updates will be propagated to Jobs/Flows dynamically generated at
runtime.
callback_filter
A function that takes a Flow or Job instance and returns True if updates
should be applied to that instance. Allows for custom filtering logic.
Applies recursively to nested Flows and Jobs so best be specific.

Examples
--------
Expand All @@ -646,16 +655,45 @@ def update_metadata(
The ``metadata`` of both jobs could be updated as follows:

>>> flow.update_metadata({"tag": "addition_job"})

Or using a callback filter to only update flows containing a specific maker:

>>> flow.update_metadata(
... {"material_id": 42},
... callback_filter=lambda flow: SomeMaker in map(type, flow)
... and flow.name == "flow name"
... )
"""
for job in self:
job.update_metadata(
from jobflow.utils.dict_mods import apply_mod

for job_or_flow in self:
job_or_flow.update_metadata(
update,
name_filter=name_filter,
function_filter=function_filter,
dict_mod=dict_mod,
dynamic=dynamic,
callback_filter=callback_filter,
)

if callback_filter(self) is False:
return

if dict_mod:
apply_mod(update, self.metadata)
else:
self.metadata.update(update)

if dynamic:
dict_input = {
"update": update,
"name_filter": name_filter,
"function_filter": function_filter,
"dict_mod": dict_mod,
"callback_filter": callback_filter,
}
self.metadata_updates.append(dict_input)

def update_config(
self,
config: jobflow.JobConfig | dict,
Expand Down
25 changes: 18 additions & 7 deletions src/jobflow/core/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,6 @@ def __init__(
function_args = () if function_args is None else function_args
function_kwargs = {} if function_kwargs is None else function_kwargs
uuid = suid() if uuid is None else uuid
metadata = {} if metadata is None else metadata
config = JobConfig() if config is None else config

# make a deep copy of the function (means makers do not share the same instance)
Expand All @@ -354,7 +353,7 @@ def __init__(
self.uuid = uuid
self.index = index
self.name = name
self.metadata = metadata
self.metadata = metadata or {}
self.config = config
self.hosts = hosts or []
self.metadata_updates = metadata_updates or []
Expand Down Expand Up @@ -927,6 +926,7 @@ def update_metadata(
function_filter: Callable = None,
dict_mod: bool = False,
dynamic: bool = True,
callback_filter: Callable[[jobflow.Flow | Job], bool] = lambda _: True,
):
"""
Update the metadata of the job.
Expand All @@ -950,6 +950,9 @@ def update_metadata(
dynamic
The updates will be propagated to Jobs/Flows dynamically generated at
runtime.
callback_filter
A function that takes a Flow or Job instance and returns True if updates
should be applied to that instance. Allows for custom filtering logic.

Examples
--------
Expand All @@ -968,11 +971,16 @@ def update_metadata(
will not only set the `example` metadata to the `test_job`, but also to all the
new Jobs that will be generated at runtime by the ExampleMaker.

`update_metadata` can be called multiple times with different `name_filter` or
`function_filter` to control which Jobs will be updated.
`update_metadata` can be called multiple times with different filters to control
which Jobs will be updated. For example, using a callback filter:

>>> test_job.update_metadata(
... {"material_id": 42},
... callback_filter=lambda job: isinstance(job.maker, SomeMaker)
... )

At variance, if `dynamic` is set to `False` the `example` metadata will only be
added to the `test_job` and not to the generated Jobs.
At variance, if `dynamic` is set to `False` the metadata will only be
added to the filtered Jobs and not to any generated Jobs.
"""
from jobflow.utils.dict_mods import apply_mod

Expand All @@ -982,14 +990,14 @@ def update_metadata(
"name_filter": name_filter,
"function_filter": function_filter,
"dict_mod": dict_mod,
"callback_filter": callback_filter,
}
self.metadata_updates.append(dict_input)

# unwrap the functions in case the job is a decorated one
function_filter = getattr(function_filter, "__wrapped__", function_filter)
function = getattr(self.function, "__wrapped__", self.function)

# if function_filter is not None and function_filter != self.function:
if function_filter is not None and function_filter != function:
return

Expand All @@ -998,6 +1006,9 @@ def update_metadata(
):
return

if callback_filter(self) is False:
return

# if we get to here then we pass all the filters
if dict_mod:
apply_mod(update, self.metadata)
Expand Down
2 changes: 1 addition & 1 deletion src/jobflow/managers/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def run_locally(
Raise an error if the flow was not executed successfully.
allow_external_references : bool
If False all the references to other outputs should be from other Jobs
of the Flow.
of the same Flow.
raise_immediately : bool
If True, raise an exception immediately if a job fails. If False, continue
running the flow and only raise an exception at the end if the flow did not
Expand Down
Loading
Loading