
Commit 8f84294
Make daily processing work again. Not sure what happened...
ershockley committed May 23, 2017
1 parent 50f87a7 commit 8f84294
Showing 4 changed files with 17 additions and 18 deletions.
11 changes: 5 additions & 6 deletions cax/dag_prescript.py
@@ -28,14 +28,13 @@ def has_tag(doc, name):

 def clear_errors(id, pax_version, detector='tpc'):

-    if isinstance(id, int):
+    if detector == 'tpc':
         identifier = 'number'
-    elif isinstance(id, str):
-        if detector == 'tpc':
-            print("Warning: is %s the run name?" % id)
-            id = int(id)
+    elif detector == 'muon_veto':
+        identifier = 'name'
     else:
-        raise ValueError("run id is not int or string")
+        raise ValueError("detector is neither tpc nor muon_veto")

     query = {'detector': detector,
              identifier : id,
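
The net effect of this hunk is that the run identifier is chosen from the detector argument alone, instead of from the Python type of id. A minimal sketch, not verbatim repo code, of how clear_errors reads after the change (the real function goes on to update the runs database; only the branching and the query skeleton are shown):

    def clear_errors(id, pax_version, detector='tpc'):
        # TPC runs are keyed by run number, muon veto runs by run name
        if detector == 'tpc':
            identifier = 'number'
        elif detector == 'muon_veto':
            identifier = 'name'
        else:
            raise ValueError("detector is neither tpc nor muon_veto")

        # the query the error-clearing update is built around
        # (the real code also filters the data entries on pax_version)
        query = {'detector': detector, identifier: id}
        return query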
@@ -103,7 +102,7 @@ def pre_script(run_name, pax_version, run_number, detector = 'tpc'):

     # first clear any relevant errors
     if detector == 'tpc':
-        run_id = run_number
+        run_id = int(run_number)
         identifier = 'number'

     elif detector == 'muon_veto':
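The int() cast matters because pre_script typically receives run_number as a string (for example from DAG or command-line arguments), while the runs database stores the TPC run number as an integer. A tiny illustration with a hypothetical value:

    # illustration only: string arguments must be cast before querying
    run_number = "6731"                        # hypothetical value from the DAG
    bad_query  = {'number': run_number}        # matches nothing: "6731" != 6731
    good_query = {'number': int(run_number)}   # matches the integer field
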
4 changes: 2 additions & 2 deletions cax/dag_writer.py
@@ -417,7 +417,7 @@ def write_submit_script(self, filename, logdir):
 Output = {logdir}/pax_$(pax_version)/$(dirname)/$(zip_name)_$(cluster).log
 Log = {logdir}/pax_$(pax_version)/$(dirname)/joblogs/$(zip_name)_$(cluster).joblog
-Requirements = (HAS_CVMFS_xenon_opensciencegrid_org) && (((TARGET.GLIDEIN_ResourceName =!= MY.MachineAttrGLIDEIN_ResourceName1) || (RCC_Factory == "ciconnect")) && ((TARGET.GLIDEIN_ResourceName =!= MY.MachineAttrGLIDEIN_ResourceName2) || (RCC_Factory == "ciconnect")) && ((TARGET.GLIDEIN_ResourceName =!= MY.MachineAttrGLIDEIN_ResourceName3) || (RCC_Factory == "ciconnect")) && ((TARGET.GLIDEIN_ResourceName =!= MY.MachineAttrGLIDEIN_ResourceName4) || (RCC_Factory == "ciconnect"))) && (OSGVO_OS_STRING == "RHEL 6" || RCC_Factory == "ciconnect")
+Requirements = (HAS_CVMFS_xenon_opensciencegrid_org) && (((TARGET.GLIDEIN_ResourceName =!= MY.MachineAttrGLIDEIN_ResourceName1) || (RCC_Factory == "ciconnect")) && ((TARGET.GLIDEIN_ResourceName =!= MY.MachineAttrGLIDEIN_ResourceName2) || (RCC_Factory == "ciconnect")) && ((TARGET.GLIDEIN_ResourceName =!= MY.MachineAttrGLIDEIN_ResourceName3) || (RCC_Factory == "ciconnect")) && ((TARGET.GLIDEIN_ResourceName =!= MY.MachineAttrGLIDEIN_ResourceName4) || (RCC_Factory == "ciconnect"))) && (OSGVO_OS_STRING == "RHEL 6" || RCC_Factory == "ciconnect") && (GLIDEIN_ResourceName =!= "Comet")
 request_cpus = $(ncpus)
 request_memory = 1900MB
 request_disk = 3GB
@@ -427,7 +427,7 @@ def write_submit_script(self, filename, logdir):
 when_to_transfer_output = ON_EXIT
 # on_exit_hold = (ExitBySignal == True) || (ExitCode != 0)
 transfer_executable = True
-periodic_remove = ((JobStatus == 2) && ((CurrentTime - EnteredCurrentStatus) > (60*60*10)))
+periodic_remove = ((JobStatus == 2) && ((CurrentTime - EnteredCurrentStatus) > (60*60*12)))
 #periodic_release = (JobStatus == 5) && (HoldReason == 3) && (NumJobStarts < 5) && ( (CurrentTime - EnteredCurrentStatus) > $RANDOM_INTEGER(60, 1800, 30) )
 #periodic_remove = (NumJobStarts > 4)
 arguments = $(name) $(input_file) $(host) $(pax_version) $(pax_hash) $(out_location) $(ncpus) $(disable_updates) $(json_file) $(on_rucio)
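Two submit-file knobs change here: the Requirements expression now also keeps jobs off the Comet resource, and the periodic_remove cutoff grows from 60*60*10 (10 hours) to 60*60*12 (12 hours) of wall time in the running state (JobStatus == 2). A small sketch, using a helper that is not part of cax, of how such a line could be parameterized:

    # hypothetical helper: render the periodic_remove line for a given
    # maximum wall-clock time in hours
    def periodic_remove_line(max_hours):
        seconds = 60 * 60 * max_hours
        return ("periodic_remove = ((JobStatus == 2) && "
                "((CurrentTime - EnteredCurrentStatus) > (%d)))" % seconds)

    print(periodic_remove_line(12))   # corresponds to the new 12-hour cutoff above
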
4 changes: 2 additions & 2 deletions cax/task.py
@@ -26,8 +26,8 @@ def go(self, specify_run = None):
         if specify_run is not None:
             if isinstance(specify_run,int):
                 self.query['number'] = specify_run
-                if 'data' in self.query:
-                    clear_errors(specify_run, self.query["data"]["$not"]["$elemMatch"]['pax_version'])
+                #if 'data' in self.query:
+                    #clear_errors(specify_run, self.query["data"]["$not"]["$elemMatch"]['pax_version'])
             elif isinstance(specify_run,str):
                 self.query['name'] = specify_run

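With the clear_errors call commented out, go() now only translates specify_run into the run-selection query: an integer selects by run number, a string by run name. A minimal sketch of that dispatch, assuming nothing beyond what the diff shows:

    # simplified stand-in for the selection logic in Task.go
    def build_run_selector(specify_run):
        query = {}
        if isinstance(specify_run, int):
            query['number'] = specify_run   # select the run by number
        elif isinstance(specify_run, str):
            query['name'] = specify_run     # or by its name
        return query
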
16 changes: 8 additions & 8 deletions cax/tasks/process.py
@@ -196,7 +196,6 @@ def __init__(self):
         query = {"data" : {"$not" : {"$elemMatch" : {"type" : "processed",
                                                      "pax_version" : self.pax_version,
                                                      "host" : self.thishost,
-                                                     # commenting now so OSG does all processing
                                                      "$or" : [{"status" : "transferred"},
                                                               {"status" : "transferring"}
                                                               ]
Expand All @@ -219,6 +218,7 @@ def __init__(self):
'tags' : {"$not" : {'$elemMatch' : {'name' : 'donotprocess'}}}
}


# if using OSG processing then need raw data in rucio catalog
# will check later if on UC_OSG_USERDISK
if self.thishost == 'login':
@@ -228,7 +228,7 @@ def __init__(self):
            query["data"]["$elemMatch"] = {"host" : self.thishost,
                                           "type" : "raw"}

-        #print(query)
+        print(query)

         Task.__init__(self, query = query)
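
The re-enabled print(query) dumps the full selection criteria on every cycle. For OSG processing (self.thishost == 'login') the query requires the raw data to be registered in the rucio catalog, while elsewhere it requires raw data on the local host. An illustrative sketch of that branch; the field names come from the diff, and the 'rucio-catalogue' host label is an assumption:

    # illustration only, not verbatim repo code
    def raw_data_requirement(thishost):
        if thishost == 'login':
            # OSG processing: raw data must be in the rucio catalog
            return {"host": "rucio-catalogue", "type": "raw"}   # assumed label
        # local processing: raw data must sit on this machine
        return {"host": thishost, "type": "raw"}

    query = {"data": {}}
    query["data"]["$elemMatch"] = raw_data_requirement("login")
    print(query)   # analogous to the print re-enabled above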

@@ -332,7 +332,7 @@ def each_run(self):

         # now check how many dags are running
         self.log.debug("%d dags currently running" % len(qsub.get_queue()))
-        if len(qsub.get_queue()) > 49:
+        if len(qsub.get_queue()) > 29:
             self.log.info("Too many dags in queue, waiting 10 minutes")
             time.sleep(60*10)
             return
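
The throttle on concurrent DAGs tightens from 49 to 29: if more than 29 DAGs are already queued, the task logs a message, sleeps ten minutes, and returns without submitting. A compact sketch of the pattern, with the queue lookup and submission stubbed out:

    import time

    MAX_DAGS = 29   # tightened from 49 in this commit

    # get_queue, submit_dag and log stand in for qsub.get_queue,
    # the DAG submission and self.log.info
    def maybe_submit(get_queue, submit_dag, log):
        if len(get_queue()) > MAX_DAGS:
            log("Too many dags in queue, waiting 10 minutes")
            time.sleep(60 * 10)
            return
        submit_dag()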
@@ -400,17 +400,17 @@ def each_run(self):
                  'creation_time' : datetime.datetime.utcnow(),
                  'creation_place': 'OSG'}

-        self.submit(out_location, ncpus, disable_updates, json_file)
-
         if detector == 'tpc':
             clear_errors(self.run_doc['number'], self.pax_version, detector)
         elif detector == 'muon_veto':
             clear_errors(self.run_doc['name'], self.pax_version, detector)

-        if config.DATABASE_LOG == True:
-            self.API.add_location(self.run_doc['_id'], datum)
+        self.submit(out_location, ncpus, disable_updates, json_file)

+        #if config.DATABASE_LOG == True:
+        #    self.API.add_location(self.run_doc['_id'], datum)

-        time.sleep(5)
+        time.sleep(2)

     def local_data_finder(self):
         have_processed = False
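The final hunk reorders the tail of each_run: errors are now cleared before the DAG is submitted, the runs-database location update (API.add_location) is commented out, and the pause between runs drops from 5 to 2 seconds. A hedged sketch of the resulting flow, with the surrounding objects stubbed:

    import time

    # sketch only; run_doc, submit and clear_errors stand in for the real
    # attributes and helpers in cax/tasks/process.py
    def finish_run(run_doc, pax_version, detector, submit, clear_errors):
        # clear stale error entries for this run before handing off the DAG
        if detector == 'tpc':
            clear_errors(run_doc['number'], pax_version, detector)
        elif detector == 'muon_veto':
            clear_errors(run_doc['name'], pax_version, detector)

        submit()        # submission now happens after errors are cleared
        time.sleep(2)   # shortened from 5 seconds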
