Skip to content

Commit

Permalink
condor
Browse files Browse the repository at this point in the history
  • Loading branch information
rkansal47 committed Nov 7, 2024
1 parent 8b64f2c commit 4dba50e
Show file tree
Hide file tree
Showing 9 changed files with 577 additions and 33 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ docker_stderror
/*.err
/*.out
/*.log
/condor
src/HH4b/postprocessing/**/*.png
src/HH4b/postprocessing/**/*.pdf
src/HH4b/boosted/**/*.png
Expand Down
Empty file added condor/__init__.py
Empty file.
148 changes: 148 additions & 0 deletions condor/check_jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
"""
Checks that there is an output for each job submitted.
Author: Raghav Kansal
"""

from __future__ import annotations

import argparse
import os
from os import listdir
from pathlib import Path

import numpy as np
from HH4b import run_utils
from HH4b.run_utils import print_red

parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument(
"--processor",
default="trigger",
help="which processor",
type=str,
choices=["trigger_boosted", "skimmer", "matching"],
)

parser.add_argument("--tag", default="", help="tag for jobs", type=str)
parser.add_argument("--year", default="20122", help="year", type=str)
parser.add_argument("--user", default="rkansal", help="user", type=str)
run_utils.add_bool_arg(parser, "submit-missing", default=False, help="submit missing files")
run_utils.add_bool_arg(
parser,
"check-running",
default=False,
help="check against running jobs as well (running_jobs.txt will be updated automatically)",
)

args = parser.parse_args()


eosdir = f"/eos/uscms/store/user/{args.user}/bbbb/{args.processor}/{args.tag}/{args.year}/"

samples = listdir(eosdir)
jdls = [jdl for jdl in listdir(f"condor/{args.processor}/{args.tag}/") if jdl.endswith(".jdl")]

jdl_dict = {}
for sample in samples:
x = [
int(jdl[:-4].split("_")[-1])
for jdl in jdls
if jdl.split("_")[0] == args.year and "_".join(jdl.split("_")[1:-1]) == sample
]
if len(x) > 0:
jdl_dict[sample] = np.sort(x)[-1] + 1

"""
jdl_dict = {
sample: np.sort(
[
int(jdl[:-4].split("_")[-1])
for jdl in jdls
if jdl.split("_")[0] == args.year and "_".join(jdl.split("_")[1:-1]) == sample
]
)[-1]
+ 1
for sample in samples
}
"""


# Names of job scripts currently in the condor queue (without the ".sh" suffix),
# so that jobs which are still running are not flagged as missing or resubmitted.
running_jobs = []
if args.check_running:
    # dump the 9th condor_q column (the executable script name) to a text file
    os.system("condor_q | awk '{print $9}' > running_jobs.txt")
    with Path("running_jobs.txt").open() as jobs_file:
        running_jobs = [entry[:-4] for entry in jobs_file if entry.endswith(".sh\n")]


# jdl files of jobs whose outputs are missing, and their matching condor .err logs
missing_files = []
err_files = []


for sample in samples:
    print(f"Checking {sample}")

    if args.processor != "trigger":
        # non-trigger processors also write parquet outputs; if the whole parquet
        # directory is missing, treat every job of this sample as missing
        if not Path(f"{eosdir}/{sample}/parquet").exists():
            print_red(f"No parquet directory for {sample}!")
            if sample not in jdl_dict:
                # no jdls were found for this sample/year, so nothing to resubmit
                continue

            for i in range(jdl_dict[sample]):
                if f"{args.year}_{sample}_{i}" in running_jobs:
                    print(f"Job #{i} for sample {sample} is running.")
                    continue

                jdl_file = f"condor/{args.processor}/{args.tag}/{args.year}_{sample}_{i}.jdl"
                err_file = f"condor/{args.processor}/{args.tag}/logs/{args.year}_{sample}_{i}.err"
                print(jdl_file)
                missing_files.append(jdl_file)
                err_files.append(err_file)
                if args.submit_missing:
                    os.system(f"condor_submit {jdl_file}")

            # whole sample handled; move on to the next one
            continue

        # job indices that produced a parquet output ("..._{i}.parquet")
        outs_parquet = [
            int(out.split(".")[0].split("_")[-1]) for out in listdir(f"{eosdir}/{sample}/parquet")
        ]
        print(f"Out parquets: {outs_parquet}")

    if not Path(f"{eosdir}/{sample}/pickles").exists():
        print_red(f"No pickles directory for {sample}!")
        continue

    # job indices that produced a pickle output ("..._{i}.pkl")
    outs_pickles = [
        int(out.split(".")[0].split("_")[-1]) for out in listdir(f"{eosdir}/{sample}/pickles")
    ]

    if args.processor == "trigger":
        print(f"Out pickles: {outs_pickles}")

    # NOTE(review): this raises KeyError if a sample has an output directory but no
    # matching jdls (sample not in jdl_dict) — confirm that cannot happen in practice.
    for i in range(jdl_dict[sample]):
        if i not in outs_pickles:
            if f"{args.year}_{sample}_{i}" in running_jobs:
                print(f"Job #{i} for sample {sample} is running.")
                continue

            print_red(f"Missing output pickle #{i} for sample {sample}")
            jdl_file = f"condor/{args.processor}/{args.tag}/{args.year}_{sample}_{i}.jdl"
            err_file = f"condor/{args.processor}/{args.tag}/logs/{args.year}_{sample}_{i}.err"
            missing_files.append(jdl_file)
            err_files.append(err_file)
            if args.submit_missing:
                os.system(f"condor_submit {jdl_file}")

        # a missing parquet is only reported, not resubmitted: the pickle check
        # above already queues the job when both outputs are absent
        if args.processor != "trigger" and i not in outs_parquet:
            print_red(f"Missing output parquet #{i} for sample {sample}")


print(f"{len(missing_files)} files to re-run:")
for f in missing_files:
    print(f)

print("\nError files:")
for f in err_files:
    print(f)
100 changes: 100 additions & 0 deletions condor/combine_pickles.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""
Combines Coffea processor output pickle files
Author(s): Raghav Kansal
"""

from __future__ import annotations

import argparse
import os
import pickle
from os import listdir
from pathlib import Path

from coffea.processor.accumulator import accumulate
from HH4b import run_utils
from tqdm import tqdm


def accumulate_files(files: list):
    """Accumulate the pickled processor outputs in ``files`` into one output.

    Loads the first file as the running total, then folds each remaining file in
    via ``coffea.processor.accumulator.accumulate``, with a tqdm progress bar.
    """

    def _load(path):
        # one pickle load per file, closing the handle immediately
        with Path(path).open("rb") as fh:
            return pickle.load(fh)

    result = _load(files[0])
    for fname in tqdm(files[1:]):
        result = accumulate([result, _load(fname)])

    return result


if __name__ == "__main__":
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
run_utils.parse_common_args(parser) # year, processor
parser.add_argument("--tag", type=str, required=True)
parser.add_argument("--name", default="combined", help="name of combined files", type=str)
parser.add_argument(
"--inuser", default="", help="username where pickles are saved (if not you)", type=str
)
parser.add_argument(
"--outuser",
default="",
help="username where combined output will be saved (if not you)",
type=str,
)
run_utils.add_bool_arg(
parser, "r", default=False, help="combine files in sub and subsubdirectories of indir"
)
run_utils.add_bool_arg(
parser,
"separate-samples",
default=False,
help="combine different samples' pickles separately",
)
args = parser.parse_args()

user = os.getlogin()
if args.inuser == "":
args.inuser = user
if args.outuser == "":
args.outuser = user

tag_dir = f"/eos/uscms/store/user/{args.inuser}/HH4b/{args.processor}/{args.tag}"
indir = f"{tag_dir}/{args.year}/"

outdir = f"/eos/uscms/store/user/{args.outuser}/HH4b/{args.processor}/{args.tag}/"
os.system(f"mkdir -p {outdir}")

print("Inputs directory:", indir)
print("Outputs directory:", outdir)

files = [indir + "/" + file for file in listdir(indir) if file.endswith(".pkl")]
out_dict = {}

if args.r:
samples = [d for d in listdir(indir) if (Path(indir) / d / "pickles").is_dir()]

for sample in samples:
print(sample)
pickle_path = f"{indir}/{sample}/pickles/"
sample_files = [
pickle_path + "/" + file for file in listdir(pickle_path) if file.endswith(".pkl")
]

if args.separate_samples:
out_dict[sample] = accumulate_files(sample_files)
else:
files += sample_files

if args.separate_samples:
out = {args.year: out_dict}
else:
print(f"Accumulating {len(files)} files")
out = accumulate_files(files)

with Path(f"{outdir}/{args.year}_{args.name}.pkl").open("wb") as f:
pickle.dump(out, f)

print(f"Saved to {outdir}/{args.year}_{args.name}.pkl")
Loading

0 comments on commit 4dba50e

Please sign in to comment.