-
Notifications
You must be signed in to change notification settings - Fork 4
/
submit-plots-dask.py
81 lines (62 loc) · 2.36 KB
/
submit-plots-dask.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import os, sys
import subprocess
import json
import uproot
import awkward as ak
from coffea import processor, util, hist
from coffea.nanoevents import NanoEventsFactory, NanoAODSchema
from boostedhiggs import VBFPlotProcessor
from distributed import Client
from lpcjobqueue import LPCCondorCluster
from dask.distributed import performance_report
from dask_jobqueue import HTCondorCluster, SLURMCluster
from datetime import datetime
env_extra = [
f"export PYTHONPATH=$PYTHONPATH:{os.getcwd()}",
]
cluster = LPCCondorCluster(
transfer_input_files=["boostedhiggs"],
ship_env=True,
memory="8GB",
# image="coffeateam/coffea-dask:0.7.11-fastjet-3.3.4.0rc9-ga05a1f8",
)
if not os.path.isdir('outfiles-plots/'):
os.mkdir('outfiles-plots')
cluster.adapt(minimum=1, maximum=250)
with Client(cluster) as client:
print(datetime.now())
print("Waiting for at least one worker...") # noqa
client.wait_for_workers(1)
print(datetime.now())
year = sys.argv[1]
with performance_report(filename="dask-report.html"):
infiles = subprocess.getoutput("ls infiles/"+year+"_*.json").split()
for this_file in infiles:
index = this_file.split("_")[1].split(".json")[0]
outfile = 'outfiles-plots/'+str(year)+'_dask_'+index+'.coffea'
if os.path.isfile(outfile):
print("File " + outfile + " already exists. Skipping.")
continue
else:
print("Begin running " + outfile)
print(datetime.now())
uproot.open.defaults["xrootd_handler"] = uproot.source.xrootd.MultithreadedXRootDSource
p = VBFPlotProcessor(year=year,jet_arbitration='ddb')
args = {'savemetrics':True, 'schema':NanoAODSchema}
output = processor.run_uproot_job(
this_file,
treename="Events",
processor_instance=p,
executor=processor.dask_executor,
executor_args={
"client": client,
"skipbadfiles": 1,
"schema": processor.NanoAODSchema,
"treereduction": 2,
},
chunksize=100000,
# maxchunks=args.max,
)
util.save(output, outfile)
print("saved " + outfile)
print(datetime.now())