From 7886dcdae12d682c9ac8eb6af828a3b6a2a3d46b Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Thu, 2 May 2024 10:56:45 +0000
Subject: [PATCH] Quentin has experienced a glaring race condition in
 make_rundir when launching multiple workflows at once.

It's unclear to me why it's *so bad* in his situation, but the race
condition exists nonetheless, and this PR complicates rundir creation to
accept that it might be run multiple times concurrently.

make_rundir now takes a keyword-only max_tries argument (default 3). When
creating the numbered run directory fails with FileExistsError, the
directory listing is re-read and creation is retried after a randomised,
growing backoff sleep; on the last try the FileExistsError is re-raised.
---
 parsl/dataflow/errors.py  |  4 +++
 parsl/dataflow/rundirs.py | 62 ++++++++++++++++++++++++++-------------
 2 files changed, 45 insertions(+), 21 deletions(-)

diff --git a/parsl/dataflow/errors.py b/parsl/dataflow/errors.py
index 5d6c0c8710..26abbc560c 100644
--- a/parsl/dataflow/errors.py
+++ b/parsl/dataflow/errors.py
@@ -63,3 +63,7 @@ def __init__(self, dependent_exceptions_tids: Sequence[Tuple[BaseException, Opti
     def __str__(self) -> str:
         dep_tids = [tid for (exception, tid) in self.dependent_exceptions_tids]
         return "Join failure for task {} with failed join dependencies from tasks {}".format(self.task_id, dep_tids)
+
+
+class RundirCreateError(ParslError):
+    pass
diff --git a/parsl/dataflow/rundirs.py b/parsl/dataflow/rundirs.py
index f32979473f..7c8b19a0a6 100644
--- a/parsl/dataflow/rundirs.py
+++ b/parsl/dataflow/rundirs.py
@@ -1,11 +1,15 @@
 import os
 from glob import glob
 import logging
+import random
+import time
+
+from parsl.dataflow.errors import RundirCreateError
 
 logger = logging.getLogger(__name__)
 
 
-def make_rundir(path: str) -> str:
+def make_rundir(path: str, *, max_tries: int = 3) -> str:
     """When a path has not been specified, make the run directory.
 
     Creates a rundir with the following hierarchy:
@@ -18,23 +22,39 @@ def make_rundir(path: str) -> str:
     Kwargs:
         - path (str): String path to a specific run dir
     """
-    try:
-        if not os.path.exists(path):
-            os.makedirs(path)
-
-        prev_rundirs = glob(os.path.join(path, "[0-9]*[0-9]"))
-
-        current_rundir = os.path.join(path, '000')
-
-        if prev_rundirs:
-            # Since we globbed on files named as 0-9
-            x = sorted([int(os.path.basename(x)) for x in prev_rundirs])[-1]
-            current_rundir = os.path.join(path, '{0:03}'.format(x + 1))
-
-        os.makedirs(current_rundir)
-        logger.debug("Parsl run initializing in rundir: {0}".format(current_rundir))
-        return os.path.abspath(current_rundir)
-
-    except Exception:
-        logger.exception("Failed to create run directory")
-        raise
+    backoff_time_s = 1 + random.random()
+
+    os.makedirs(path, exist_ok=True)
+
+    # try_count is 1-based for human readability
+    try_count = 1
+    while True:
+
+        # Python 3.10 introduces root_dir argument to glob which in future
+        # can be used to simplify this code, something like:
+        # prev_rundirs = glob("[0-9]*[0-9]", root_dir=path)
+        full_prev_rundirs = glob(os.path.join(path, "[0-9]*[0-9]"))
+        prev_rundirs = [os.path.basename(d) for d in full_prev_rundirs]
+
+        next_index = max([int(d) for d in prev_rundirs] + [-1]) + 1
+
+        current_rundir = os.path.join(path, '{0:03}'.format(next_index))
+
+        try:
+            os.makedirs(current_rundir)
+            logger.debug("rundir created: %s", current_rundir)
+            return os.path.abspath(current_rundir)
+        except FileExistsError:
+            logger.warning(f"Could not create rundir {current_rundir} on try {try_count}")
+
+            if try_count >= max_tries:
+                raise
+            else:
+                logger.debug("Backing off %ss", backoff_time_s)
+                time.sleep(backoff_time_s)
+                backoff_time_s *= 2 + random.random()
+                try_count += 1
+
+    # this should never be reached - the above loop should have either returned
+    # or raised an exception on the last try
+    raise RundirCreateError()