Skip to content

Commit

Permalink
Merge branch 'master' into selection_without_weights
Browse files (browse the repository at this point in the history)
  • Loading branch information
riga authored Jun 12, 2024
2 parents 2e792a3 + 8e068ee commit ce6371c
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
import order as od
from scinum import Number

from columnflow.util import DotDict, maybe_import
from columnflow.columnar_util import EMPTY_FLOAT, ColumnCollection
from columnflow.config_util import (
get_root_processes_from_campaign, add_shift_aliases, add_category, verify_config_processes,
)
from columnflow.columnar_util import EMPTY_FLOAT, ColumnCollection, skip_column
from columnflow.util import DotDict, maybe_import

ak = maybe_import("awkward")

Expand Down
2 changes: 1 addition & 1 deletion columnflow/tasks/cms/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def workflow_requires(self):

if cat_obj.config_data_datasets:
for dataset in cat_obj.config_data_datasets:
data_dataset_params[dataset].add(cat_obj.config_variable)
data_dataset_params[dataset]["variables"].add(cat_obj.config_variable)

# set workflow requirements per mc dataset
reqs["merged_hists"] = set(
Expand Down
11 changes: 7 additions & 4 deletions columnflow/tasks/external.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,15 +222,16 @@ def iter_nano_files(

# measure the time required to perform the stat query
logger.debug(f"checking fs {selected_fs} for lfn {lfn}")
input_file = target_cls(lfn, fs=selected_fs)
input_file = target_cls(lfn.lstrip(os.sep) if is_local else lfn, fs=selected_fs)
t1 = time.perf_counter()
input_stat = input_file.exists(stat=True)
duration = time.perf_counter() - t1
i += 1
logger.info(f"file {lfn} does{'' if input_stat else ' not'} exist at fs {selected_fs}")

# when the stat query took longer than 2 seconds, eagerly try the next fs
# when the stat query took longer than some duration, eagerly try the next fs
# and check if it responds faster and if so, take it instead
latency = 4.0 # s
if input_stat and eager_lookup:
if (
isinstance(eager_lookup, int) and
Expand All @@ -239,9 +240,11 @@ def iter_nano_files(
):
logger.debug(f"eager fs lookup skipped for fs {selected_fs} at index {i}")
else:
if input_stat and not last_working and duration > 2.0 and i < len(fs):
if input_stat and not last_working and duration > latency and i < len(fs):
last_working = selected_fs, input_file, input_stat, duration
logger.debug("duration exceeded 2s, checking next fs for comparison")
logger.debug(
f"duration exceeded {latency}s, checking next fs for comparison",
)
continue
if last_working and (not input_stat or last_working[3] < duration):
logger.debug("previously checked fs responded faster")
Expand Down
2 changes: 1 addition & 1 deletion modules/law

0 comments on commit ce6371c

Please sign in to comment.