Skip to content
This repository has been archived by the owner on Feb 24, 2022. It is now read-only.

Commit

Permalink
DBS url from taskDB
Browse files Browse the repository at this point in the history
  • Loading branch information
dciangot committed Mar 31, 2017
1 parent 59128ed commit cf8cfb5
Showing 1 changed file with 33 additions and 3 deletions.
36 changes: 33 additions & 3 deletions src/python/AsyncStageOut/PublisherWorker.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ def __call__(self):
enough_lfn_ready_in_all_datasets = not False in enough_lfn_ready
# If any of the datasets has fewer than max_files_per_block files to publish,
# check for other conditions to decide whether to publish that dataset or not.
data = {'workflow': workflow}
if enough_lfn_ready_in_all_datasets:
# TODO: Check how often we are in this situation. I suspect it is not so often,
# in which case I would remove the 'if enough_lfn_ready_in_all_datasets' and
Expand All @@ -356,7 +357,6 @@ def __call__(self):
msg = "Retrieving status from %s" % (url)
self.logger.info(wfnamemsg+msg)
buf = cStringIO.StringIO()
data = {'workflow': workflow}
header = {"Content-Type ":"application/json"}
try:
_, res_ = self.connection.request(url,
Expand All @@ -368,7 +368,8 @@ def __call__(self):
)# , verbose=True) # for debug
except Exception as ex:
if self.config.isOracle:
self.logger.exception('Error retrieving status from cache.')
self.logger.exception('Error retrieving status from cache.')
return
else:
msg = "Error retrieving status from cache. Fall back to user cache area"
msg += str(ex)
Expand Down Expand Up @@ -397,7 +398,6 @@ def __call__(self):
msg += str(ex)
msg += str(traceback.format_exc())
self.logger.error(wfnamemsg+msg)

msg = "Status retrieved from cache. Loading task status."
self.logger.info(wfnamemsg+msg)
try:
Expand Down Expand Up @@ -803,9 +803,11 @@ def clean(self, lfn_ready_list, publDescFiles):
Discard ready files that have no file metadata (and vice versa).
"""
publDescFiles_filtered = {}
self.logger.debug("lfn list: %s " % (lfn_ready_list))
for dataset, outfiles_metadata in publDescFiles.iteritems():
for outfile_metadata in outfiles_metadata:
dest_lfn = outfile_metadata['lfn']
self.logger.debug("Checking: %s " % (dest_lfn))
if dest_lfn in lfn_ready_list:
publDescFiles_filtered.setdefault(dataset, []).append(outfile_metadata)
return publDescFiles_filtered
Expand Down Expand Up @@ -1108,6 +1110,34 @@ def publishInDBS3(self, workflow, sourceURL, inputDataset, toPublish, pnn):
sourceApi = dbsClient.DbsApi(url=sourceURL, proxy=proxy)
self.logger.debug(wfnamemsg+"Global API URL: %s" % globalURL)
globalApi = dbsClient.DbsApi(url=globalURL, proxy=proxy)

# TODO: take it from taskDB tm_publish_dbs_url for that task
#
fileDoc = dict()
fileDoc['workflow'] = workflow
fileDoc['subresource'] = 'getpublishurl'

try:
result = self.oracleDB.post(self.config.oracleFileTrans.replace('filetransfers','task'),
data=encodeRequest(fileDoc))
self.logger.debug("Got DBS API URL: %s " % result[0]["result"][0][0])
#[{"result": [["https://cmsweb.cern.ch/dbs/prod/phys03/DBSWriter"]]}, 200, "OK"]
self.publish_dbs_url = result[0]["result"][0][0]
except Exception:
self.logger.exception("Failed to retrieve DBS API URL for DB, fallback to central config: %s" % self.publish_dbs_url)

WRITE_PATH = "/DBSWriter"
MIGRATE_PATH = "/DBSMigrate"
READ_PATH = "/DBSReader"

if self.publish_dbs_url.endswith(WRITE_PATH):
self.publish_read_url = self.publish_dbs_url[:-len(WRITE_PATH)] + READ_PATH
self.publish_migrate_url = self.publish_dbs_url[:-len(WRITE_PATH)] + MIGRATE_PATH
else:
self.publish_migrate_url = self.publish_dbs_url + MIGRATE_PATH
self.publish_read_url = self.publish_dbs_url + READ_PATH
self.publish_dbs_url += WRITE_PATH

self.logger.debug(wfnamemsg+"Destination API URL: %s" % self.publish_dbs_url)
destApi = dbsClient.DbsApi(url=self.publish_dbs_url, proxy=proxy)
self.logger.debug(wfnamemsg+"Destination read API URL: %s" % self.publish_read_url)
Expand Down

0 comments on commit cf8cfb5

Please sign in to comment.