Processing on OSG #67

Open · wants to merge 113 commits into base: master

Changes from 1 commit

113 commits
9da727a
Incorporate OSG Connect as a processing site, first steps:
Jun 24, 2016
b08ae8f
Merge with master v3.0.5
ershockley Jun 29, 2016
59e0457
prepare process.py for installation on OASIS
ershockley Jul 5, 2016
c546e55
Prepare for testing api/osg_dev on ci-connect machine. Python not wor…
ershockley Aug 30, 2016
529a72e
Revert back to old (semi-working) version of OSG_dev and add api.py
ershockley Aug 30, 2016
8aa8fca
Revert back to old (semi-working) version of OSG_dev and add api.py
ershockley Aug 30, 2016
b3d4600
Merge branch 'OSG_dev2' of https://github.com/XENON1T/cax into OSG_dev2
ershockley Aug 30, 2016
732517a
This version launches processing but throws a pickling error
Aug 31, 2016
fbe8a9c
Working version on midway
Sep 1, 2016
e49b71f
Update for login.ci-connect.uchicago.edu
ershockley Sep 1, 2016
17ff365
first attempt at resolving conflicts from merge with master
ershockley Sep 7, 2016
85984da
Still debugging. Pushing to be able to test on midway (ci running slo…
ershockley Sep 9, 2016
b3479b9
API calls work on OSG. Need to implement workaround for pax pymongo call
ershockley Sep 15, 2016
d14a5d0
modify process.py so that config dictionary for processing called usi…
ershockley Sep 20, 2016
c7942c8
Processing works on OSG, though bookmarking still needs polishing. La…
ershockley Nov 9, 2016
536de88
Adding first pass of OSG scripts
briedel Nov 9, 2016
2bea618
Improved version
briedel Nov 10, 2016
755d882
debugging, minor changes
briedel Nov 13, 2016
5babaea
Adding Friends of MWT2 special sauce
briedel Nov 13, 2016
f7f86d9
Modify cax to submit condor dagman jobs rather than regular jobs. Min…
ershockley Nov 14, 2016
47dfda8
Adding preliminary OSG guide
briedel Nov 15, 2016
71ee993
Merge branch 'OSG_dev2' of https://github.com/XENON1T/cax into OSG_dev2
briedel Nov 15, 2016
e261147
Adding more OSG docs
briedel Nov 15, 2016
0fe90a6
Small changes
briedel Nov 16, 2016
c59c515
Adding first draft of rucio docs
briedel Nov 22, 2016
518f8cd
Breaking out admin guide
briedel Nov 23, 2016
4724bde
Moving more things around
briedel Nov 23, 2016
9eb2b03
Add post script to dag and first implementation of database bookkeeping
ershockley Nov 24, 2016
cbaa0da
Adding tutorial
briedel Nov 30, 2016
2ea9df2
Update run_xenon.sh
tunnell Nov 30, 2016
62017c1
Adding more detail to tutorial
briedel Dec 1, 2016
d413397
Merge branch 'OSG_dev2' of https://github.com/XENON1T/cax into OSG_dev2
briedel Dec 1, 2016
759a6ae
Minor formatting
briedel Dec 1, 2016
90b4d63
Make several changes to optimize dag submission, post scripts, pre sc…
ershockley Dec 5, 2016
45ae1bf
Merge with Benedikts changes in osg_scripts
ershockley Dec 5, 2016
b285824
Make several changes including dag submission, post scripts, pre scri…
ershockley Dec 5, 2016
89eee80
Add flexible API queries. Some other small changes
ershockley Dec 6, 2016
fb7db43
More small changes
ershockley Dec 6, 2016
798ba77
Merge branches 'OSG_dev2' and 'master' of https://github.com/XENON1T/…
pdeperio Dec 7, 2016
16c6971
Missed additional merge from HEAD
pdeperio Dec 7, 2016
d247b3c
Exception to not purge Stash processing errors
pdeperio Dec 7, 2016
56be9f4
Fix conditional bug
pdeperio Dec 7, 2016
77d8081
Remove .idea stuff
pdeperio Dec 7, 2016
6d64324
Merge branch 'OSG_dev2' of https://github.com/XENON1T/cax into OSG_dev2
pdeperio Dec 7, 2016
bad0664
Make thishost and pax_version common member variables
pdeperio Dec 7, 2016
e921024
Fix qsub for massive-cax
pdeperio Dec 7, 2016
ea50970
Last commit before merging with master-merged OSG_dev2
ershockley Dec 12, 2016
1aa0af8
Merge branch 'OSG_dev2' of https://github.com/XENON1T/cax into OSG_dev2
ershockley Dec 12, 2016
ab8c572
Small fix in cax.json
ershockley Dec 12, 2016
9cd2325
Fix bug allowing for duplicate DB entries on midway. Also change oute…
ershockley Dec 16, 2016
9a8ba4b
Add some optimizations to OSG processing. Make cax-like processing wo…
ershockley Jan 3, 2017
beb30ad
Update run_xenon.sh for rucio downloads. Small change in process.py t…
ershockley Feb 18, 2017
524917c
add get_gains.py. Originally written to only try processing runs that…
ershockley Feb 18, 2017
48bc695
Add one-line fix for to corrections.py to add acquisition monitor cha…
ershockley Feb 18, 2017
8045b1a
Merge with master to implement rsync and ruciax
ershockley Feb 19, 2017
2f80f25
Update dag_writer.py
ershockley Feb 27, 2017
13c0886
Add midway gfal functionality to dag_writer.py
ershockley Feb 28, 2017
b3da3df
Optimizations on DAG side of things. Make sure rsync implemented
ershockley Mar 9, 2017
ac8000c
have dag_writer.py skips runs if raw data not in chicago (stash, midw…
ershockley Mar 14, 2017
ef82706
Add drift velocity check to dag_prescript.py. Few other small changes.
ershockley Mar 20, 2017
fb3f681
Implement Automatic OSG processing
ershockley Mar 28, 2017
29cb5ee
Implement MV processing on OSG. Other small changes including queries…
ershockley May 10, 2017
7673e68
Optimizations in run_xenon.sh for rucio transfers, few other small ch…
ershockley May 17, 2017
755ff45
Fix things that broke for some reason in DB handling
ershockley May 23, 2017
50f87a7
Fix DB handling which broke somehow
ershockley May 23, 2017
8f84294
Make daily processing work again. Not sure what happened...
ershockley May 23, 2017
99724dc
Make daily processing work again. Not sure what happened...
ershockley May 23, 2017
8206766
Merge branch 'OSG_dev2' of https://github.com/XENON1T/cax into OSG_dev2
ershockley May 23, 2017
d955f5a
Process NG data only
ershockley May 26, 2017
b28b706
Starting to make changes to api, but not finished
ershockley May 27, 2017
b697d8b
Add --api option so that we can circumvent pymongo calls
ershockley May 27, 2017
1b34fd9
Add --api option so that we can circumvent pymongo calls
ershockley May 27, 2017
7d057d5
remove hardcoded 'transferred' in data_mover.py. Other small changes
ershockley Jun 1, 2017
8b3ccb9
Remove NG specification in process.py
ershockley Jun 1, 2017
68154d5
fix conflicts in api.py and data_mover.py
ershockley Jun 1, 2017
2db8120
Some cleaning up. Add .gitignore and stage_data.py
ershockley Jun 1, 2017
f1a7cc0
Implement MV changes. Some other small changes.
ershockley Jun 6, 2017
f8707bd
Implement MV changes. Some other small changes.
ershockley Jun 6, 2017
2c3de99
Fix small bug in process.py
ershockley Jun 6, 2017
e7f218d
Small changes. Commit before dag_writer.py overhaul
ershockley Jun 9, 2017
bc08822
Add determine_rse.py so that jobs download from European rucio endpoi…
ershockley Jun 11, 2017
e5a670b
Remove ershockley dependence
ershockley Jun 11, 2017
2dbd1d0
More changes - hopefully to remove any ershockley dependence
ershockley Jun 16, 2017
d041356
Alfio's info
ershockley Jun 21, 2017
6b18a4a
Set GLIDEIN_Country to US if not set
ershockley Jun 21, 2017
bdde49a
Add dag_config
ershockley Jun 21, 2017
a8785f2
Merge branch 'any_user' of https://github.com/XENON1T/cax into any_user
ershockley Jun 21, 2017
c1744d1
More fixes to allow any user to run code
ershockley Jul 5, 2017
91b1b8b
dunno
ershockley Jul 9, 2017
c166b4b
Merge branch 'OSG_dev2' of https://github.com/XENON1T/cax into OSG_dev2
ershockley Jul 9, 2017
0030597
multiple retries for copy back to stash
ershockley Jul 10, 2017
948d14c
Change DB query to only process newest runs with v6.6.6
ershockley Jul 14, 2017
ea972a3
Add curl commands for job monitoring
ershockley Jul 14, 2017
90046dd
Fix bug in run_xenon.sh
ershockley Jul 14, 2017
f3878cc
Updates to make_runlist.py, some other small improvements
ershockley Jul 16, 2017
d5fab9d
Remove annoying print statement
ershockley Jul 17, 2017
4dfd0c3
Merge
ershockley Jul 17, 2017
68b6a60
Clean up finished
ershockley Aug 11, 2017
8cddb56
Commit before merging with benedikt cleanup
ershockley Aug 16, 2017
d092e2b
Merge with benedikt_cleanup
ershockley Aug 16, 2017
44d673e
Revert back to old commit
ershockley Aug 17, 2017
221317b
dumb commit
ershockley Aug 17, 2017
0e697eb
Merge branch 'revert' into OSG_dev2
ershockley Aug 17, 2017
889e932
Improvements with permissions setting, etc.
ershockley Aug 22, 2017
9caed6d
Hopefully fix for any user to use again
ershockley Aug 25, 2017
305e4ec
Add some hardcoded pieces but hopefully will result in less errors
ershockley Aug 29, 2017
b6000e3
Change settings for SR1 reprocessing
ershockley Sep 14, 2017
de24493
Update hadd_and_upload.sh
ershockley Sep 14, 2017
58c948f
Modify caxdir lines to use home directory
ershockley Sep 18, 2017
7ee5800
Try to remove gfal-debug stuff?
ershockley Sep 19, 2017
4e781c8
Shouldn't be any major changes - SR1 daily processing running normal
ershockley Oct 26, 2017
8d2a1e7
Small changes to dag_writer, adding SURFSARA
ershockley Oct 31, 2017
55fabbf
Remove gfal debug, add CNAF to some of the rucio scripts, other small…
ershockley Nov 6, 2017
Add --api option so that we can circumvent pymongo calls
ershockley committed May 27, 2017

commit 1b34fd935c64d3eb108288a0aecb5cf435285b91
cax/api.py (31 changes: 18 additions & 13 deletions)
@@ -15,7 +15,7 @@ def __init__(self):

# Runs DB Query Parameters
self.api_url = config.API_URL
self.api_schema = "https://xenon1t-daq.lngs.infn.it"
self.api_schema = "https://xenon1t-daq.lngs.infn.it" # needed for using self.next_run
self.get_params = {
"username": config.api_user(),
"api_key": config.api_key(),
@@ -31,7 +31,7 @@ def __init__(self):

self.logging = logging.getLogger(self.__class__.__name__)

def get_next_run(self, query):
def get_next_run(self, query, _id=None):
ret = None
if self.next_run == None:
return ret
@@ -45,11 +45,13 @@ def get_next_run(self, query):

params['limit']=1
params['offset']=0


url = self.api_url if _id is None else (self.api_url + str(_id) + "/")

api_try = 1
while api_try <= 3:
try:
db_request = requests.get(self.api_url, params = params).text
db_request = requests.get(url, params = params).text
break
except:
time.sleep(5)
@@ -65,12 +67,16 @@ def get_next_run(self, query):

# Keep track of the next run so we can iterate.
if ret is not None:
self.next_run = ret['meta']['next']
if len(ret['objects'])==0:
return None
if _id is None:
self.next_run = ret['meta']['next']
if len(ret['objects'])==0:
return None

return ret['objects'][0]['doc']
return ret['objects'][0]['doc']

else:
self.next_run = None # otherwise self.get_all_runs would be an infinite loop
return ret['doc']
return None

def add_location(self, uuid, parameters):
@@ -132,12 +138,11 @@ def verify_site(self, sitea, siteb):
(sitea['type'] == siteb['type']) and
(sitea['location'] == siteb['location']))

def get_all_runs(self, query, limit):
def get_all_runs(self, query, _id=None):
# return list of rundocs for all runs satisfying query
collection = []
counter = 0
while self.next_run is not None and counter < limit:
collection.append(self.get_next_run(query))
counter += 1
query = {'query' : dumps(query,default=json_util.default)}
while self.next_run is not None:
collection.append(self.get_next_run(query, _id))

return collection
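
For orientation, the reworked interface is meant to be used roughly as follows. This is a minimal sketch, not code from the PR; it assumes cax is installed with API credentials configured via cax.config, and the query fields shown are purely illustrative.

```python
# Minimal sketch of the reworked api helper (illustrative, not part of the diff).
from cax.api import api

API = api()

# Iterate over every run document matching a query (fields are hypothetical).
query = {'detector': 'tpc'}
run_docs = API.get_all_runs(query)

# Fetch a single run document by its database _id, mimicking collection.find_one;
# this is the pattern Task.get_rundoc() below relies on ('some_run_id' is a placeholder).
run_doc = API.get_all_runs({'_id': 'some_run_id'}, _id='some_run_id')[0]
```

Passing _id routes the request to api_url + str(_id) + "/" instead of the list endpoint, which is also why get_next_run only advances self.next_run when _id is None.
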
cax/dag_writer.py (2 changes: 1 addition & 1 deletion)
@@ -63,7 +63,7 @@ def get_run_doc(self, run_id):
detector = 'muon_veto'

query = {identifier : run_id}
API = api(detector=detector)
API = api()
doc = API.get_next_run(query)
time.sleep(0.1)
return doc
cax/main.py (10 changes: 8 additions & 2 deletions)
@@ -38,6 +38,7 @@ def main():
help="Select a single run using the run name")
parser.add_argument('--host', type=str,
help="Host to pretend to be")
parser.add_argument('--api', action='store_true', help='Uses API interface instead of pymongo')

args = parser.parse_args()

@@ -92,12 +93,17 @@ def main():
args.config_file)
config.set_json(args.config_file)

if args.api:
use_api = True
else:
use_api = False

tasks = [
corrections.AddElectronLifetime(), # Add electron lifetime to run, which is just a function of calendar time
corrections.AddGains(), # Adds gains to a run, where this is computed using slow control information
#corrections.AddSlowControlInformation(),
data_mover.CopyPull(), # Download data through e.g. scp to this location
data_mover.CopyPush(), # Upload data through e.g. scp or gridftp to this location where cax running
data_mover.CopyPush(use_api=use_api), # Upload data through e.g. scp or gridftp to this location where cax running
#tsm_mover.AddTSMChecksum(), # Add forgotten Checksum for runDB for TSM client.
checksum.CompareChecksums(), # See if local data corrupted
checksum.AddChecksum(), # Add checksum for data here so can know if corruption (useful for knowing when many good copies!)
@@ -109,7 +115,7 @@ def main():

filesystem.SetPermission(), # Set any permissions (primarily for Tegner) for new data to make sure analysts can access
clear.BufferPurger(), # Clear old data at some locations as specified in cax.json
process.ProcessBatchQueue(), # Process the data with pax
process.ProcessBatchQueue(use_api=use_api), # Process the data with pax
process_hax.ProcessBatchQueueHax() # Process the data with hax
]
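
The new flag only decides whether downstream tasks reach the runs DB through the HTTP API or through pymongo. A condensed sketch of the wiring is below; it is not a verbatim excerpt of main.py, and the cax.tasks import path is assumed from the file layout in this diff.

```python
# Condensed sketch of the new --api wiring (not a verbatim excerpt of main.py).
import argparse

from cax.tasks import data_mover, process

parser = argparse.ArgumentParser()
parser.add_argument('--api', action='store_true',
                    help='Uses API interface instead of pymongo')
args = parser.parse_args()

# action='store_true' already yields a boolean, so this single assignment is
# equivalent to the if/else block added above.
use_api = args.api

tasks = [
    data_mover.CopyPush(use_api=use_api),        # DB bookkeeping via the API when requested
    process.ProcessBatchQueue(use_api=use_api),  # run selection via the API when requested
    # ... remaining tasks unchanged
]
```
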

cax/task.py (68 changes: 54 additions & 14 deletions)
@@ -7,14 +7,19 @@

from cax import config
from cax.dag_prescript import clear_errors
from cax.api import api

class Task():
def __init__(self, query = {}):
def __init__(self, query = {}, use_api = False):
# Grab the Run DB so we can query it
self.collection = config.mongo_collection()

if not use_api:
self.collection = config.mongo_collection()

self.log = logging.getLogger(self.__class__.__name__)
self.run_doc = None
self.untriggered_data = None
self.use_api = use_api

self.query = query

@@ -26,8 +31,7 @@ def go(self, specify_run = None):
if specify_run is not None:
if isinstance(specify_run,int):
self.query['number'] = specify_run
#if 'data' in self.query:
#clear_errors(specify_run, self.query["data"]["$not"]["$elemMatch"]['pax_version'])

elif isinstance(specify_run,str):
self.query['name'] = specify_run

@@ -36,27 +40,28 @@ def go(self, specify_run = None):

# Collect all run document ids. This has to be turned into a list
# to avoid timeouts if a task takes too long.
try:
ids = [doc['_id'] for doc in self.collection.find(self.query,
projection=('_id'),
sort=(('start', -1),))]
except pymongo.errors.CursorNotFound:
self.log.info("Curson not found exception. Skipping")

ids = self.collect_ids()
if ids is None:
self.log.info("Can't get run ids for some reason. Skipping")
return


if len(ids) == 0:
self.log.info("Query matches no entry. Skipping.")
return

# Iterate over each run
for id in ids:
# Make sure up to date
try:
self.run_doc = self.collection.find_one({'_id': id})
except pymongo.errors.AutoReconnect:
self.log.error("pymongo.errors.AutoReconnect, skipping...")
self.run_doc = self.get_rundoc(id)

if self.run_doc is None:
self.log.info("Problems getting rundoc for id %s. Skipping" % id)
continue

if 'data' not in self.run_doc:
self.log.info('Data not in run_doc')
continue

# Operate on only user-specified datasets
@@ -114,3 +119,38 @@ def shutdown(self):
"""Runs at end and can be overloaded by subclasses
"""
pass

def collect_ids(self):
# if not using API interface, do normal pymongo query which is faster
if not self.use_api:
try:
ids = [doc['_id'] for doc in self.collection.find(self.query,
projection=('_id'),
sort=(('start', -1),))]
except pymongo.errors.CursorNotFound:
self.log.info("Cursor not found exception. Skipping")
return

else: # slower but uses API which can be useful
# initialize api instance
API = api()
ids = [doc['_id'] for doc in API.get_all_runs(self.query)]

return ids


def get_rundoc(self, id):
if not self.use_api:
try:
rundoc = self.collection.find_one({'_id': id})
except pymongo.errors.AutoReconnect:
self.log.error("pymongo.errors.AutoReconnect, skipping...")
return

else:
# initialize api
API = api()
# only want the first result, mimics collection.find_one
rundoc = API.get_all_runs({'_id' : id}, _id=id)[0]

return rundoc
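
In short, Task now has two interchangeable back ends for run lookup: the original pymongo path (faster, but needs a direct MongoDB connection) and the API path (slower, but avoids pymongo entirely, which is the point of the --api option for OSG jobs). A minimal sketch of the switch, with a hypothetical query:

```python
# Minimal sketch: the same Task query runs against either back end (illustrative only).
from cax.task import Task

query = {'number': 6731}                    # hypothetical run selection

t_mongo = Task(query=query)                 # default: pymongo via config.mongo_collection()
t_api   = Task(query=query, use_api=True)   # HTTP API via cax.api.api; no pymongo connection

# Inside go(), collect_ids() and get_rundoc() choose the matching back end, so
# subclasses see the same run documents in self.run_doc either way.
```
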
cax/tasks/data_mover.py (83 changes: 47 additions & 36 deletions)
@@ -18,6 +18,7 @@
from cax import config
from cax.task import Task
from cax import qsub
from cax.api import api

from cax.tasks.tsm_mover import TSMclient
from cax.tasks.rucio_mover import RucioBase, RucioRule
@@ -688,7 +689,6 @@ def copy_handshake(self, datum, destination, method, option_type):
return

if datum['type'] == 'processed':
self.log.info(datum)
base_dir = os.path.join(base_dir, 'pax_%s' % datum['pax_version'])

# Check directory existence on local host for download only
@@ -734,14 +734,19 @@ def copy_handshake(self, datum, destination, method, option_type):
datum_new['location'] = "n/a"

if config.DATABASE_LOG == True:
result = self.collection.update_one({'_id': self.run_doc['_id'],
},
{'$push': {'data': datum_new}})
if not self.use_api:
result = self.collection.update_one({'_id': self.run_doc['_id'],
},
{'$push': {'data': datum_new}})

if result.matched_count == 0:
self.log.error("Race condition! Could not copy because another "
"process seemed to already start.")
return
if result.matched_count == 0:
self.log.error("Race condition! Could not copy because another "
"process seemed to already start.")
return

else:
API = api()
API.add_location(self.run_doc['_id'], datum_new)

self.log.info('Starting '+method)

@@ -772,35 +777,41 @@ def copy_handshake(self, datum, destination, method, option_type):
self.log.debug(method+" done, telling run database")

if config.DATABASE_LOG:
if method == "rucio":
logging.info("following entries are added to the runDB:")
logging.info("Status: %s", self.rucio.get_rucio_info()['status'] )
logging.info("Location: %s", self.rucio.get_rucio_info()['location'] )
logging.info("Checksum: %s", self.rucio.get_rucio_info()['checksum'] )
logging.info("RSE: %s", self.rucio.get_rucio_info()['rse'] )

self.collection.update({'_id' : self.run_doc['_id'],
'data': {
'$elemMatch': datum_new}},
{'$set': {
'data.$.status': self.rucio.get_rucio_info()['status'],
'data.$.location': self.rucio.get_rucio_info()['location'],
'data.$.checksum': self.rucio.get_rucio_info()['checksum'],
'data.$.rse': self.rucio.get_rucio_info()['rse']
}
})

else:
#Fill the data if method is not rucio
if config.DATABASE_LOG:
self.collection.update({'_id' : self.run_doc['_id'],
'data': {
if method == "rucio":
logging.info("following entries are added to the runDB:")
logging.info("Status: %s", self.rucio.get_rucio_info()['status'] )
logging.info("Location: %s", self.rucio.get_rucio_info()['location'] )
logging.info("Checksum: %s", self.rucio.get_rucio_info()['checksum'] )
logging.info("RSE: %s", self.rucio.get_rucio_info()['rse'] )

self.collection.update({'_id' : self.run_doc['_id'],
'data': {
'$elemMatch': datum_new}},
{'$set': {
'data.$.status': status
}
})

{'$set': {
'data.$.status': self.rucio.get_rucio_info()['status'],
'data.$.location': self.rucio.get_rucio_info()['location'],
'data.$.checksum': self.rucio.get_rucio_info()['checksum'],
'data.$.rse': self.rucio.get_rucio_info()['rse']
}
})

#Fill the data if method is not rucio
#if config.DATABASE_LOG:
else:
if not self.use_api:
self.collection.update({'_id' : self.run_doc['_id'],
'data': {
'$elemMatch': datum_new}},
{'$set': {
'data.$.status': status
}
})

else:
API = api()
updatum = datum_new.copy()
updatum['status'] = 'transferred' #status
API.update_location(self.run_doc['_id'], datum_new, updatum)

if method == "rucio":
#Rucio 'side load' to set the transfer rules directly after the file upload
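
With --api set (and DATABASE_LOG enabled), the copy handshake now records the new data entry and its final status through the HTTP API instead of collection.update_one / collection.update. Roughly, the API path boils down to the calls below; the field values are made up for illustration.

```python
# Illustrative sketch of the API-based bookkeeping path (values are placeholders).
from cax.api import api

API = api()
run_id = 'some_run_id'                    # _id of the run document (placeholder)
datum_new = {'host': 'login',             # destination host (hypothetical)
             'type': 'raw',
             'status': 'transferring',
             'location': 'n/a'}

# Register the pending copy on the run document ...
API.add_location(run_id, datum_new)

# ... and update the same entry once the transfer has finished.
updatum = datum_new.copy()
updatum['status'] = 'transferred'
API.update_location(run_id, datum_new, updatum)
```
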
cax/tasks/process.py (10 changes: 2 additions & 8 deletions)
@@ -19,7 +19,6 @@

from cax import qsub, config
from cax.task import Task
from cax.api import api
from cax.dag_prescript import clear_errors

def verify():
@@ -187,9 +186,7 @@ def _process(name, in_location, host, pax_version,
class ProcessBatchQueue(Task):
"Create and submit job submission script."

def __init__(self):
self.API = api()

def __init__(self, use_api=False):
self.thishost = config.get_hostname()
self.pax_version = 'v%s' % pax.__version__

@@ -229,7 +226,7 @@ def __init__(self):
query["data"]["$elemMatch"] = {"host" : self.thishost,
"type" : "raw"}

Task.__init__(self, query = query)
Task.__init__(self, query=query, use_api=use_api)

def verify(self):
"""Verify processing worked"""
@@ -406,9 +403,6 @@ def each_run(self):

self.submit(out_location, ncpus, disable_updates, json_file)

#if config.DATABASE_LOG == True:
# self.API.add_location(self.run_doc['_id'], datum)

time.sleep(2)

def local_data_finder(self):
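
ProcessBatchQueue itself just accepts use_api and forwards it to Task.__init__, letting the base class decide which back end to query. Any other Task subclass that needs to run without pymongo could follow the same pattern; the class below is a hypothetical example, not part of this PR.

```python
# Hypothetical Task subclass following the same use_api pattern as ProcessBatchQueue.
from cax.task import Task

class ExampleOSGTask(Task):
    "Example task: log which matching runs have raw data available."

    def __init__(self, use_api=False):
        query = {'data': {'$elemMatch': {'type': 'raw'}}}   # illustrative query
        Task.__init__(self, query=query, use_api=use_api)

    def each_run(self):
        # self.run_doc was fetched via pymongo or the API, depending on use_api.
        self.log.info("run %s has raw data", self.run_doc.get('name'))
```
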