Skip to content

Commit

Permalink
Small internal renaming in lfn fs lookup.
Browse files Browse the repository at this point in the history
  • Loading branch information
riga committed Jan 17, 2024
1 parent 71d0370 commit 240fd00
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 31 deletions.
60 changes: 30 additions & 30 deletions columnflow/tasks/external.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def get_dataset_lfns_dasgoclient(
def iter_nano_files(
self,
task: AnalysisTask | DatasetTask,
remote_fs: str | Sequence[str] | None = None,
fs: str | Sequence[str] | None = None,
lfn_indices: list[int] | None = None,
eager_lookup: bool | int = 1,
) -> None:
Expand All @@ -159,15 +159,15 @@ def iter_nano_files(
workflow whose branch value is used instead.
:param task: Current task that needs to access the nanoAOD files
:param remote_fs: Name of the remote file system where the LFNs are located, defaults to None
:param fs: Name of the local or remote file system where the LFNs are located, defaults to None
:param lfn_indices: List of indices of LFNs that are processed by this *task* instance, defaults to None
:param eager_lookup: Look at the next remote fs in *remote_fs* if stat takes too long, defaults to 1
:param eager_lookup: Look at the next fs if stat takes too long, defaults to 1
:raises TypeError: If *task* is not of type :external+law:py:class:`~law.workflow.base.BaseWorkflow` or not
a task analyzing a single branch in the task tree
:raises Exception: If current task is not complete as indicated with ``self.complete()``
:raises ValueError: If no remote fs is provided at call and none can be found in either the config instance
or the law config.
:raises Exception: If a given LFN cannot be found at any remote file system
:raises ValueError: If no fs is provided at call and none can be found in either the config instance or the law
config.
:raises Exception: If a given LFN cannot be found at any fs
:yield: a file target that points to a LFN
"""
# input checks
Expand All @@ -178,18 +178,18 @@ def iter_nano_files(
if not self.complete():
raise Exception(f"{self} is required to be complete")

# prepare the remote fs names to resolve lfns with
if not remote_fs:
# prepare fs names to resolve lfns with
if not fs:
# use an optional hook in the config
get_remote_fs = self.config_inst.x("get_dataset_lfns_remote_fs", None)
if callable(get_remote_fs):
remote_fs = get_remote_fs(task.dataset_inst)
if not remote_fs:
get_fs = self.config_inst.x("get_dataset_lfns_remote_fs", None)
if callable(get_fs):
fs = get_fs(task.dataset_inst)
if not fs:
# use the law config
remote_fs = law.config.get_expanded("outputs", "lfn_sources", [], split_csv=True)
if not remote_fs:
raise ValueError("no remote_fs given or found to resolve lfns")
remote_fs = law.util.make_list(remote_fs)
fs = law.config.get_expanded("outputs", "lfn_sources", [], split_csv=True)
if not fs:
raise ValueError("no fs given or found to resolve lfns")
fs = law.util.make_list(fs)

# get all lfns
output = self.output()
Expand All @@ -206,24 +206,24 @@ def iter_nano_files(
# get the input file
i = 0
last_working = None
while i < len(remote_fs):
fs = remote_fs[i]
logger.debug(f"checking fs {fs} for lfn {lfn}")
while i < len(fs):
selected_fs = fs[i]
logger.debug(f"checking fs {selected_fs} for lfn {lfn}")

# check if the fs is really remote or local
fs_base = law.config.get_expanded(fs, "base")
fs_base = law.config.get_expanded(selected_fs, "base")
is_local = law.target.file.get_scheme(fs_base) in (None, "file")
logger.debug(f"fs {fs} is {'local' if is_local else 'remote'}")
logger.debug(f"fs {selected_fs} is {'local' if is_local else 'remote'}")
target_cls = law.LocalFileTarget if is_local else law.wlcg.WLCGFileTarget

# measure the time required to perform the stat query
logger.debug(f"checking fs {fs} for lfn {lfn}")
input_file = target_cls(lfn.lstrip(os.sep) if is_local else lfn, fs=fs)
logger.debug(f"checking fs {selected_fs} for lfn {lfn}")
input_file = target_cls(lfn, fs=selected_fs)
t1 = time.perf_counter()
input_stat = input_file.exists(stat=True)
duration = time.perf_counter() - t1
i += 1
logger.info(f"file {lfn} does{'' if input_stat else ' not'} exist at fs {fs}")
logger.info(f"file {lfn} does{'' if input_stat else ' not'} exist at fs {selected_fs}")

# when the stat query took longer than 2 seconds, eagerly try the next fs
# and check if it responds faster and if so, take it instead
Expand All @@ -233,25 +233,25 @@ def iter_nano_files(
not isinstance(eager_lookup, bool) and
i <= eager_lookup
):
logger.debug(f"eager fs lookup skipped for fs {fs} at index {i}")
logger.debug(f"eager fs lookup skipped for fs {selected_fs} at index {i}")
else:
if input_stat and not last_working and duration > 2.0 and i < len(remote_fs):
last_working = fs, input_file, input_stat, duration
if input_stat and not last_working and duration > 2.0 and i < len(fs):
last_working = selected_fs, input_file, input_stat, duration
logger.debug("duration exceeded 2s, checking next fs for comparison")
continue
if last_working and (not input_stat or last_working[3] < duration):
logger.debug("previously checked fs responded faster")
fs, input_file, input_stat, duration = last_working
selected_fs, input_file, input_stat, duration = last_working

# stop when the stat was successful at this point
if input_stat:
task.publish_message(
f"using fs {fs}, stat responded in "
f"using fs {selected_fs}, stat responded in "
f"{law.util.human_duration(seconds=duration)}",
)
break
else:
raise Exception(f"LFN {lfn} not found at any remote fs {remote_fs}")
raise Exception(f"LFN {lfn} not found at any remote fs {fs}")

# log the file size
input_size = law.util.human_bytes(input_stat.st_size, fmt=True)
Expand Down
2 changes: 1 addition & 1 deletion modules/law
Submodule law updated 1 files
+2 −2 law/target/local.py

0 comments on commit 240fd00

Please sign in to comment.