Merge pull request #6 from juaml/fix/retry_lock
Retry lock of DelayedSubmission if a timeout occurs
fraimondo authored Oct 25, 2024
2 parents c082423 + c93df1c commit b9aaa5e
Showing 4 changed files with 73 additions and 32 deletions.
1 change: 1 addition & 0 deletions changelog.d/6.fixed.md
@@ -0,0 +1 @@
Retry load/dump of the DelayedSubmission object in case a TimeOutError from flufl.lock is raised
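In short, `DelayedSubmission.dump()` and `DelayedSubmission.load()` now catch `flufl.lock`'s `TimeOutError` and report failure (returning `False` / `None`) instead of raising, so callers can retry. A minimal sketch of the pattern, adapted from the diffs below (a standalone illustration, not the library code itself):

```python
from pathlib import Path

from flufl.lock import Lock, TimeOutError
from joblib.externals.cloudpickle import cloudpickle


def dump_with_lock(obj, filename: Path) -> bool:
    """Pickle ``obj`` to ``filename``; return False if the lock times out."""
    # Guard the pickle file with a flufl.lock lock (max lifetime 2 minutes).
    flock = Lock(filename.with_suffix(".lock").as_posix(), lifetime=120)
    try:
        with flock:
            with open(filename, "wb") as f:
                cloudpickle.dump(obj, f)
    except TimeOutError:
        # Lock could not be acquired in time; let the caller retry later.
        return False
    return True
```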
32 changes: 20 additions & 12 deletions joblib_htcondor/backend.py
@@ -579,7 +579,7 @@ def get_nested_backend(self) -> tuple["ParallelBackendBase", int]:
parent_uuid=self._this_batch_name,
), self._n_jobs

def __reduce__(self) -> tuple[callable, tuple]:
def __reduce__(self) -> tuple[Callable, tuple]:
return (
_HTCondorBackendFactory.build,
(
@@ -816,10 +816,10 @@ def _submit(

if self._export_metadata:
# add a new task to the metadata
self._backend_meta.task_status.append(
self._backend_meta.task_status.append( # type: ignore
_TaskMeta(request_cpus=self._request_cpus)
)
self._backend_meta.n_tasks += 1
self._backend_meta.n_tasks += 1 # type: ignore

# Add to queue so the poller can take care of it
self._queued_jobs_list.append(
@@ -884,9 +884,14 @@ def _watcher(self) -> None:
level=9, msg=f"Dumping pickle file {to_submit}"
)
# Dump pickle file
to_submit.delayed_submission.dump(
dumped = to_submit.delayed_submission.dump(
to_submit.pickle_fname
)
if not dumped:
# Something went wrong, continue and submit this
# later
logger.debug("Could not dump pickle file.")
continue
# Submit job
logger.log(level=9, msg=f"Submitting job {to_submit}")
to_submit.htcondor_submit_result = self._client.submit(
@@ -895,7 +900,7 @@
)
logger.log(level=9, msg="Getting cluster id.")
# Set the cluster id
to_submit.cluster_id = (
to_submit.cluster_id = ( # type: ignore
to_submit.htcondor_submit_result.cluster()
)
logger.log(level=9, msg="Job submitted.")
@@ -904,14 +909,14 @@
level=9, msg="Updating task status timestamp."
)
if self._export_metadata:
self._backend_meta.task_status[
self._backend_meta.task_status[ # type: ignore
to_submit.task_id - 1
].sent_timestamp = datetime.now()

logger.log(
level=9, msg="Updating task status cluster id."
)
self._backend_meta.task_status[
self._backend_meta.task_status[ # type: ignore
to_submit.task_id - 1
].cluster_id = to_submit.cluster_id

@@ -956,12 +961,15 @@ def _poll_jobs(self) -> tuple[int, bool]:  # noqa: C901
)
if out_fname.exists():
logger.log(level=9, msg=f"Job {job_meta} is done.")
done_jobs.append(job_meta)
out_fname = job_meta.pickle_fname.with_stem(
f"{job_meta.pickle_fname.stem}_out"
)
# Load the DelayedSubmission object
ds = DelayedSubmission.load(out_fname)
if ds is None:
# Something went wrong, continue and poll later
continue
done_jobs.append(job_meta)
result = ds.result()
if ds.error():
logger.log(
@@ -984,7 +992,7 @@ def _poll_jobs(self) -> tuple[int, bool]:  # noqa: C901
self._completed_jobs_list.append(job_meta)
if self._export_metadata:
# Set the done timestamp from the ds object
self._backend_meta.task_status[
self._backend_meta.task_status[ # type: ignore
job_meta.task_id - 1
].done_timestamp = ds.done_timestamp()

@@ -996,7 +1004,7 @@ def _poll_jobs(self) -> tuple[int, bool]:  # noqa: C901
run_fname = job_meta.pickle_fname.with_suffix(".run")
update_meta = (
update_meta
or self._backend_meta.task_status[
or self._backend_meta.task_status[ # type: ignore
job_meta.task_id - 1
].update_run_from_file(run_fname)
)
@@ -1014,7 +1022,7 @@ def _poll_jobs(self) -> tuple[int, bool]:  # noqa: C901
run_fname = job_meta.pickle_fname.with_suffix(".run")
if self._export_metadata:
# Update run from file again
self._backend_meta.task_status[
self._backend_meta.task_status[ # type: ignore
job_meta.task_id - 1
].update_run_from_file(run_fname)
run_fname.unlink()
@@ -1220,5 +1228,5 @@ def build(
export_metadata=export_metadata,
)
out._recursion_level = recursion_level
out._parent_uuid = parent_uuid
out._parent_uuid = parent_uuid # type: ignore
return out
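The backend-side changes in `_watcher` and `_poll_jobs` above follow a non-blocking strategy: when a dump or load fails because the lock timed out, the job is simply skipped (`continue`) and picked up again on the next iteration. A condensed sketch of that control flow, with hypothetical names (`watcher_pass`, `submit_fn`, `queued_jobs` are illustrative, not the actual backend API):

```python
def watcher_pass(queued_jobs, submit_fn):
    """One watcher pass: submit jobs whose pickle could be dumped, keep the rest queued."""
    still_queued = []
    for job in queued_jobs:
        # dump() now returns False when the flufl.lock times out.
        if not job.delayed_submission.dump(job.pickle_fname):
            still_queued.append(job)  # retry on the next pass
            continue
        submit_fn(job)  # hand the job over to HTCondor
    return still_queued
```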
51 changes: 33 additions & 18 deletions joblib_htcondor/delayed_submission.py
@@ -9,9 +9,11 @@
from pathlib import Path
from typing import Any, Callable, Optional, Union

from flufl.lock import Lock
from flufl.lock import Lock, TimeOutError # type: ignore
from joblib.externals.cloudpickle import cloudpickle # type: ignore

from .logging import logger


__all__ = ["DelayedSubmission"]

@@ -34,10 +36,10 @@ def _get_lock(fname: Union[Path, str], *args: Any, **kwargs: Any) -> Lock:
Lock object.
"""
if isinstance(fname, Path):
lock_fname = Path(fname).with_suffix(".lock")
else:
lock_fname = Path(fname + ".lock")
if not isinstance(fname, Path):
fname = Path(fname)
lock_fname = fname.with_suffix(".lock")

return Lock(lock_fname.as_posix(), *args, **kwargs)


@@ -76,7 +78,7 @@ def __init__(
def run(self) -> None:
"""Run the function with the arguments and store the result."""
try:
self._result = self.func(*self.args, **self.kwargs)
self._result = self.func(*self.args, **self.kwargs) # type: ignore
except BaseException as e: # noqa: BLE001
self._result = _ExceptionWithTraceback(
e,
Expand Down Expand Up @@ -133,7 +135,7 @@ def result(self) -> Any:

def dump(
self, filename: Union[str, Path], result_only: bool = False
) -> None:
) -> bool:
"""Dump the object to a file.
Parameters
@@ -155,19 +157,25 @@ def dump(
# Get lockfile
flock = _get_lock(fname=filename, lifetime=120) # Max 2 minutes
# Dump in the lockfile
with flock:
with open(filename, "wb") as file:
cloudpickle.dump(self, file)
try:
with flock:
with open(filename, "wb") as file:
cloudpickle.dump(self, file)
except TimeOutError:
logger.error(f"Could not obtain lock for {filename} in 2 minutes.")
return False
# Set to original values
if result_only:
self.func = tmp_func
self.args = tmp_args
self.kwargs = tmp_kwargs

return True

@classmethod
def load(
cls: type["DelayedSubmission"], filename: Union[str, Path]
) -> "DelayedSubmission":
) -> Optional["DelayedSubmission"]:
"""Load a DelayedSubmission object from a file.
Parameters
@@ -177,8 +185,9 @@ def load(
Returns
-------
DelayedSubmission
The loaded DelayedSubmission object.
DelayedSubmission or None
The loaded DelayedSubmission object. If a TimeOutError is raised
while obtaining the lock, returns None.
Raises
------
Expand All @@ -189,9 +198,15 @@ def load(
# Get lockfile
flock = _get_lock(filename, lifetime=120) # Max 2 minutes
# Load from the lockfile
with flock:
with open(filename, "rb") as file:
obj = cloudpickle.load(file)
if not (isinstance(obj, cls)):
raise TypeError("Loaded object is not a DelayedSubmission object.")
try:
with flock:
with open(filename, "rb") as file:
obj = cloudpickle.load(file)
if not (isinstance(obj, cls)):
raise TypeError(
"Loaded object is not a DelayedSubmission object."
)
except TimeOutError:
logger.error(f"Could not obtain lock for {filename} in 2 minutes.")
return None
return obj
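With these return-value changes, callers are expected to retry rather than fail outright on a lock timeout. A rough usage sketch (the file name is hypothetical; the executor.py changes below implement essentially this loop):

```python
import time
from pathlib import Path

from joblib_htcondor.delayed_submission import DelayedSubmission

fname = Path("task-0001.pickle")  # hypothetical input pickle

# load() returns None when flufl.lock raises TimeOutError, so poll until it succeeds.
ds = None
while ds is None:
    ds = DelayedSubmission.load(fname)
    if ds is None:
        time.sleep(1)  # back off before retrying the lock

ds.run()

# dump() returns False on a lock timeout; retry until the result is written.
out_fname = fname.with_stem(f"{fname.stem}_out")
while not ds.dump(out_fname, result_only=True):
    time.sleep(1)
```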
21 changes: 19 additions & 2 deletions joblib_htcondor/executor.py
@@ -4,6 +4,7 @@
# Synchon Mandal <[email protected]>
# License: AGPL

import time
from datetime import datetime


@@ -103,7 +104,15 @@ def logger_level(arg):

# Load file
logger.info(f"Loading DelayedSubmission object from {fname}")
ds = DelayedSubmission.load(fname)
ds = None
while ds is None:
ds = DelayedSubmission.load(fname)
if ds is None:
logger.warning(
f"Could not load DelayedSubmission object from {fname}. "
"Retrying in 1 second."
)
time.sleep(1) # Wait 1 second before retrying

# Issue warning for re-running
if ds.done():
@@ -119,5 +128,13 @@
out_fname = fname.with_stem(f"{old_stem}_out")
logger.info(f"Dumping DelayedSubmission (result only) to {out_fname}")
# Dump output
ds.dump(out_fname, result_only=True)
dumped = False
while not dumped:
dumped = ds.dump(out_fname, result_only=True)
if not dumped:
logger.warning(
f"Could not dump DelayedSubmission to {out_fname}. "
"Retrying in 1 second."
)
time.sleep(1)
logger.info("Done.")
