Commit

combine in batches
rkansal47 committed Feb 18, 2025
1 parent 69179e7 commit eb9db90
Showing 2 changed files with 54 additions and 29 deletions.
condor/submit.templ.sh (11 changes: 6 additions & 5 deletions)
@@ -9,7 +9,7 @@ mkdir outfiles
 
 for t2_prefix in ${t2_prefixes}
 do
-for folder in pickles parquet root githashes
+for folder in pickles parquet root jobchecks
 do
 xrdfs $${t2_prefix} mkdir -p -mrwxr-xr-x "/${outdir}/$${folder}"
 done
@@ -33,7 +33,7 @@ echo "https://github.com/$gituser/$repo/commit/$${commithash}" > commithash.txt
 #move output to t2s
 for t2_prefix in ${t2_prefixes}
 do
-xrdcp -f commithash.txt $${t2_prefix}/${outdir}/githashes/commithash_${jobnum}.txt
+xrdcp -f commithash.txt $${t2_prefix}/${outdir}/jobchecks/commithash_${jobnum}.txt
 done
 
 pip install -e .
@@ -48,11 +48,12 @@ python -u -W ignore $script --year $year --starti $starti --endi $endi --samples
 #move output to t2s
 for t2_prefix in ${t2_prefixes}
 do
+xrdcp -f num_batches*.txt "$${t2_prefix}/${outdir}/jobchecks/"
 xrdcp -f outfiles/* "$${t2_prefix}/${outdir}/pickles/out_${jobnum}.pkl"
-xrdcp -f *.parquet "$${t2_prefix}/${outdir}/parquet/out_${jobnum}.parquet"
-xrdcp -f *.root "$${t2_prefix}/${outdir}/root/nano_skim_${jobnum}.root"
+xrdcp -f *.parquet "$${t2_prefix}/${outdir}/parquet/"
+xrdcp -f *.root "$${t2_prefix}/${outdir}/root/"
 done
 
 rm *.parquet
 rm *.root
-rm commithash.txt
+rm *.txt
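
The transfer step now also copies a num_batches_{filetag}_{N}.txt marker into jobchecks/, and the batched .parquet / .root outputs are copied by wildcard into their directories rather than under a single per-job name. Below is a minimal sketch of how that marker could be used to check that all batch files arrived; it assumes the T2 output area has been mirrored to a local directory (a real check would list it with xrdfs rather than pathlib), and the helper names and paths are hypothetical:

    import re
    from pathlib import Path


    def expected_batches(jobchecks_dir: Path) -> int:
        """Sum the batch counts encoded in the num_batches_{filetag}_{N}.txt marker names."""
        total = 0
        for marker in jobchecks_dir.glob("num_batches_*.txt"):
            match = re.search(r"_(\d+)\.txt$", marker.name)
            if match:
                total += int(match.group(1))
        return total


    def check_outputs(outdir: Path) -> bool:
        """Compare the transferred .parquet batch files against the expected count (assumes parquet saving is on)."""
        n_expected = expected_batches(outdir / "jobchecks")
        n_found = len(list((outdir / "parquet").glob("*.parquet")))
        print(f"expected {n_expected} batch files, found {n_found}")
        return n_found == n_expected


    check_outputs(Path("outdir"))  # hypothetical local mirror of ${outdir}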
src/boostedhh/run_utils.py (72 changes: 48 additions & 24 deletions)
@@ -94,6 +94,12 @@ def parse_common_run_args(parser):
         default="files",
         help="sample name of files being run on, if --files option used",
     )
+    parser.add_argument(
+        "--batch-size",
+        type=int,
+        default=20,
+        help="# of outputs to combine into a single output if saving .parquet or .root files",
+    )
     parser.add_argument("--yaml", default=None, help="yaml file", type=str)


@@ -222,8 +228,13 @@ def run(
     save_root: bool,
     filetag: str,  # should be starti-endi
     executor: str = "iterative",
+    batch_size: int = 20,
 ):
-    """Run processor without fancy dask (outputs then need to be accumulated manually)"""
+    """
+    Run processor without fancy dask (outputs then need to be accumulated manually)
+
+    batch_size (int): used to combine a ``batch_size`` number of outputs into one parquet / root
+    """
     add_mixins(nanoevents)  # update nanoevents schema
 
     # outputs are saved here as pickles
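
The new batch_size argument groups the per-chunk outputs into ceil(N / batch_size) combined files instead of one file per job. A minimal sketch of the grouping arithmetic, using the default batch size of 20 and an assumed 45 chunk outputs (file names are illustrative only):

    import numpy as np

    batch_size = 20  # default of the new --batch-size argument
    parquet_files = [f"out_{i}.parquet" for i in range(45)]  # illustrative chunk outputs

    num_batches = int(np.ceil(len(parquet_files) / batch_size))  # ceil(45 / 20) = 3
    for i in range(num_batches):
        batch = parquet_files[i * batch_size : (i + 1) * batch_size]
        print(i, len(batch))  # batches of 20, 20, and 5 files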
@@ -276,31 +287,44 @@
 
     with Path(f"{outdir}/{filetag}.pkl").open("wb") as f:
         pickle.dump(out, f)
 
-    # need to combine all the files from these processors before transferring to EOS
-    # otherwise it will complain about too many small files
-
     if save_parquet or save_root:
         import pandas as pd
         import pyarrow as pa
         import pyarrow.parquet as pq
 
-        pddf = pd.read_parquet(local_parquet_dir)
-
-        if save_parquet:
-            # need to write with pyarrow as pd.to_parquet doesn't support different types in
-            # multi-index column names
-            table = pa.Table.from_pandas(pddf)
-            pq.write_table(table, f"{local_dir}/{filetag}.parquet")
-
-        if save_root:
-            import awkward as ak
-
-            with uproot.recreate(
-                f"{local_dir}/nano_skim_{filetag}.root", compression=uproot.LZ4(4)
-            ) as rfile:
-                rfile["Events"] = ak.Array(
-                    # take only top-level column names in multiindex df
-                    flatten_dict(
-                        {key: np.squeeze(pddf[key].values) for key in pddf.columns.levels[0]}
-                    )
-                )
+        # Get all parquet files
+        path = Path(local_parquet_dir)
+        parquet_files = list(path.glob("*.parquet"))
+        print(parquet_files)
+
+        num_batches = int(np.ceil(len(parquet_files) / batch_size))
+        Path(f"num_batches_{filetag}_{num_batches}.txt").touch()
+
+        # need to combine all the files from these processors before transferring to EOS
+        # otherwise it will complain about too many small files
+        for i in range(num_batches):
+            print(i)
+            batch = parquet_files[i * batch_size : (i + 1) * batch_size]
+            print(batch)
+            print([pd.read_parquet(f) for f in batch])
+            pddf = pd.concat([pd.read_parquet(f) for f in batch])
+
+            if save_parquet:
+                # need to write with pyarrow as pd.to_parquet doesn't support different types in
+                # multi-index column names
+                table = pa.Table.from_pandas(pddf)
+                pq.write_table(table, f"{local_dir}/out_{filetag}_batch_{i}.parquet")
+
+            if save_root:
+                import awkward as ak
+
+                with uproot.recreate(
+                    f"{local_dir}/nano_skim_{filetag}_batch_{i}.root", compression=uproot.LZ4(4)
+                ) as rfile:
+                    rfile["Events"] = ak.Array(
+                        # take only top-level column names in multiindex df
+                        flatten_dict(
+                            {key: np.squeeze(pddf[key].values) for key in pddf.columns.levels[0]}
+                        )
+                    )
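
Per the docstring above, these outputs still need to be accumulated manually, so a job's out_{filetag}_batch_{i}.parquet files eventually get re-combined downstream. A minimal sketch of that accumulation, assuming the batch files have been collected into a local directory; the directory and output names are hypothetical:

    from pathlib import Path

    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq

    # Hypothetical local directory holding the transferred out_*_batch_*.parquet files.
    parquet_dir = Path("outdir/parquet")

    # Concatenate all batch files back into one dataframe.
    combined = pd.concat(pd.read_parquet(f) for f in sorted(parquet_dir.glob("*.parquet")))

    # Write with pyarrow, as in run_utils above, since pd.to_parquet does not handle
    # mixed types in the multi-index column names.
    pq.write_table(pa.Table.from_pandas(combined), "outdir/combined.parquet")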
