
Commit
Merge pull request #4532 from dciangot/hotfixes_prod
Fixes for Oracle Publisher and TransferWorker
mmascher authored Apr 5, 2017
2 parents 3a7636e + 59128ed commit fefb975
Showing 2 changed files with 23 additions and 12 deletions.
33 changes: 21 additions & 12 deletions src/python/AsyncStageOut/PublisherWorker.py
@@ -95,6 +95,8 @@ def __init__(self, user, config):
             }
         # If we're just testing publication, we skip the DB connection.

+        self.logger.debug("Getting cache_area")
+
         if hasattr(self.config, "cache_area"):
             try:
                 getCache = re.compile('https?://([^/]*)/.*')
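For context, the getCache pattern in the last context line above extracts the host (and optional port) from the configured cache_area URL: everything between '://' and the first '/'. A minimal standalone sketch of that extraction, with an invented cache_area value:

    import re

    # Same pattern as in PublisherWorker.__init__: capture host[:port]
    # from an http(s) URL.
    getCache = re.compile('https?://([^/]*)/.*')

    # Hypothetical value, for illustration only.
    cache_area = 'https://cmsweb.example.org/crabserver/prod/filemetadata'

    match = getCache.match(cache_area)
    if match:
        print(match.group(1))  # prints: cmsweb.example.org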
@@ -136,7 +138,7 @@ def __init__(self, user, config):
                 defaultDelegation['role'] = ''
             else:
                 defaultDelegation['role'] = self.role
-                valid, proxy = getProxy(defaultDelegation, self.logger)
+            valid, proxy = getProxy(defaultDelegation, self.logger)
         except Exception as ex:
             msg = "Error getting the user proxy"
             msg += str(ex)
@@ -485,9 +487,11 @@ def __call__(self):
                 result = self.publish(workflow, input_dataset, input_dbs_url, pnn, lfn_ready)
                 for dataset in result.keys():
                     published_files = result[dataset].get('published', [])
+                    self.logger.debug("Good files: %s " % published_files)
                     if published_files:
                         self.mark_good(workflow, published_files)
                     failed_files = result[dataset].get('failed', [])
+                    self.logger.debug("Failed files: %s " % failed_files)
                     if failed_files:
                         failure_reason = result[dataset].get('failure_reason', "")
                         force_failure = result[dataset].get('force_failure', False)
@@ -588,20 +592,23 @@ def mark_failed(self, workflow, files, failure_reason="", force_failure=False):
             document = oracleOutputMapping(docbyId, None)[0]
             self.logger.debug("Document: %s" % document)

-            fileDoc = dict()
-            fileDoc['asoworker'] = 'asodciangot1'
-            fileDoc['subresource'] = 'updatePublication'
-            fileDoc['list_of_ids'] = docId
             try:
+                fileDoc = dict()
+                fileDoc['asoworker'] = self.config.asoworker
+                fileDoc['subresource'] = 'updatePublication'
+                fileDoc['list_of_ids'] = docId

-                if force_failure or document['publish_retry_count'] > self.max_retry:
-                    fileDoc['list_of_publication_state'] = 'FAILED'
-                else:
-                    fileDoc['list_of_publication_state'] = 'RETRY'
-                # TODO: implement retry
-                fileDoc['list_of_retry_value'] = 1
-                fileDoc['list_of_failure_reason'] = failure_reason
+                #if force_failure or document['publish_retry_count'] > self.max_retry:
+                #    fileDoc['list_of_publication_state'] = 'FAILED'
+                #else:
+                #    fileDoc['list_of_publication_state'] = 'RETRY'
+                # TODO: implement retry
+                fileDoc['list_of_retry_value'] = 1
+                fileDoc['list_of_failure_reason'] = failure_reason
+
+                self.logger.debug("fileDoc: %s " % fileDoc)
+
                 try:
                     result = self.oracleDB.post(self.config.oracleFileTrans,
                                                 data=encodeRequest(fileDoc))
                     self.logger.debug("updated: %s " % docId)
@@ -748,6 +755,7 @@ def publish(self, workflow, inputDataset, sourceURL, pnn, lfn_ready):
             msg = "DBS publication results: %s" % (dbsResults)
             self.logger.debug(wfnamemsg+msg)
             for dataset in toPublish.keys():
+                self.logger.debug('failed: %s' % failed.get(dataset, []))
                 retdict.update({dataset: {'failed': failed.get(dataset, []),
                                           'failure_reason': failure_reason.get(dataset, ""),
                                           'published': published.get(dataset, [])
Expand Down Expand Up @@ -1344,6 +1352,7 @@ def publishInDBS3(self, workflow, sourceURL, inputDataset, toPublish, pnn):
destApi.insertBulkBlock(blockDump)
block_count += 1
except Exception as ex:
self.logger.error("Error for files: %s" % [f['logical_file_name'] for f in files_to_publish])
failed[dataset].extend([f['logical_file_name'] for f in files_to_publish])
msg = "Error when publishing (%s) " % ", ".join(failed[dataset])
msg += str(ex)
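The publishInDBS3 change above adds an error line naming the LFNs of the block whose insertBulkBlock call failed, before they are appended to the per-dataset failed list. A toy reconstruction of that failure path, with the DBS3 client stubbed so the except branch actually runs (dataset and file names invented):

    import logging

    logging.basicConfig(level=logging.ERROR)
    logger = logging.getLogger('PublisherWorker')

    class StubDbsApi(object):
        """Stand-in for the DBS3 client; always raises to show the failure path."""
        def insertBulkBlock(self, block_dump):
            raise RuntimeError('proxy expired')

    destApi = StubDbsApi()
    dataset = '/PrimDS/ProcDS/USER'
    failed = {dataset: []}
    files_to_publish = [{'logical_file_name': '/store/user/x/a.root'},
                        {'logical_file_name': '/store/user/x/b.root'}]

    try:
        destApi.insertBulkBlock({'files': files_to_publish})
    except Exception as ex:
        # New in the hotfix: record which LFNs were in the failed block.
        logger.error("Error for files: %s" % [f['logical_file_name'] for f in files_to_publish])
        failed[dataset].extend([f['logical_file_name'] for f in files_to_publish])
        msg = "Error when publishing (%s) " % ", ".join(failed[dataset])
        msg += str(ex)
        logger.error(msg)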
2 changes: 2 additions & 0 deletions src/python/AsyncStageOut/TransferWorker.py
@@ -180,6 +180,7 @@ def __call__(self):
             jobs, jobs_lfn, jobs_pfn, jobs_report = self.files_for_transfer()
         except:
             self.logger.exception('delegation failed')
+            return
         self.logger.debug("Processing files for %s " % self.user_proxy)
         if jobs:
             jobReport = self.command(jobs, jobs_lfn, jobs_pfn, jobs_report)
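The added return matters because the try block is what binds jobs, jobs_lfn, jobs_pfn and jobs_report: if files_for_transfer() raises, execution previously fell through to the lines below and would have died with an UnboundLocalError on jobs instead of exiting cleanly. A minimal sketch of the pattern, with invented stand-in names:

    import logging

    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger('TransferWorker')

    def files_for_transfer():
        # Stand-in that fails the way a broken proxy delegation would.
        raise RuntimeError('delegation failed')

    def run():
        try:
            jobs, jobs_lfn, jobs_pfn, jobs_report = files_for_transfer()
        except Exception:
            logger.exception('delegation failed')
            # New in the hotfix: bail out here. Without this return the
            # next line would raise, since 'jobs' was never bound.
            return
        if jobs:
            logger.debug("submitting %s jobs", len(jobs))

    run()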
@@ -202,6 +203,7 @@ def source_destinations_by_user(self):
         fileDoc['subresource'] = 'acquiredTransfers'
         fileDoc['grouping'] = 1
         fileDoc['username'] = self.user
+        group = self.group
         if self.group == '':
             group = None
         if self.role == '':
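This one-line fix binds group before the empty-string check. Previously group was only assigned inside the if self.group == '' branch, so a user with a non-empty group apparently reached the later use of group while it was still unbound; the role branch, cut off at the end of the hunk, presumably mirrors the same normalization. A minimal sketch of the corrected logic (the function name is invented):

    def normalize_group_role(group_attr, role_attr):
        # New in the hotfix: bind 'group' unconditionally first ...
        group = group_attr
        # ... then map the empty string to None, as before.
        if group_attr == '':
            group = None
        role = role_attr
        if role_attr == '':
            role = None
        return group, role

    print(normalize_group_role('', 'production'))   # (None, 'production')
    print(normalize_group_role('analysis', ''))     # ('analysis', None)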
