Skip to content

Improve performance of show command using an EAR cache #831

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions hpcflow/sdk/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3314,12 +3314,13 @@ def _get_known_submissions(
out_item["deleted"] = True

else:
if status:
status.update(
f"Reading workflow {file_dat_i['path']!r} submission info..."
)
with wk_i._store.cache_ctx():
sub = wk_i.submissions[file_dat_i["sub_idx"]]
if status:
status.update(
f"Loading workflow {file_dat_i['path']!r} run metadata..."
)
sub.use_EARs_cache = True # pre-cache EARs of this submission

all_jobscripts = sub._submission_parts[submit_time_str]
out_item["jobscripts"] = all_jobscripts
Expand Down
13 changes: 8 additions & 5 deletions hpcflow/sdk/persistence/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1218,12 +1218,15 @@ def _reset_cache(self) -> None:
@contextlib.contextmanager
def cache_ctx(self) -> Iterator[None]:
    """Context manager for using the persistent element/iteration/run cache.

    Re-entrant: if the cache is already enabled (e.g. by an enclosing
    ``cache_ctx``), entering again is a no-op, so only the outermost context
    owns the cache lifetime and performs the reset on exit.
    """
    # NOTE: the scraped diff interleaved the old and new bodies; this is the
    # reconstructed post-change (re-entrant) implementation.
    if self._use_cache:
        # Cache already active; defer disabling/reset to the outer context.
        yield
    else:
        self._use_cache = True
        try:
            yield
        finally:
            self._use_cache = False
            self._reset_cache()  # drop any stale cached objects

@contextlib.contextmanager
def parameters_metadata_cache(self):
Expand Down
3 changes: 3 additions & 0 deletions hpcflow/sdk/submission/jobscript.py
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,7 @@ def at_submit_metadata(self) -> dict[str, Any]:
)

@property
@TimeIt.decorator
def all_EAR_IDs(self) -> NDArray:
"""Return all run IDs of this jobscripts (across all blocks), removing missing
run IDs (i.e. -1 values)"""
Expand All @@ -911,6 +912,8 @@ def all_EARs(self) -> Sequence[ElementActionRun]:
"""
Description of EAR information for this jobscript.
"""
if self.submission._use_EARs_cache:
return [self.submission._EARs_cache[ear_id] for ear_id in self.all_EAR_IDs]
return self.workflow.get_EARs_from_IDs(self.all_EAR_IDs)

@property
Expand Down
89 changes: 72 additions & 17 deletions hpcflow/sdk/submission/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing import Any, Literal, overload, TYPE_CHECKING
from typing_extensions import override
import warnings
from contextlib import contextmanager


from hpcflow.sdk.utils.strings import shorten_list_str
Expand Down Expand Up @@ -135,6 +136,10 @@ def __init__(
None # assigned on first access
)

# updated in _submission_EARs_cache context manager:
self._use_EARs_cache = False
self._EARs_cache: dict[int, ElementActionRun] = {}

if workflow:
#: The workflow this is part of.
self.workflow = workflow
Expand Down Expand Up @@ -226,6 +231,8 @@ def _postprocess_to_dict(self, d: dict[str, Any]) -> dict[str, Any]:
del dct["_workflow"]
del dct["_index"]
del dct["_submission_parts_lst"]
del dct["_use_EARs_cache"]
del dct["_EARs_cache"]
return {k.lstrip("_"): v for k, v in dct.items()}

@property
Expand Down Expand Up @@ -265,6 +272,30 @@ def submission_parts(self) -> list[SubmissionPart]:
]
return self._submission_parts_lst

@property
@TimeIt.decorator
def use_EARs_cache(self) -> bool:
    """Whether to pre-cache all EARs associated with the submission."""
    # Backing flag is only toggled via the setter, which also populates or
    # clears `_EARs_cache` accordingly.
    return self._use_EARs_cache

@use_EARs_cache.setter
@TimeIt.decorator
def use_EARs_cache(self, value: bool):
    """Toggle the EAR caching facility.

    Enabling bulk-loads every EAR of this submission into `_EARs_cache`;
    disabling clears the cache.
    """
    if value == self._use_EARs_cache:
        return  # no state change required
    self._use_EARs_cache = value
    if not value:
        # Release the cached `ElementActionRun` objects.
        self._EARs_cache = {}
        return
    # Pre-cache: retrieve all EARs in a single bulk lookup, keyed by ID.
    ids = list(self.all_EAR_IDs)
    self._EARs_cache = dict(zip(ids, self.workflow.get_EARs_from_IDs(ids)))

@TimeIt.decorator
def get_start_time(self, submit_time: str) -> datetime | None:
"""Get the start time of a given submission part."""
Expand All @@ -283,17 +314,36 @@ def get_end_time(self, submit_time: str) -> datetime | None:
@TimeIt.decorator
def start_time(self) -> datetime | None:
    """Get the first non-None start time over all submission parts.

    Returns ``None`` if no submission part has started.
    """
    # Pre-cache EARs once so each `get_start_time` call reuses the cache
    # rather than re-reading run metadata per submission part.
    with self.using_EARs_cache():
        times = (
            self.get_start_time(submit_time)
            for submit_time in self._submission_parts
        )
        return min((t for t in times if t is not None), default=None)

@property
@TimeIt.decorator
def end_time(self) -> datetime | None:
    """Get the final non-None end time over all submission parts.

    Returns ``None`` if no submission part has ended.
    """
    # Pre-cache EARs once so each `get_end_time` call reuses the cache
    # rather than re-reading run metadata per submission part.
    with self.using_EARs_cache():
        times = (
            self.get_end_time(submit_time)
            for submit_time in self._submission_parts
        )
        return max((t for t in times if t is not None), default=None)

@contextmanager
def using_EARs_cache(self):
    """
    A context manager to load and cache all EARs associated with this submission (and
    its jobscripts).
    """
    if self.use_EARs_cache:
        # Cache already enabled by an outer context; nothing to manage here.
        yield
        return
    self.use_EARs_cache = True
    try:
        yield
    finally:
        self.use_EARs_cache = False

@property
def jobscripts(self) -> list[Jobscript]:
Expand Down Expand Up @@ -592,15 +642,18 @@ def all_EAR_IDs(self) -> Iterable[int]:
"""
The IDs of all EARs in this submission.
"""
return (i for js in self.jobscripts for i in js.all_EAR_IDs)
return (int(i) for js in self.jobscripts for i in js.all_EAR_IDs)

@property
@TimeIt.decorator
def all_EARs(self) -> list[ElementActionRun]:
    """
    All EARs in this submission.

    Served from the pre-populated cache when `use_EARs_cache` is enabled,
    otherwise bulk-loaded from the workflow store.
    """
    # NOTE: the scraped diff interleaved old/new lines; this is the
    # reconstructed post-change implementation (returns a list, not a
    # generator).
    if self.use_EARs_cache:
        return list(self._EARs_cache.values())
    else:
        return self.workflow.get_EARs_from_IDs(self.all_EAR_IDs)

@property
@TimeIt.decorator
Expand All @@ -610,9 +663,10 @@ def all_EARs_IDs_by_jobscript(self) -> list[np.ndarray]:
@property
@TimeIt.decorator
def all_EARs_by_jobscript(self) -> list[list[ElementActionRun]]:
    """All EARs of this submission, grouped by jobscript.

    Builds a single ID-to-EAR mapping (one bulk retrieval via `all_EARs`)
    and then resolves each jobscript's ID array against it, avoiding a
    store lookup per jobscript.
    """
    all_EARs = {i.id_: i for i in self.all_EARs}
    return [
        [all_EARs[i] for i in js_ids] for js_ids in self.all_EARs_IDs_by_jobscript
    ]

@property
@TimeIt.decorator
Expand Down Expand Up @@ -650,11 +704,12 @@ def get_active_jobscripts(
"""Get jobscripts that are active on this machine, and their active states."""
# this returns: {JS_IDX: {BLOCK_IDX: {JS_ELEMENT_IDX: STATE}}}
# TODO: query the scheduler once for all jobscripts?
return {
js.index: act_states
for js in self.jobscripts
if (act_states := js.get_active_states(as_json=as_json))
}
with self.using_EARs_cache():
return {
js.index: act_states
for js in self.jobscripts
if (act_states := js.get_active_states(as_json=as_json))
}

@TimeIt.decorator
def _write_scripts(
Expand Down
Loading