From bb3417c33934e49c41fe1b2b35f7776139ea22ef Mon Sep 17 00:00:00 2001 From: Fede Raimondo Date: Fri, 25 Oct 2024 09:54:15 +0200 Subject: [PATCH 1/3] Retry lock of DelayedSubmission if a timeout occurs --- joblib_htcondor/backend.py | 33 ++++++++++------- joblib_htcondor/delayed_submission.py | 51 +++++++++++++++++---------- joblib_htcondor/executor.py | 21 +++++++++-- 3 files changed, 73 insertions(+), 32 deletions(-) diff --git a/joblib_htcondor/backend.py b/joblib_htcondor/backend.py index 2732cd7..61fd587 100644 --- a/joblib_htcondor/backend.py +++ b/joblib_htcondor/backend.py @@ -21,6 +21,7 @@ Any, Callable, Optional, + Tuple, Union, ) from uuid import uuid1 @@ -579,7 +580,7 @@ def get_nested_backend(self) -> tuple["ParallelBackendBase", int]: parent_uuid=self._this_batch_name, ), self._n_jobs - def __reduce__(self) -> tuple[callable, tuple]: + def __reduce__(self) -> tuple[Callable, tuple]: return ( _HTCondorBackendFactory.build, ( @@ -816,10 +817,10 @@ def _submit( if self._export_metadata: # add a new task to the metadata - self._backend_meta.task_status.append( + self._backend_meta.task_status.append( # type: ignore _TaskMeta(request_cpus=self._request_cpus) ) - self._backend_meta.n_tasks += 1 + self._backend_meta.n_tasks += 1 # type: ignore # Add to queue so the poller can take care of it self._queued_jobs_list.append( @@ -884,9 +885,14 @@ def _watcher(self) -> None: level=9, msg=f"Dumping pickle file {to_submit}" ) # Dump pickle file - to_submit.delayed_submission.dump( + dumped = to_submit.delayed_submission.dump( to_submit.pickle_fname ) + if not dumped: + # Something went wrong, continue and submit this + # later + logger.debug("Could not dump pickle file.") + continue # Submit job logger.log(level=9, msg=f"Submitting job {to_submit}") to_submit.htcondor_submit_result = self._client.submit( @@ -895,7 +901,7 @@ def _watcher(self) -> None: ) logger.log(level=9, msg="Getting cluster id.") # Set the cluster id - to_submit.cluster_id = ( + to_submit.cluster_id = ( # type: ignore to_submit.htcondor_submit_result.cluster() ) logger.log(level=9, msg="Job submitted.") @@ -904,14 +910,14 @@ def _watcher(self) -> None: level=9, msg="Updating task status timestamp." ) if self._export_metadata: - self._backend_meta.task_status[ + self._backend_meta.task_status[ # type: ignore to_submit.task_id - 1 ].sent_timestamp = datetime.now() logger.log( level=9, msg="Updating task status cluster id." 
) - self._backend_meta.task_status[ + self._backend_meta.task_status[ # type: ignore to_submit.task_id - 1 ].cluster_id = to_submit.cluster_id @@ -956,12 +962,15 @@ def _poll_jobs(self) -> tuple[int, bool]: # noqa: C901 ) if out_fname.exists(): logger.log(level=9, msg=f"Job {job_meta} is done.") - done_jobs.append(job_meta) out_fname = job_meta.pickle_fname.with_stem( f"{job_meta.pickle_fname.stem}_out" ) # Load the DelayedSubmission object ds = DelayedSubmission.load(out_fname) + if ds is None: + # Something went wrong, continue and poll later + continue + done_jobs.append(job_meta) result = ds.result() if ds.error(): logger.log( @@ -984,7 +993,7 @@ def _poll_jobs(self) -> tuple[int, bool]: # noqa: C901 self._completed_jobs_list.append(job_meta) if self._export_metadata: # Set the done timestamp from the ds object - self._backend_meta.task_status[ + self._backend_meta.task_status[ # type: ignore job_meta.task_id - 1 ].done_timestamp = ds.done_timestamp() @@ -996,7 +1005,7 @@ def _poll_jobs(self) -> tuple[int, bool]: # noqa: C901 run_fname = job_meta.pickle_fname.with_suffix(".run") update_meta = ( update_meta - or self._backend_meta.task_status[ + or self._backend_meta.task_status[ # type: ignore job_meta.task_id - 1 ].update_run_from_file(run_fname) ) @@ -1014,7 +1023,7 @@ def _poll_jobs(self) -> tuple[int, bool]: # noqa: C901 run_fname = job_meta.pickle_fname.with_suffix(".run") if self._export_metadata: # Update run from file again - self._backend_meta.task_status[ + self._backend_meta.task_status[ # type: ignore job_meta.task_id - 1 ].update_run_from_file(run_fname) run_fname.unlink() @@ -1220,5 +1229,5 @@ def build( export_metadata=export_metadata, ) out._recursion_level = recursion_level - out._parent_uuid = parent_uuid + out._parent_uuid = parent_uuid # type: ignore return out diff --git a/joblib_htcondor/delayed_submission.py b/joblib_htcondor/delayed_submission.py index f9a7776..976fe7f 100644 --- a/joblib_htcondor/delayed_submission.py +++ b/joblib_htcondor/delayed_submission.py @@ -9,9 +9,11 @@ from pathlib import Path from typing import Any, Callable, Optional, Union -from flufl.lock import Lock +from flufl.lock import Lock, TimeOutError # type: ignore from joblib.externals.cloudpickle import cloudpickle # type: ignore +from .logging import logger + __all__ = ["DelayedSubmission"] @@ -34,10 +36,10 @@ def _get_lock(fname: Union[Path, str], *args: Any, **kwargs: Any) -> Lock: Lock object. """ - if isinstance(fname, Path): - lock_fname = Path(fname).with_suffix(".lock") - else: - lock_fname = Path(fname + ".lock") + if not isinstance(fname, Path): + fname = Path(fname) + lock_fname = fname.with_suffix(".lock") + return Lock(lock_fname.as_posix(), *args, **kwargs) @@ -76,7 +78,7 @@ def __init__( def run(self) -> None: """Run the function with the arguments and store the result.""" try: - self._result = self.func(*self.args, **self.kwargs) + self._result = self.func(*self.args, **self.kwargs) # type: ignore except BaseException as e: # noqa: BLE001 self._result = _ExceptionWithTraceback( e, @@ -133,7 +135,7 @@ def result(self) -> Any: def dump( self, filename: Union[str, Path], result_only: bool = False - ) -> None: + ) -> bool: """Dump the object to a file. 
Parameters @@ -155,19 +157,25 @@ def dump( # Get lockfile flock = _get_lock(fname=filename, lifetime=120) # Max 2 minutes # Dump in the lockfile - with flock: - with open(filename, "wb") as file: - cloudpickle.dump(self, file) + try: + with flock: + with open(filename, "wb") as file: + cloudpickle.dump(self, file) + except TimeOutError: + logger.error(f"Could not obtain lock for {filename} in 2 minutes.") + return False # Set to original values if result_only: self.func = tmp_func self.args = tmp_args self.kwargs = tmp_kwargs + return True + @classmethod def load( cls: type["DelayedSubmission"], filename: Union[str, Path] - ) -> "DelayedSubmission": + ) -> Optional["DelayedSubmission"]: """Load a DelayedSubmission object from a file. Parameters @@ -177,8 +185,9 @@ def load( Returns ------- - DelayedSubmission - The loaded DelayedSubmission object. + DelayedSubmission or None + The loaded DelayedSubmission object. If a TimeOutError is raised + while obtaining the lock, returns None. Raises ------ @@ -189,9 +198,15 @@ def load( # Get lockfile flock = _get_lock(filename, lifetime=120) # Max 2 minutes # Load from the lockfile - with flock: - with open(filename, "rb") as file: - obj = cloudpickle.load(file) - if not (isinstance(obj, cls)): - raise TypeError("Loaded object is not a DelayedSubmission object.") + try: + with flock: + with open(filename, "rb") as file: + obj = cloudpickle.load(file) + if not (isinstance(obj, cls)): + raise TypeError( + "Loaded object is not a DelayedSubmission object." + ) + except TimeOutError: + logger.error(f"Could not obtain lock for {filename} in 2 minutes.") + return None return obj diff --git a/joblib_htcondor/executor.py b/joblib_htcondor/executor.py index c3c768c..0f1acbc 100644 --- a/joblib_htcondor/executor.py +++ b/joblib_htcondor/executor.py @@ -5,6 +5,7 @@ # License: AGPL from datetime import datetime +import time def logger_level(arg): @@ -103,7 +104,15 @@ def logger_level(arg): # Load file logger.info(f"Loading DelayedSubmission object from {fname}") - ds = DelayedSubmission.load(fname) + ds = None + while ds is None: + ds = DelayedSubmission.load(fname) + if ds is None: + logger.warning( + f"Could not load DelayedSubmission object from {fname}. " + "Retrying in 1 second." + ) + time.sleep(1) # Wait 1 second before retrying # Issue warning for re-running if ds.done(): @@ -119,5 +128,13 @@ def logger_level(arg): out_fname = fname.with_stem(f"{old_stem}_out") logger.info(f"Dumping DelayedSubmission (result only) to {out_fname}") # Dump output - ds.dump(out_fname, result_only=True) + dumped = False + while not dumped: + dumped = ds.dump(out_fname, result_only=True) + if not dumped: + logger.warning( + f"Could not dump DelayedSubmission to {out_fname}. " + "Retrying in 1 second." 
+            )
+            time.sleep(1)
     logger.info("Done.")

From 7f0b9315f5c89109d836d69157d9762e3aaa5ab1 Mon Sep 17 00:00:00 2001
From: Fede Raimondo
Date: Fri, 25 Oct 2024 09:59:04 +0200
Subject: [PATCH 2/3] Fix linter

---
 joblib_htcondor/backend.py  | 1 -
 joblib_htcondor/executor.py | 2 +-
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/joblib_htcondor/backend.py b/joblib_htcondor/backend.py
index 61fd587..479e7ff 100644
--- a/joblib_htcondor/backend.py
+++ b/joblib_htcondor/backend.py
@@ -21,7 +21,6 @@
     Any,
     Callable,
     Optional,
-    Tuple,
     Union,
 )
 from uuid import uuid1
diff --git a/joblib_htcondor/executor.py b/joblib_htcondor/executor.py
index 0f1acbc..46b4763 100644
--- a/joblib_htcondor/executor.py
+++ b/joblib_htcondor/executor.py
@@ -4,8 +4,8 @@
 # Synchon Mandal
 # License: AGPL
 
-from datetime import datetime
 import time
+from datetime import datetime
 
 
 def logger_level(arg):

From c93df1c7401f2607aa0165aa78a4760be8696a88 Mon Sep 17 00:00:00 2001
From: Fede Raimondo
Date: Fri, 25 Oct 2024 10:07:58 +0200
Subject: [PATCH 3/3] Add newsfragment

---
 changelog.d/6.fixed.md | 1 +
 1 file changed, 1 insertion(+)
 create mode 100644 changelog.d/6.fixed.md

diff --git a/changelog.d/6.fixed.md b/changelog.d/6.fixed.md
new file mode 100644
index 0000000..4317bab
--- /dev/null
+++ b/changelog.d/6.fixed.md
@@ -0,0 +1 @@
+Retry load/dump of the `DelayedSubmission` object in case `flufl.lock` raises a `TimeOutError`.
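The fix across all three patches follows one pattern: bound the flufl.lock acquisition, turn a TimeOutError into a transient-failure sentinel (False from dump, None from load) instead of an exception, and let the caller retry after a short sleep. The sketch below distills that pattern on its own. It is a minimal sketch, assuming only that flufl.lock is installed; the helper names (dump_with_lock, dump_with_retry), the byte-payload interface, and the concrete timeout and delay values are illustrative and not part of the patches.

    import time
    from datetime import timedelta
    from pathlib import Path

    from flufl.lock import Lock, TimeOutError


    def dump_with_lock(payload: bytes, filename: Path, timeout_s: int = 120) -> bool:
        # Guard the write with a sibling ".lock" file, as _get_lock() does.
        lock = Lock(filename.with_suffix(".lock").as_posix(), lifetime=timeout_s)
        try:
            # Bound the acquisition attempt: flufl.lock raises TimeOutError
            # if another process holds the lock for the whole window.
            lock.lock(timeout=timedelta(seconds=timeout_s))
        except TimeOutError:
            # Transient failure: report it instead of crashing the job.
            return False
        try:
            filename.write_bytes(payload)
        finally:
            lock.unlock()
        return True


    def dump_with_retry(payload: bytes, filename: Path, delay_s: float = 1.0) -> None:
        # Executor-side loop: a False return only means the lock was busy,
        # so keep retrying until the dump goes through.
        while not dump_with_lock(payload, filename):
            time.sleep(delay_s)

The patches acquire the lock via the Lock context manager; the explicit lock(timeout=...) call in this sketch is equivalent in effect but makes the bounded wait visible. The load side mirrors the same idea with None as the sentinel, which is why executor.py loops with "while ds is None" rather than checking a boolean.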