Skip to content

Commit

Permalink
Modify cax to submit condor dagman jobs rather than regular jobs. Min…
Browse files Browse the repository at this point in the history
…or changes in some files in osg_scripts
  • Loading branch information
ershockley committed Nov 14, 2016
1 parent 5babaea commit f7f86d9
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 49 deletions.
28 changes: 13 additions & 15 deletions cax/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,24 +211,22 @@ def fill_args(args, default_args):

# for OSG
CONDOR_TEMPLATE = """#!/bin/bash
executable = /home/ershockley/caxOSG_testing/run_cax.sh
executable = /home/ershockley/cax/osg_scripts/run_xenon.sh
universe = vanilla
Error = /home/ershockley/caxOSG_testing/log/try8/{name}_{pax_version}_$(Cluster).log
Output = /home/ershockley/caxOSG_testing/log/try8/{name}_{pax_version}_$(Cluster).log
Log = /home/ershockley/caxOSG_testing/log/try8/joblogs/{name}_{pax_version}_$(Cluster)_JOBLOG.log
Requirements = (OpSysAndVer =?= "SL6") && (GLIDEIN_ResourceName =!= "BNL-ATLAS") && (GLIDEIN_ResourceName =!= "AGLT2") && (GLIDEIN_ResourceName =!= "Clemson-Palmetto") && ( GLIDEIN_ResourceName =!= "NPX" ) && (GLIDEIN_ResourceName =!= "MIT_CMS")
request_cpus = {ncpus}
request_Memory = 16384
request_disk=52428800
Error = /xenon/ershockley/cax/$(name)/$(cluster)_$(pax_version).error
Output = /xenon/ershockley/cax/$(name)/$(cluster)_$(pax_version).log
Log = /xenon/ershockley/cax/$(name)/$(cluster)_$(pax_version)_JOBLOG.log
Requirements = ((OpSysAndVer == "CentOS6" || OpSysAndVer == "RedHat6" || OpSysAndVer == "SL6") && (GLIDEIN_ResourceName =!= "NPX"))
request_cpus = $(ncpus)
transfer_input_files = /home/ershockley/user_cert
transfer_output_files = ""
+WANT_RCC_ciconnect = True
transfer_input_files = /home/ershockley/user_cert,/xenon/ershockley/ziplists/{name}_ziplist.txt
transfer_output_files = output
when_to_transfer_output = ON_EXIT
on_exit_hold = (ExitBySignal == True) || (ExitCode != 0)
# on_exit_hold = (ExitBySignal == True) || (ExitCode != 0)
transfer_executable = True
periodic_release = (NumJobStarts < 5) && ((CurrentTime - EnteredCurrentStatus) > 600)
arguments = {name} {base}/{name} {host} {pax_version} {pax_hash} {out_location} {ncpus} {disable_updates}
# periodic_release = ((NumJobStarts < 5) && ((CurrentTime - EnteredCurrentStatus) > 600))
arguments = $(name) $(input_file) $(host) $(pax_version) $(pax_hash) $(out_location) $(ncpus) $(disable_updates)
queue 1
"""

Expand Down Expand Up @@ -288,7 +286,7 @@ def processing_script(args={}):
args = {k:v.format(**args) if isinstance(v, str) else v for k,v in args.items()}
# os.makedirs(args['base']+"/"+args['use']+("/%d"%args['number'])+"_"+args['pax_version'], exist_ok=True)

script_template = CONDOR_TEMPLATE.format(**args)
script_template = CONDOR_TEMPLATE # .format(**args)

else:
raise NotImplementedError("Host %s processing not implemented",
Expand Down
37 changes: 34 additions & 3 deletions cax/qsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,11 @@ def submit_job(host,script, name, extra=''):

#Different submit command for using OSG
if host == 'login':
which('condor_submit')
which('condor_submit_dag')

# Effect of the arguments for condor_submit:
# http://research.cs.wisc.edu/htcondor/manual/v7.6/condor_submit.html

submit_command = ('condor_submit {extra} {script}'
submit_command = ('condor_submit_dag {extra} {script}'
.format(script=fileobj.name,
extra=extra))

Expand Down Expand Up @@ -78,6 +77,38 @@ def submit_job(host,script, name, extra=''):

delete_script(fileobj)

def submit_dag_job(name, dag_file, inputdir, outputdir, submitscript, paxversion):

which('condor_submit_dag')

# create submit file, which in turn is used by dag file.
submitfileobj = create_script(submitscript)

# create dag file
dag_maker = "~/cax/osg_scripts/write_xenon_dag.py --inputdir {inputdir} --names {name} --outputdir {outputdir} --submitfile {submitfile} --paxversion {paxversion} -o {dag_file}"

os.system(dag_maker.format(inputdir=inputdir,
name=name,
outputdir=outputdir,
submitfile=submitfileobj.name,
paxversion=paxversion,
dag_file=dag_file))


submit_command = ('condor_submit_dag {script}'.format(script=dag_file))

logging.info('submit job:\n %s' % submit_command)

try:
result = subprocess.check_output(submit_command,
stderr=subprocess.STDOUT,
shell=True,
timeout=120)
except subprocess.TimeoutExpired as e:
logging.error("Process timeout")
except Exception as e:
logging.exception(e)
# delete_script(fileobj)

def create_script(script):
"""Create script as temp file to be run on cluster"""
Expand Down
20 changes: 16 additions & 4 deletions cax/tasks/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,19 @@ def submit(self, in_location, host, pax_version, pax_hash, out_location,

script = config.processing_script(script_args)
self.log.info(script)
qsub.submit_job(host, script, name + "_" + pax_version)

if host == 'login':
dag_dir = "/xenon/ershockley/cax/{name}/{number}_{pax_version}/".format(name=name,
number=number,
pax_version=pax_version)
if not os.path.exists(dag_dir):
os.makedirs(dag_dir)

dag_file = dag_dir + "{name}.dag".format(name=name)
qsub.submit_dag_job(name, dag_file, in_location, out_location, script, pax_version)

else:
qsub.submit_job(host, script, name + "_" + pax_version)

def verify(self):
"""Verify processing worked"""
Expand All @@ -208,8 +220,8 @@ def each_run(self):

if 'gains' not in processing_parameters or \
'electron_lifetime_liquid' not in processing_parameters:
self.log.debug('no gains or electron lifetime!')
#return
self.log.debug('no gains or electron lifetime! skipping processing')
return

thishost = config.get_hostname()

Expand Down Expand Up @@ -254,7 +266,7 @@ def each_run(self):
for version in versions:
pax_hash = "n/a"

out_location = 'output/' #config.get_processing_dir(thishost, version)
out_location = '/xenon/ershockley/processed' #config.get_processing_dir(thishost, version)

if have_processed[version]:
self.log.info("Skipping %s already processed with %s",
Expand Down
33 changes: 17 additions & 16 deletions osg_scripts/write_xenon_dag.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python
from __future__ import print_function
import os
import urlparse
# import urlparse
import logging as log
from optparse import OptionParser

Expand Down Expand Up @@ -32,7 +32,7 @@
or srm server location and the mount point for the data
on the server
`--inputfilefilter`: (Required) File identifier, for example XENON1T-
`--runnumbers`: A list of run numbers, i.e. YYMMDD_HHMM, to process
`--names`: A list of run numbers, i.e. YYMMDD_HHMM, to process
`--muonveto`: Process muon veto file
`--submitfile`: (Required) HTCondor submit file to be used
`--paxversion`: (Required) pax version to be used
Expand All @@ -46,7 +46,7 @@ def get_out_name(filename):
return os.path.splitext(filename)[0] + ".root"


def get_run_number(dir_name):
def get_run_name(dir_name):
"""
Getting run number from directory
"""
Expand Down Expand Up @@ -80,33 +80,33 @@ def write_dag_file(options):
for dir_name, subdir_list, file_list in os.walk(options.inputdir):
if not options.run_muonveto and "MV" in dir_name:
continue
run_number = get_run_number(dir_name)
if (options.runnumbers is not None and
run_number not in options.runnumbers):
run_name = get_run_name(dir_name)
if (options.names is not None and
run_name not in options.names):
continue
for infile in file_list:
if options.inputfilefilter not in infile:
continue
filepath, file_extenstion = os.path.splitext(infile)
if file_extenstion != ".zip":
continue
run_number = get_run_number(dir_name)
run_name = get_run_name(dir_name)
outfile = get_out_name(infile)
infile = os.path.abspath(os.path.join(dir_name, infile))
infile = options.uri + infile
if not os.path.exists(os.path.join(options.outputdir,
run_number)):
os.makedirs(os.path.join(options.outputdir, run_number))
run_name)):
os.makedirs(os.path.join(options.outputdir, run_name))
outfile = os.path.abspath(os.path.join(options.outputdir,
run_number, outfile))
run_name, outfile))
outfile = options.uri + outfile
dag_file.write("JOB XENON.%d %s\n" % (i, options.submitfile))
dag_file.write(("VARS XENON.%d input_file=\"%s\" "
"out_location=\"%s\" name=\"%s\" "
"ncpus=\"1\" disable_updates=\"True\" "
"host=\"login\" pax_version=\"%s\" "
"pax_hash=\"n/a\"\n") % (i, infile,
outfile, run_number,
outfile, run_name,
options.paxversion))
dag_file.write("Retry XENON.%d 3\n" % i)
i += 1
Expand All @@ -127,14 +127,14 @@ def main(options, args):
"through and find input files"))
parser.add_option("--outputdir", dest="outputdir", default=None,
help="Force update information")
parser.add_option("--uri", dest="uri", default=None,
parser.add_option("--uri", dest="uri", default="gsiftp://gridftp.grid.uchicago.edu:2811/cephfs/srm",
help="Force update information")
parser.add_option("--inputfilefilter", dest="inputfilefilter",
default="XENON1T-", help=("Filter by which to "
"limit # of input files"))
parser.add_option("--runnumbers", dest="runnumbers", default=None,
parser.add_option("--names", dest="names", default=None,
action="callback", callback=callback_optparse,
help="Run numbers to consider")
help="Run names to consider")
parser.add_option("--muonveto", dest="run_muonveto", action="store_true",
default=False, help="Process Muon Veto data ")
parser.add_option("--submitfile", dest="submitfile", default=None,
Expand All @@ -146,7 +146,8 @@ def main(options, args):
parser.error("No output DAG file provided")
else:
if os.path.exists(options.outdagfile):
parser.error("Output DAG file exists. Please rename or delete.")
print("Output DAG file exists. Overwriting")
#parser.error("Output DAG file exists. Please rename or delete.")
if options.inputdir is None:
parser.error("No input dir provided")
if options.outputdir is None:
Expand All @@ -167,4 +168,4 @@ def main(options, args):
4: log.DEBUG
}.get(options.verbosity, log.DEBUG)
log.basicConfig(level=level)
main(options, args)
main(options, args)
10 changes: 5 additions & 5 deletions osg_scripts/xenon.submit
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
#!/bin/bash
executable = /home/briedel/Xenon1T/processing_test/run_xenon.sh
executable = /home/ershockley/cax/osg_scripts/run_xenon.sh
universe = vanilla
Error = /local-scratch/briedel/test_xenon_processing/$(name)/$(cluster)_$(pax_version).error
Output = /local-scratch/briedel/test_xenon_processing/$(name)/$(cluster)_$(pax_version).log
Log = /local-scratch/briedel/test_xenon_processing/$(name)/$(cluster)_$(pax_version)_JOBLOG.log
Error = /home/ershockley/caxOSG_testing/single_zip/logs/try1/$(name)/$(cluster)_$(pax_version).error
Output = /home/ershockley/caxOSG_testing/single_zip/logs/try1/$(name)/$(cluster)_$(pax_version).log
Log = /home/ershockley/caxOSG_testing/single_zip/logs/try1/$(name)/$(cluster)_$(pax_version)_JOBLOG.log

Requirements = ((OpSysAndVer == "CentOS6" || OpSysAndVer == "RedHat6" || OpSysAndVer == "SL6") && (GLIDEIN_ResourceName =!= "NPX"))
request_cpus = $(ncpus)
transfer_input_files = /home/briedel/user_cert
transfer_input_files = /home/ershockley/user_cert
transfer_output_files = ""
+WANT_RCC_ciconnect = True
when_to_transfer_output = ON_EXIT
Expand Down
2 changes: 1 addition & 1 deletion osg_scripts/xenon_debug.dag
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
JOB XENON.0 xenon_debug.submit
VARS XENON.0 input_file="gsiftp://gridftp.grid.uchicago.edu:2811/cephfs/srm/xenon/briedel_transfers/raw/161025_0501/XENON1T-4036-000026000-000026999-000001000.zip" host="login" pax_version="5.5.1" pax_hash="n/a" out_location="gsiftp://gridftp.grid.uchicago.edu:2811/cephfs/srm/xenon/briedel_transfers/output/XENON1T-4036-000026000-000026999-000001000.root" ncpus="1" disable_updates="True" name="161025_0501"
VARS XENON.0 input_file="gsiftp://gridftp.grid.uchicago.edu:2811/cephfs/srm/xenon/xenon1t/raw/161025_0501/XENON1T-4036-000026000-000026999-000001000.zip" host="login" pax_version="5.6.5" pax_hash="n/a" out_location="gsiftp://gridftp.grid.uchicago.edu:2811/cephfs/srm/xenon/briedel_transfers/output/XENON1T-4036-000026000-000026999-000001000.root" ncpus="1" disable_updates="True" name="161025_0501"
10 changes: 5 additions & 5 deletions osg_scripts/xenon_debug.submit
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
#!/bin/bash
executable = /home/briedel/Xenon1T/processing_test/run_xenon_debug.sh
executable = /home/ershockley/cax/osg_scripts/run_xenon.sh
universe = vanilla
Error = /local-scratch/briedel/debug/$(cluster)_$(pax_version).error
Output = /local-scratch/briedel/debug/$(cluster)_$(pax_version).log
Log = /local-scratch/briedel/debug/$(cluster)_$(pax_version)_JOBLOG.log
Error = /home/ershockley/caxOSG_testing/single_zip/logs/try1/$(cluster)_$(pax_version).error
Output = /home/ershockley/caxOSG_testing/single_zip/logs/try1/$(cluster)_$(pax_version).log
Log = /home/ershockley/caxOSG_testing/single_zip/logs/try1/$(cluster)_$(pax_version)_JOBLOG.log

Requirements = ((OpSysAndVer == "CentOS6" || OpSysAndVer == "RedHat6" || OpSysAndVer == "SL6"))
# && (GLIDEIN_ResourceName =!= "BNL-ATLAS") &&
request_cpus = $(ncpus)
transfer_input_files = /home/briedel/user_cert
transfer_input_files = /home/ershockley/user_cert
transfer_output_files = ""
when_to_transfer_output = ON_EXIT
# on_exit_hold = (ExitBySignal == True) || (ExitCode != 0)
Expand Down

0 comments on commit f7f86d9

Please sign in to comment.