
Commit

add first try at dask
cmantill committed Aug 18, 2021
1 parent 2d23f67 commit f01c7ac
Showing 2 changed files with 85 additions and 6 deletions.
44 changes: 44 additions & 0 deletions bootstrap.sh
@@ -0,0 +1,44 @@
#!/usr/bin/env bash

# write a `shell` wrapper that re-enters the coffea singularity image
cat <<EOF > shell
#!/usr/bin/env bash
if [ "\$1" == "" ]; then
export COFFEA_IMAGE=coffeateam/coffea-dask:latest
else
export COFFEA_IMAGE=\$1
fi
singularity exec -B \${PWD}:/srv -B /uscmst1b_scratch --pwd /srv \\
/cvmfs/unpacked.cern.ch/registry.hub.docker.com/\${COFFEA_IMAGE} \\
/bin/bash --rcfile /srv/.bashrc
EOF

# write a .bashrc that builds a per-project virtualenv on first entry
cat <<EOF > .bashrc
install_env() {
  set -e
  echo "Installing shallow virtual environment in \$PWD/.env..."
  python -m venv --without-pip --system-site-packages .env
  unlink .env/lib64  # HTCondor can't transfer a symlink to a directory, and it appears optional
  # work around issues copying CVMFS xattrs when copying to tmpdir
  export TMPDIR=\$(mktemp -d -p .)
  .env/bin/python -m ipykernel install --user
  rm -rf \$TMPDIR && unset TMPDIR
  .env/bin/python -m pip install -q git+https://github.com/CoffeaTeam/lpcjobqueue.git
  .env/bin/python -m pip install --user --editable .
  echo "done."
}
export JUPYTER_PATH=/srv/.jupyter
export JUPYTER_RUNTIME_DIR=/srv/.local/share/jupyter/runtime
export JUPYTER_DATA_DIR=/srv/.local/share/jupyter
export IPYTHONDIR=/srv/.ipython
[[ -d .env ]] || install_env
source .env/bin/activate
alias pip="python -m pip"
EOF

chmod u+x shell .bashrc
echo "Wrote shell and .bashrc to current directory. You can delete this file. Run ./shell to start the singularity shell"
47 changes: 41 additions & 6 deletions run.py
@@ -46,20 +46,55 @@ def main(args):

print(f"Metrics: {metrics}")

        filehandler = open(f'outfiles/{args.year}_{args.sample}_{args.starti}-{args.endi}.hist', 'wb')
        pickle.dump(out, filehandler)
        filehandler.close()

    elif args.dask:
        import time
        from distributed import Client
        from lpcjobqueue import LPCCondorCluster

        tic = time.time()
        cluster = LPCCondorCluster()
        # let the cluster scale between 4 and 10 condor workers with load
        cluster.adapt(minimum=4, maximum=10)
        client = Client(cluster)

        exe_args = {
            'client': client,
            'savemetrics': True,
            'schema': NanoAODSchema,
            'align_clusters': True,
        }

        print("Waiting for at least one worker...")
        client.wait_for_workers(1)

        out, metrics = processor.run_uproot_job(
            fileset,
            treename="Events",
            processor_instance=p,
            executor=processor.dask_executor,
            executor_args=exe_args,
        )

        elapsed = time.time() - tic
        print(f"Metrics: {metrics}")
        print(f"Finished in {elapsed:.1f}s")

        filehandler = open(f'outfiles/{args.year}_{args.sample}_{args.starti}-{args.endi}.hist', 'wb')
        pickle.dump(out, filehandler)
        filehandler.close()

if __name__ == "__main__":
    # e.g.
    # inside a condor job: python run.py --year 2017 --processor hww --condor --starti 0 --endi 1 --fileset fileset_2017_UL_NANO.json --sample GluGluHToTauTau_M125_TuneCP5_13TeV-powheg-pythia8
    # inside a condor job: python run.py --year 2017 --processor hww --condor --starti 0 --endi 1 --fileset metadata.json --sample GluGluHToWWToLNuQQ_M125_TuneCP5_PSweight_13TeV-powheg2-jhugen727-pythia8
    # inside a dask job: python run.py --year 2017 --processor hww --dask --fileset metadata.json --sample GluGluHToWWToLNuQQ_M125_TuneCP5_PSweight_13TeV-powheg2-jhugen727-pythia8

    parser = argparse.ArgumentParser()
    parser.add_argument('--year', dest='year', default='2017', help="year", type=str)
    parser.add_argument('--starti', dest='starti', default=0, help="start index of files", type=int)
    parser.add_argument('--endi', dest='endi', default=-1, help="end index of files", type=int)
    parser.add_argument("--processor", dest="processor", default="hww", help="HWW processor", type=str)
-   parser.add_argument("--condor", dest="condor", action="store_true", default=True, help="Run with condor")
+   parser.add_argument("--condor", dest="condor", action="store_true", default=False, help="Run with condor")
    parser.add_argument("--dask", dest="dask", action="store_true", default=False, help="Run with dask")
    parser.add_argument("--fileset", dest="fileset", default=None, help="Fileset", required=True)
    parser.add_argument('--sample', dest='sample', default=None, help='sample name', required=True)
    args = parser.parse_args()
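
Not shown in this hunk is how fileset is built from --fileset and --sample. A minimal sketch of the assumed layout, plus reading the pickled output back, assuming metadata.json maps sample names to lists of ROOT file paths:

import json
import pickle

sample = "GluGluHToWWToLNuQQ_M125_TuneCP5_PSweight_13TeV-powheg2-jhugen727-pythia8"

# assumed layout of metadata.json: {"<sample>": ["root://.../file.root", ...]}
with open("metadata.json") as f:
    fileset = {sample: json.load(f)[sample]}

# read back a result written by run.py (0 and -1 are the starti/endi argparse defaults)
with open(f"outfiles/2017_{sample}_0--1.hist", "rb") as f:
    out = pickle.load(f)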
