diff --git a/src/python/AsyncStageOut/PublisherWorker.py b/src/python/AsyncStageOut/PublisherWorker.py index 48ad223..e143b8f 100644 --- a/src/python/AsyncStageOut/PublisherWorker.py +++ b/src/python/AsyncStageOut/PublisherWorker.py @@ -338,6 +338,7 @@ def __call__(self): enough_lfn_ready_in_all_datasets = not False in enough_lfn_ready # If for any of the datasets there are less than max_files_per_block to publish, # check for other conditions to decide whether to publish that dataset or not. + data = {'workflow': workflow} if enough_lfn_ready_in_all_datasets: # TODO: Check how often we are on this situation. I suspect it is not so often, # in which case I would remove the 'if enough_lfn_ready_in_all_datasets' and @@ -356,7 +357,6 @@ def __call__(self): msg = "Retrieving status from %s" % (url) self.logger.info(wfnamemsg+msg) buf = cStringIO.StringIO() - data = {'workflow': workflow} header = {"Content-Type ":"application/json"} try: _, res_ = self.connection.request(url, @@ -368,7 +368,8 @@ def __call__(self): )# , verbose=True) # for debug except Exception as ex: if self.config.isOracle: - self.logger.exception('Error retrieving status from cache.') + self.logger.exception('Error retrieving status from cache.') + return else: msg = "Error retrieving status from cache. Fall back to user cache area" msg += str(ex) @@ -397,7 +398,6 @@ def __call__(self): msg += str(ex) msg += str(traceback.format_exc()) self.logger.error(wfnamemsg+msg) - msg = "Status retrieved from cache. Loading task status." self.logger.info(wfnamemsg+msg) try: @@ -803,9 +803,11 @@ def clean(self, lfn_ready_list, publDescFiles): Discard ready files that have no filematadata (and vice versa). """ publDescFiles_filtered = {} + self.logger.debug("lfn list: %s " % (lfn_ready_list)) for dataset, outfiles_metadata in publDescFiles.iteritems(): for outfile_metadata in outfiles_metadata: dest_lfn = outfile_metadata['lfn'] + self.logger.debug("Checking: %s " % (dest_lfn)) if dest_lfn in lfn_ready_list: publDescFiles_filtered.setdefault(dataset, []).append(outfile_metadata) return publDescFiles_filtered @@ -1108,6 +1110,34 @@ def publishInDBS3(self, workflow, sourceURL, inputDataset, toPublish, pnn): sourceApi = dbsClient.DbsApi(url=sourceURL, proxy=proxy) self.logger.debug(wfnamemsg+"Global API URL: %s" % globalURL) globalApi = dbsClient.DbsApi(url=globalURL, proxy=proxy) + + # TODO: take it from taskDB tm_publish_dbs_url for that task + # + fileDoc = dict() + fileDoc['workflow'] = workflow + fileDoc['subresource'] = 'getpublishurl' + + try: + result = self.oracleDB.post(self.config.oracleFileTrans.replace('filetransfers','task'), + data=encodeRequest(fileDoc)) + self.logger.debug("Got DBS API URL: %s " % result[0]["result"][0][0]) + #[{"result": [["https://cmsweb.cern.ch/dbs/prod/phys03/DBSWriter"]]}, 200, "OK"] + self.publish_dbs_url = result[0]["result"][0][0] + except Exception: + self.logger.exception("Failed to retrieve DBS API URL for DB, fallback to central config: %s" % self.publish_dbs_url) + + WRITE_PATH = "/DBSWriter" + MIGRATE_PATH = "/DBSMigrate" + READ_PATH = "/DBSReader" + + if self.publish_dbs_url.endswith(WRITE_PATH): + self.publish_read_url = self.publish_dbs_url[:-len(WRITE_PATH)] + READ_PATH + self.publish_migrate_url = self.publish_dbs_url[:-len(WRITE_PATH)] + MIGRATE_PATH + else: + self.publish_migrate_url = self.publish_dbs_url + MIGRATE_PATH + self.publish_read_url = self.publish_dbs_url + READ_PATH + self.publish_dbs_url += WRITE_PATH + self.logger.debug(wfnamemsg+"Destination API URL: %s" % self.publish_dbs_url) destApi = dbsClient.DbsApi(url=self.publish_dbs_url, proxy=proxy) self.logger.debug(wfnamemsg+"Destination read API URL: %s" % self.publish_read_url)