diff --git a/src/python/AsyncStageOut/PublisherWorker.py b/src/python/AsyncStageOut/PublisherWorker.py
index 7704bb7..48ad223 100644
--- a/src/python/AsyncStageOut/PublisherWorker.py
+++ b/src/python/AsyncStageOut/PublisherWorker.py
@@ -95,6 +95,8 @@ def __init__(self, user, config):
                       }

         # If we're just testing publication, we skip the DB connection.
+        self.logger.debug("Getting cache_area")
+
         if hasattr(self.config, "cache_area"):
             try:
                 getCache = re.compile('https?://([^/]*)/.*')
@@ -136,7 +138,7 @@ def __init__(self, user, config):
                     defaultDelegation['role'] = ''
                 else:
                     defaultDelegation['role'] = self.role
-                valid, proxy = getProxy(defaultDelegation, self.logger)
+                valid, proxy = getProxy(defaultDelegation, self.logger)
             except Exception as ex:
                 msg = "Error getting the user proxy"
                 msg += str(ex)
@@ -485,9 +487,11 @@ def __call__(self):
                 result = self.publish(workflow, input_dataset, input_dbs_url, pnn, lfn_ready)
                 for dataset in result.keys():
                     published_files = result[dataset].get('published', [])
+                    self.logger.debug("Good files: %s " % published_files)
                     if published_files:
                         self.mark_good(workflow, published_files)
                     failed_files = result[dataset].get('failed', [])
+                    self.logger.debug("Failed files: %s " % failed_files)
                     if failed_files:
                         failure_reason = result[dataset].get('failure_reason', "")
                         force_failure = result[dataset].get('force_failure', False)
@@ -588,20 +592,23 @@ def mark_failed(self, workflow, files, failure_reason="", force_failure=False):
             document = oracleOutputMapping(docbyId, None)[0]
             self.logger.debug("Document: %s" % document)

-            fileDoc = dict()
-            fileDoc['asoworker'] = 'asodciangot1'
-            fileDoc['subresource'] = 'updatePublication'
-            fileDoc['list_of_ids'] = docId
+            try:
+                fileDoc = dict()
+                fileDoc['asoworker'] = self.config.asoworker
+                fileDoc['subresource'] = 'updatePublication'
+                fileDoc['list_of_ids'] = docId
-            if force_failure or document['publish_retry_count'] > self.max_retry:
                 fileDoc['list_of_publication_state'] = 'FAILED'
-            else:
-                fileDoc['list_of_publication_state'] = 'RETRY'
-            # TODO: implement retry
-            fileDoc['list_of_retry_value'] = 1
-            fileDoc['list_of_failure_reason'] = failure_reason
+                #if force_failure or document['publish_retry_count'] > self.max_retry:
+                #    fileDoc['list_of_publication_state'] = 'FAILED'
+                #else:
+                #    fileDoc['list_of_publication_state'] = 'RETRY'
+                # TODO: implement retry
+                fileDoc['list_of_retry_value'] = 1
+                fileDoc['list_of_failure_reason'] = failure_reason
+
+                self.logger.debug("fileDoc: %s " % fileDoc)

-            try:
                 result = self.oracleDB.post(self.config.oracleFileTrans,
                                             data=encodeRequest(fileDoc))
                 self.logger.debug("updated: %s " % docId)

@@ -748,6 +755,7 @@ def publish(self, workflow, inputDataset, sourceURL, pnn, lfn_ready):
         msg = "DBS publication results: %s" % (dbsResults)
         self.logger.debug(wfnamemsg+msg)
         for dataset in toPublish.keys():
+            self.logger.debug('failed: %s' % failed.get(dataset, []))
             retdict.update({dataset: {'failed': failed.get(dataset, []),
                                       'failure_reason': failure_reason.get(dataset, ""),
                                       'published': published.get(dataset, [])
@@ -1344,6 +1352,7 @@ def publishInDBS3(self, workflow, sourceURL, inputDataset, toPublish, pnn):
                     destApi.insertBulkBlock(blockDump)
                     block_count += 1
                 except Exception as ex:
+                    self.logger.error("Error for files: %s" % [f['logical_file_name'] for f in files_to_publish])
                     failed[dataset].extend([f['logical_file_name'] for f in files_to_publish])
                     msg = "Error when publishing (%s) " % ", ".join(failed[dataset])
                     msg += str(ex)
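Note on the mark_failed() hunk above: the hardcoded worker name 'asodciangot1' is
replaced by self.config.asoworker, and the payload construction moves inside the
try block, so a failure while building fileDoc is caught by the same handler as
the REST call. The publication state is forced to 'FAILED' while the RETRY branch
stays commented out pending a real retry policy. A minimal standalone sketch of
the resulting flow follows; the function wrapper and its argument names are
illustrative only, and encodeRequest is assumed to come from ServerUtilities as
in the module's existing imports:

    from ServerUtilities import encodeRequest

    def update_publication_state(oracleDB, config, logger, docId, failure_reason=""):
        """Sketch: POST a publication-state update to the Oracle REST interface."""
        try:
            fileDoc = dict()
            fileDoc['asoworker'] = config.asoworker  # was hardcoded 'asodciangot1'
            fileDoc['subresource'] = 'updatePublication'
            fileDoc['list_of_ids'] = docId
            # Forced to FAILED until the retry logic is implemented.
            fileDoc['list_of_publication_state'] = 'FAILED'
            fileDoc['list_of_retry_value'] = 1
            fileDoc['list_of_failure_reason'] = failure_reason
            logger.debug("fileDoc: %s " % fileDoc)
            oracleDB.post(config.oracleFileTrans, data=encodeRequest(fileDoc))
            logger.debug("updated: %s " % docId)
        except Exception:
            logger.exception("publication-state update failed for %s" % docId)
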
diff --git a/src/python/AsyncStageOut/TransferWorker.py b/src/python/AsyncStageOut/TransferWorker.py
index 6b6866e..6c05c9f 100644
--- a/src/python/AsyncStageOut/TransferWorker.py
+++ b/src/python/AsyncStageOut/TransferWorker.py
@@ -180,6 +180,7 @@ def __call__(self):
             jobs, jobs_lfn, jobs_pfn, jobs_report = self.files_for_transfer()
         except:
             self.logger.exception('delegation failed')
+            return
         self.logger.debug("Processing files for %s " % self.user_proxy)
         if jobs:
             jobReport = self.command(jobs, jobs_lfn, jobs_pfn, jobs_report)
@@ -202,6 +203,7 @@ def source_destinations_by_user(self):
         fileDoc['subresource'] = 'acquiredTransfers'
         fileDoc['grouping'] = 1
         fileDoc['username'] = self.user
+        group = self.group
         if self.group == '':
             group = None
         if self.role == '':
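Note on the TransferWorker.py hunks: the added return makes __call__() bail out
when proxy delegation fails, since jobs/jobs_lfn/jobs_pfn/jobs_report are never
assigned if files_for_transfer() raises, and falling through to "if jobs:" would
die with an unbound-name error. The added "group = self.group" fixes the same
class of bug in source_destinations_by_user(): previously "group" was assigned
only inside the "if self.group == '':" branch, so users with a non-empty group
hit an UnboundLocalError on any later use of "group". A small self-contained
sketch of the fixed normalization pattern; the helper name is illustrative only:

    def normalize(value):
        """Map the empty string to None, as the patched code does for group/role."""
        return None if value == '' else value

    assert normalize('') is None
    assert normalize('cms') == 'cms'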