From 2e3d412f111f5112734186960e6685c760e33772 Mon Sep 17 00:00:00 2001
From: Andres Tanasijczuk
Date: Fri, 6 Nov 2015 21:24:38 +0100
Subject: [PATCH 1/2] Improve publication failure messages in case of problem
 with migrations.

---
 src/python/AsyncStageOut/PublisherWorker.py | 89 +++++++++------------
 1 file changed, 40 insertions(+), 49 deletions(-)

diff --git a/src/python/AsyncStageOut/PublisherWorker.py b/src/python/AsyncStageOut/PublisherWorker.py
index c3cdb4e..95cad3c 100644
--- a/src/python/AsyncStageOut/PublisherWorker.py
+++ b/src/python/AsyncStageOut/PublisherWorker.py
@@ -639,26 +639,23 @@ def format_file_3(self, file):
         return nf


-    def migrateByBlockDBS3(self, workflow, migrateApi, destReadApi, sourceApi, inputDataset, inputBlocks = None):
+    def migrateByBlockDBS3(self, workflow, migrateApi, destReadApi, sourceApi, dataset, blocks = None):
         """
         Submit one migration request for each block that needs to be migrated.
-        If inputBlocks is missing, migrate the full dataset.
-        :return: - the output of destReadApi.listDatasets(dataset = inputDataset)
-                   if all migrations have finished ok,
-                 - an empty list otherwise.
+        If blocks argument is not specified, migrate the whole dataset.
         """
         wfnamemsg = "%s: " % (workflow)
-        if inputBlocks:
-            blocksToMigrate = set(inputBlocks)
+        if blocks:
+            blocksToMigrate = set(blocks)
         else:
             ## This is for the case to migrate the whole dataset, which we don't do
-            ## at this point Feb/2015 (we always pass inputBlocks).
+            ## at this point Feb/2015 (we always pass blocks).
             ## Make a set with the blocks that need to be migrated.
-            blocksInDestDBS = set([block['block_name'] for block in destReadApi.listBlocks(dataset = inputDataset)])
-            blocksInSourceDBS = set([block['block_name'] for block in sourceApi.listBlocks(dataset = inputDataset)])
+            blocksInDestDBS = set([block['block_name'] for block in destReadApi.listBlocks(dataset = dataset)])
+            blocksInSourceDBS = set([block['block_name'] for block in sourceApi.listBlocks(dataset = dataset)])
             blocksToMigrate = blocksInSourceDBS - blocksInDestDBS
             msg = "Dataset %s in destination DBS with %d blocks; %d blocks in source DBS."
-            msg = msg % (inputDataset, len(blocksInDestDBS), len(blocksInSourceDBS))
+            msg = msg % (dataset, len(blocksInDestDBS), len(blocksInSourceDBS))
             self.logger.info(wfnamemsg+msg)
         numBlocksToMigrate = len(blocksToMigrate)
         if numBlocksToMigrate == 0:
@@ -771,14 +768,13 @@ def migrateByBlockDBS3(self, workflow, migrateApi, destReadApi, sourceApi, input
                 ## Stop waiting if there are no more migrations in progress.
                 if numMigrationsInProgress == 0:
                     break
-        ## If after the 300 seconds there are still some migrations in progress,
-        ## return an empty list (the caller function should interpret this as
-        ## "migration has failed or is not complete").
+        ## If after the 300 seconds there are still some migrations in progress, return
+        ## with status 1.
         if numMigrationsInProgress > 0:
-            msg = "Migration of %s has taken too long - will delay the publication." % (inputDataset)
+            msg = "Migration of %s is taking too long - will delay the publication." % (dataset)
             self.logger.info(wfnamemsg+msg)
-            return []
-        msg = "Migration of %s has finished." % (inputDataset)
+            return (1, "Migration of %s is taking too long." % (dataset))
+        msg = "Migration of %s has finished." % (dataset)
         self.logger.info(wfnamemsg+msg)
         msg = "Migration status summary (from %d input blocks to migrate):" % (numBlocksToMigrate)
         msg += " at destination = %d," % (numBlocksAtDestination)
@@ -787,22 +783,29 @@ def migrateByBlockDBS3(self, workflow, migrateApi, destReadApi, sourceApi, input
         msg += " submission failed = %d," % (numFailedSubmissions)
         msg += " queued with unknown id = %d." % (numQueuedUnkwonIds)
         self.logger.info(wfnamemsg+msg)
-        ## If there were failed migrations, return an empty list (the caller
-        ## function should interpret this as "migration has failed or is not
-        ## complete").
+        ## If there were failed migrations, return with status 2.
         if numFailedMigrations > 0 or numFailedSubmissions > 0:
             msg = "Some blocks failed to be migrated."
             self.logger.info(wfnamemsg+msg)
-            return []
+            return (2, "Migration of %s failed." % (dataset))
+        ## If there were no failed migrations, but we could not retrieve the request id
+        ## from some already queued requests, return with status 3.
         if numQueuedUnkwonIds > 0:
-            msg = "Some block migrations were already queued, but failed to retrieve the request id."
+            msg = "Some block migrations were already queued, but failed to retrieve their request id."
             self.logger.info(wfnamemsg+msg)
-            return []
-        if (numBlocksAtDestination + numSuccessfulMigrations) == numBlocksToMigrate:
-            msg = "Migration was successful."
+            return (3, "Migration of %s in unknown status." % (dataset))
+        if (numBlocksAtDestination + numSuccessfulMigrations) != numBlocksToMigrate:
+            msg = "Something unexpected has happened."
+            msg += " The numbers in the migration summary are not consistent."
+            msg += " Make sure there is no bug in the code."
             self.logger.info(wfnamemsg+msg)
-            existingDatasets = destReadApi.listDatasets(dataset = inputDataset, detail = True, dataset_access_type = '*')
-            return existingDatasets
+            return (4, "Migration of %s in some inconsistent status." % (dataset))
+        msg = "Migration completed."
+        self.logger.info(wfnamemsg+msg)
+        migratedDataset = destReadApi.listDatasets(dataset = dataset, detail = True, dataset_access_type = '*')
+        if not migratedDataset or migratedDataset[0].get('dataset', None) != dataset:
+            return (4, "Migration of %s in some inconsistent status." % (dataset))
+        return (0, "")


     def requestBlockMigration(self, workflow, migrateApi, sourceApi, block):
@@ -1115,35 +1118,23 @@ def publishInDBS3(self, workflow, sourceURL, inputDataset, toPublish, pnn):
             if localParentBlocks:
                 msg = "List of parent blocks that need to be migrated from %s:\n%s" % (sourceApi.url, localParentBlocks)
                 self.logger.info(wfnamemsg+msg)
-                existingDatasets = self.migrateByBlockDBS3(workflow, migrateApi, destReadApi, sourceApi, inputDataset, localParentBlocks)
-                if not existingDatasets:
-                    msg = "Failed to migrate %s from %s to %s; not publishing any files." % (inputDataset, sourceApi.url, destReadApi.url)
-                    self.logger.info(wfnamemsg+msg)
-                    failed[dataset].extend([f['logical_file_name'] for f in dbsFiles])
-                    failure_reason[dataset] = msg
-                    continue
-                if not existingDatasets[0]['dataset'] == inputDataset:
-                    msg = "ERROR: Inconsistent state: %s migrated, but listDatasets didn't return any information." % (inputDataset)
-                    self.logger.info(wfnamemsg+msg)
+                statusCode, failureMsg = self.migrateByBlockDBS3(workflow, migrateApi, destReadApi, sourceApi, inputDataset, localParentBlocks)
+                if statusCode:
+                    failureMsg += " Not publishing any files."
+                    self.logger.info(wfnamemsg+failureMsg)
                     failed[dataset].extend([f['logical_file_name'] for f in dbsFiles])
-                    failure_reason[dataset] = msg
+                    failure_reason[dataset] = failureMsg
                     continue
             ## Then migrate the parent blocks that are in the global DBS instance.
             if globalParentBlocks:
                 msg = "List of parent blocks that need to be migrated from %s:\n%s" % (globalApi.url, globalParentBlocks)
                 self.logger.info(wfnamemsg+msg)
-                existingDatasets = self.migrateByBlockDBS3(workflow, migrateApi, destReadApi, globalApi, inputDataset, globalParentBlocks)
-                if not existingDatasets:
-                    msg = "Failed to migrate %s from %s to %s; not publishing any files." % (inputDataset, globalApi.url, destReadApi.url)
-                    self.logger.info(wfnamemsg+msg)
-                    failed[dataset].extend([f['logical_file_name'] for f in dbsFiles])
-                    failure_reason[dataset] = msg
-                    continue
-                if not existingDatasets[0]['dataset'] == inputDataset:
-                    msg = "ERROR: Inconsistent state: %s migrated, but listDatasets didn't return any information" % (inputDataset)
-                    self.logger.info(wfnamemsg+msg)
+                statusCode, failureMsg = self.migrateByBlockDBS3(workflow, migrateApi, destReadApi, globalApi, inputDataset, globalParentBlocks)
+                if statusCode:
+                    failureMsg += " Not publishing any files."
+                    self.logger.info(wfnamemsg+failureMsg)
                     failed[dataset].extend([f['logical_file_name'] for f in dbsFiles])
-                    failure_reason[dataset] = msg
+                    failure_reason[dataset] = failureMsg
                     continue
             ## Publish the files in blocks. The blocks must have exactly max_files_per_block
             ## files, unless there are less than max_files_per_block files to publish to

From 24a4fb246c52b2ea5b7e6c4eb6fa4f3693db36ff Mon Sep 17 00:00:00 2001
From: Hassen Riahi
Date: Thu, 26 Nov 2015 10:35:30 +0100
Subject: [PATCH 2/2] Include pylint changes. Fix #4459

---
 src/python/AsyncStageOut/PublisherWorker.py | 68 ++++++++------------
 1 file changed, 24 insertions(+), 44 deletions(-)

diff --git a/src/python/AsyncStageOut/PublisherWorker.py b/src/python/AsyncStageOut/PublisherWorker.py
index 95cad3c..9447d79 100644
--- a/src/python/AsyncStageOut/PublisherWorker.py
+++ b/src/python/AsyncStageOut/PublisherWorker.py
@@ -14,8 +14,7 @@
 import json
 import time
 import uuid
-import types
-import pprint
+#import pprint
 import urllib
 import tarfile
 import logging
@@ -63,7 +62,7 @@ def __init__(self, user, config):
         try:
             self.userDN = getDNFromUserName(self.user, self.logger)
         except Exception as ex:
-            msg = "Error retrieving the user DN" 
+            msg = "Error retrieving the user DN"
             msg += str(ex)
             msg += str(traceback.format_exc())
             self.logger.error(msg)
@@ -93,7 +92,7 @@ def __init__(self, user, config):
         try:
             self.user_cache_area = self.db.loadView('DBSPublisher', 'cache_area', query)['rows']
         except Exception as ex:
-            msg = "Error getting user cache_area" 
+            msg = "Error getting user cache_area"
             msg += str(ex)
             msg += str(traceback.format_exc())
             self.logger.error(msg)
@@ -124,10 +123,6 @@ def __init__(self, user, config):
             # Use the operator's proxy when the user proxy in invalid.
             # This will be moved soon
             self.logger.error('Did not get valid proxy. Setting proxy to ops proxy')
-            info = {'server_key': self.config.opsProxy, 'server_cert': self.config.opsProxy, 'logger': self.logger}
-            self.logger.info("Ops proxy info: %s" % str(info))
-            opsProxy = Proxy({'server_key': self.config.opsProxy, 'server_cert': self.config.opsProxy, 'logger': self.logger})
-            self.userDN = opsProxy.getSubject()
             self.userProxy = self.config.opsProxy
         #self.cache_area = self.config.cache_area
         self.phedexApi = PhEDEx(responseType='json')
@@ -191,7 +186,7 @@ def __call__(self):
             query = {'reduce': False, 'key': user_wf['key']}#'stale': 'ok'}
             try:
                 active_files = self.db.loadView('DBSPublisher', 'publish', query)['rows']
-            except Exception, e:
+            except Exception as e:
                 msg = "A problem occured to retrieve the list of active files for %s: %s" % (self.user, e)
                 self.logger.error(wfnamemsg+msg)
                 msg = "Publications will be retried next time."
@@ -297,7 +292,7 @@ def __call__(self):
             data = {'workflow': workflow}
             header = {"Content-Type ":"application/json"}
             try:
-                response, res_ = self.connection.request(url, data, header, doseq=True, ckey=self.userProxy, cert=self.userProxy)#, verbose=True)# for debug
+                _, res_ = self.connection.request(url, data, header, doseq=True, ckey=self.userProxy, cert=self.userProxy)#, verbose=True)# for debug
             except Exception as ex:
                 msg = "Error retrieving status from cache."
                 msg += str(ex)
@@ -407,14 +402,14 @@ def mark_good(self, workflow, files=[]):
                 updateUri += "?" + urllib.urlencode(data)
                 self.logger.info(wfnamemsg+"URI: %s" % updateUri)
                 self.db.makeRequest(uri = updateUri, type = "PUT", decode = False)
-            except Exception, ex:
+            except Exception as ex:
                 msg = "Error updating document in Couch."
                 msg += str(ex)
                 msg += str(traceback.format_exc())
                 self.logger.error(wfnamemsg+msg)
         try:
             self.db.commit()
-        except Exception, ex:
+        except Exception as ex:
             msg = "Error committing documents in Couch."
             msg += str(ex)
             msg += str(traceback.format_exc())
@@ -438,7 +433,7 @@ def mark_failed(self, workflow, files=[], failure_reason="", force_failure=False
             # Load document to get the retry_count
             try:
                 document = self.db.document(docId)
-            except Exception, ex:
+            except Exception as ex:
                 msg = "Error loading document from Couch."
                 msg += str(ex)
                 msg += str(traceback.format_exc())
@@ -458,14 +453,14 @@ def mark_failed(self, workflow, files=[], failure_reason="", force_failure=False
                 updateUri += "?" + urllib.urlencode(data)
                 self.logger.info(wfnamemsg+"URI: %s" % updateUri)
                 self.db.makeRequest(uri = updateUri, type = "PUT", decode = False)
-            except Exception, ex:
+            except Exception as ex:
                 msg = "Error updating document in Couch."
                 msg += str(ex)
                 msg += str(traceback.format_exc())
                 self.logger.error(wfnamemsg+msg)
         try:
             self.db.commit()
-        except Exception, ex:
+        except Exception as ex:
             msg = "Error committing documents in Couch."
             msg += str(ex)
             msg += str(traceback.format_exc())
@@ -487,7 +482,7 @@ def publish(self, workflow, inputDataset, sourceURL, pnn, lfn_ready):
         ## so to not retrieve the filemetadata unnecesarily.
         if not False in map(lambda x: len(x) < self.max_files_per_block, lfn_ready.values()) and not self.force_publication:
             msg = "Skipping publication as there are not enough ready files in any of the datasets (and publication was not forced)."
-            self.logger.info(wfnamemsg+msg) 
+            self.logger.info(wfnamemsg+msg)
             return retdict
         ## Get the filemetada for this workflow.
         msg = "Retrieving publication description files."
@@ -496,7 +491,7 @@ def publish(self, workflow, inputDataset, sourceURL, pnn, lfn_ready):
         for v in lfn_ready.values():
             lfn_ready_list.extend(v)
         try:
-            publDescFiles_list = self.getPublDescFiles(workflow, self.user)
+            publDescFiles_list = self.getPublDescFiles(workflow)
         except (tarfile.ReadError, RuntimeError):
             msg = "Error retrieving publication description files."
             self.logger.error(wfnamemsg+msg)
@@ -557,26 +552,12 @@ def publish(self, workflow, inputDataset, sourceURL, pnn, lfn_ready):
         return retdict


-    def getPublDescFiles(self, workflow, userhn):
+    def getPublDescFiles(self, workflow):
         """
         Download and read the files describing
         what needs to be published
         """
         wfnamemsg = "%s: " % (workflow)
-        def decodeAsString(a):
-            """
-            DBS is stupid and doesn't understand that unicode is a string: (if type(obj) == type(''))
-            So best to convert as much of the decoded JSON to str as possible. Some is left over and handled by
-            PoorMansBufferFile
-            """
-            newDict = {}
-            for key, value in a.iteritems():
-                if type(key) == types.UnicodeType:
-                    key = str(key)
-                if type(value) == types.UnicodeType:
-                    value = str(value)
-                newDict.update({key : value})
-            return newDict
         buf = cStringIO.StringIO()
         res = []
         # TODO: input sanitization
@@ -586,8 +567,8 @@ def decodeAsString(a):
             msg = "Retrieving data from %s" % (url)
             self.logger.info(wfnamemsg+msg)
             try:
-                response, res_ = self.connection.request(url, data, header, doseq=True, ckey=self.userProxy, cert=self.userProxy)#, verbose=True)# for debug
-            except Exception, ex:
+                _, res_ = self.connection.request(url, data, header, doseq=True, ckey=self.userProxy, cert=self.userProxy)#, verbose=True)# for debug
+            except Exception as ex:
                 msg = "Error retrieving data."
                 msg += str(ex)
                 msg += str(traceback.format_exc())
@@ -598,7 +579,7 @@ def decodeAsString(a):
             try:
                 buf.close()
                 res = json.loads(res_)
-            except Exception, ex:
+            except Exception as ex:
                 msg = "Error loading results. Trying next time!"
                 msg += str(ex)
                 msg += str(traceback.format_exc())
@@ -724,7 +705,7 @@ def migrateByBlockDBS3(self, workflow, migrateApi, destReadApi, sourceApi, datas
             numTimes = 10
             msg = "Will monitor their status for up to %d seconds." % (waitTime * numTimes)
             self.logger.info(wfnamemsg+msg)
-            for i in range(numTimes):
+            for _ in range(numTimes):
                 msg = "%d block migrations in progress." % (numMigrationsInProgress)
                 msg += " Will check migrations status in %d seconds." % (waitTime)
                 self.logger.info(wfnamemsg+msg)
@@ -737,7 +718,7 @@ def migrateByBlockDBS3(self, workflow, migrateApi, destReadApi, sourceApi, datas
                     status = migrateApi.statusMigration(migration_rqst_id = reqid)
                     state = status[0].get('migration_status')
                     retry = status[0].get('retry_count')
-                except Exception, ex:
+                except Exception as ex:
                     msg = "Could not get status for migration id %d:\n%s" % (reqid, ex.msg)
                     self.logger.error(wfnamemsg+msg)
                 else:
@@ -822,7 +803,7 @@ def requestBlockMigration(self, workflow, migrateApi, sourceApi, block):
         data = {'migration_url': sourceURL, 'migration_input': block}
         try:
             result = migrateApi.submitMigration(data)
-        except HTTPError, he:
+        except HTTPError as he:
             if "is already at destination" in he.msg:
                 msg = "Block is already at destination."
                 self.logger.info(wfnamemsg+msg)
@@ -967,7 +948,6 @@ def publishInDBS3(self, workflow, sourceURL, inputDataset, toPublish, pnn):

         appName = 'cmsRun'
         appVer = files[0]["swversion"]
-        appFam = 'output'
         pset_hash = files[0]['publishname'].split("-")[-1]
         gtag = str(files[0]['globaltag'])
         if gtag == "None":
@@ -993,7 +973,7 @@ def publishInDBS3(self, workflow, sourceURL, inputDataset, toPublish, pnn):
                 msg += " (%d valid, %d invalid)." % (len(existingFilesValid), len(existingFiles) - len(existingFilesValid))
                 self.logger.info(wfnamemsg+msg)
                 results[dataset]['existingFiles'] = len(existingFiles)
-            except Exception, ex:
+            except Exception as ex:
                 msg = "Error when listing files in DBS: %s" % (str(ex))
                 msg += "\n%s" % (str(traceback.format_exc()))
                 self.logger.error(wfnamemsg+msg)
@@ -1065,11 +1045,11 @@ def publishInDBS3(self, workflow, sourceURL, inputDataset, toPublish, pnn):
                         parentFiles.add(parentFile)
                         ## Is this parent file already in the destination DBS instance?
                         ## (If yes, then we don't have to migrate this block.)
-                        blocksDict = destReadApi.listBlocks(logical_file_name = parentFile)
+                        blocksDict = destReadApi.listBlocks(logical_file_name=parentFile)
                         if not blocksDict:
                             ## No, this parent file is not in the destination DBS instance.
                             ## Maybe it is in the same DBS instance as the input dataset?
-                            blocksDict = sourceApi.listBlocks(logical_file_name = parentFile)
+                            blocksDict = sourceApi.listBlocks(logical_file_name=parentFile)
                             if blocksDict:
                                 ## Yes, this parent file is in the same DBS instance as the input dataset.
                                 ## Add the corresponding block to the set of blocks from the source DBS
@@ -1078,7 +1058,7 @@ def publishInDBS3(self, workflow, sourceURL, inputDataset, toPublish, pnn):
                             else:
                                 ## No, this parent file is not in the same DBS instance as input dataset.
                                 ## Maybe it is in global DBS instance?
-                                blocksDict = globalApi.listBlocks(logical_file_name = parentFile)
+                                blocksDict = globalApi.listBlocks(logical_file_name=parentFile)
                                 if blocksDict:
                                     ## Yes, this parent file is in global DBS instance.
                                     ## Add the corresponding block to the set of blocks from global DBS
@@ -1155,7 +1135,7 @@ def publishInDBS3(self, workflow, sourceURL, inputDataset, toPublish, pnn):
                         #self.logger.debug(wfnamemsg+"Block to insert: %s\n" % pprint.pformat(blockDump))
                         destApi.insertBulkBlock(blockDump)
                         block_count += 1
-                    except Exception, ex:
+                    except Exception as ex:
                         failed[dataset].extend([f['logical_file_name'] for f in files_to_publish])
                         msg = "Error when publishing (%s) " % ", ".join(failed[dataset])
                         msg += str(ex)
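
Note on the new return convention (a reviewer sketch, not part of either patch): migrateByBlockDBS3 no longer returns the listDatasets output or an empty list; it returns a (statusCode, failureMsg) tuple, where 0 means all blocks are at the destination, 1 a migration timeout, 2 failed migrations, 3 migrations queued with an unknown request id, and 4 an inconsistent final state. Callers such as publishInDBS3 treat any non-zero code as "delay the publication". A minimal illustration, with a hypothetical helper name and arguments assumed to exist in the caller's scope:

def migrate_parents_or_defer(worker, workflow, migrateApi, destReadApi, sourceApi, dataset, blocks):
    """Hypothetical helper (not in the patch): run the migration and say whether publication can proceed."""
    statusCode, failureMsg = worker.migrateByBlockDBS3(workflow, migrateApi, destReadApi,
                                                       sourceApi, dataset, blocks)
    if statusCode:
        # Non-zero status: 1 = timeout, 2 = failed migrations, 3 = request id unknown,
        # 4 = inconsistent state. The parent blocks cannot be assumed to be at the
        # destination DBS, so the dependent files should not be published yet.
        return False, failureMsg + " Not publishing any files."
    # statusCode == 0 and failureMsg == "": every block is at the destination.
    return True, ""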