diff --git a/src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py b/src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py index 6ff8c9bc6f..4a3607d7e5 100644 --- a/src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py +++ b/src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py @@ -29,6 +29,7 @@ gfal2 = None from pymongo import IndexModel +from pymongo.errors import NotMasterError # WMCore modules from WMCore.MicroService.DataStructs.DefaultStructs import UNMERGED_REPORT @@ -109,6 +110,7 @@ def __init__(self, msConfig, logger=None): self.msConfig.setdefault("mongoDBUrl", 'mongodb://localhost') self.msConfig.setdefault("mongoDBPort", 27017) self.msConfig.setdefault("mongoDB", 'msUnmergedDB') + self.msConfig.setdefault("mongoDBRetryCount", 3) self.msConfig.setdefault("mongoDBReplicaset", None) msUnmergedIndex = IndexModel('name', unique=True) @@ -164,6 +166,9 @@ def __init__(self, msConfig, logger=None): self.regStoreUnmergedLfn = re.compile("^/store/unmerged/.*$") self.regStoreUnmergedPfn = re.compile("^.+/store/unmerged/.*$") + # log msConfig + self.logger.info("msConfig: %s", pformat(self.msConfig)) + # @profile def execute(self): """ @@ -415,7 +420,7 @@ def _checkClean(self, rse): def consRecordAge(self, rse): """ - A method to heck the duration of the consistency record for the RSE + A method to check the duration of the consistency record for the RSE :param rse: The RSE to be checked :return: rse or raises MSUnmergedPlineExit """ @@ -431,15 +436,38 @@ def consRecordAge(self, rse): isConsNewer = self.rseConsStats[rseName]['end_time'] > self.rseTimestamps[rseName]['prevStartTime'] if not isConsNewer: msg = "RSE: %s With old consistency record in Rucio Consistency Monitor. " % rseName - msg += "Skipping it in the current run." - self.logger.info(msg) - raise MSUnmergedPlineExit(msg) + if 'isClean' in rse and rse['isClean']: + msg += "And the RSE has been cleaned during the last Rucio Consistency Monitor polling cycle." + msg += "Skipping it in the current run." + self.logger.info(msg) + self.updateRSETimestamps(rse, start=False, end=True) + raise MSUnmergedPlineExit(msg) + else: + msg += "But the RSE has NOT been cleaned during the last Rucio Consistency Monitor polling cycle." + msg += "Retrying cleanup in the current run." + self.logger.info(msg) if not isConsDone: msg = "RSE: %s In non-final state in Rucio Consistency Monitor. " % rseName msg += "Skipping it in the current run." self.logger.warning(msg) + self.updateRSETimestamps(rse, start=False, end=True) raise MSUnmergedPlineExit(msg) + if isConsNewer and isConsDone: + # NOTE: If we've got to this point then we have a brand new record for + # the RSE in RucioConMOn and we are then about to start a new RSE cleanup + # so we will need to wipe out all but the timestamps from both + # the current object and the MongoDB record for the object. + msg = "RSE: %s With new consistency record in Rucio Consistency Monitor. " % rseName + msg += "Resetting RSE and starting a fresh cleanup process in the current run." + self.logger.info(msg) + try: + rse.resetRSE(self.msUnmergedColl, keepTimestamps=True, retryCount=self.msConfig['mongoDBRetryCount']) + except NotMasterError: + msg = "Could not reset RSE to MongoDB for the maximum of %s mongoDBRetryCounts configured." % self.msConfig['mongoDBRetryCount'] + msg += "Giving up now. The whole cleanup process will be retried for this RSE on the next run." + msg += "Duplicate deletion retries may cause error messages from false positives and wrong counters during next polling cycle." + raise MSUnmergedPlineExit(msg) return rse # @profile @@ -554,6 +582,7 @@ def genFunc(pattern, iterable): rse['counters']['dirsToDeleteAll'] = len(rse['files']['toDelete']) return rse + self.logger.info("rse: %s", twFormat(rse, maxLength=8)) # If we are here, then there are service filters... for dirName in rse['dirs']['toDelete']: # apply exclusion filter @@ -617,7 +646,7 @@ def purgeRseObj(self, rse, dumpRSEtoLog=False): if dumpRSEtoLog: self.logger.debug(msg, pformat(rse)) else: - self.logger.debug(msg, twFormat(rse, maxLength=6)) + self.logger.debug(msg, twFormat(rse, maxLength=8)) rse.clear() return rse @@ -639,13 +668,16 @@ def updateRSETimestamps(self, rse, start=True, end=True): # that the RSE object has been updated from the database. self.rseTimestamps[rseName] = rse['timestamps'] + # # Read last RucioConMon stat time for this RSE: + # self.rseTimestamps[rseName]['rseConsStatsTime'] = self.rseConsStats[rseName]['startTime'] + # Update the timestamps: if start: self.rseTimestamps[rseName]['prevStartTime'] = self.rseTimestamps[rseName]['startTime'] self.rseTimestamps[rseName]['startTime'] = currTime if end: - self.rseTimestamps[rseName]['prevEndtime'] = self.rseTimestamps[rseName]['endTime'] - self.rseTimestamps[rseName]['endtime'] = currTime + self.rseTimestamps[rseName]['prevEndTime'] = self.rseTimestamps[rseName]['endTime'] + self.rseTimestamps[rseName]['endTime'] = currTime rse['timestamps'] = self.rseTimestamps[rseName] return rse @@ -730,7 +762,13 @@ def uploadRSEToMongoDB(self, rse, fullRSEToDB=False, overwrite=True): :param rse: The RSE object to work on :return: rse """ - rse.writeRSEToMongoDB(self.msUnmergedColl, fullRSEToDB=fullRSEToDB, overwrite=overwrite) + try: + rse.writeRSEToMongoDB(self.msUnmergedColl, fullRSEToDB=fullRSEToDB, overwrite=overwrite, retryCount=self.msConfig['mongoDBRetryCount']) + except NotMasterError: + msg = "Could not write RSE to MongoDB for the maximum of %s mongoDBRetryCounts configured." % self.msConfig['mongoDBRetryCount'] + msg += "Giving up now. The whole cleanup process will be retried for this RSE on the next run." + msg += "Duplicate deletion retries may cause error messages from false positives and wrong counters during next polling cycle." + raise MSUnmergedPlineExit(msg) return rse # @profile diff --git a/src/python/WMCore/MicroService/MSUnmerged/MSUnmergedRSE.py b/src/python/WMCore/MicroService/MSUnmerged/MSUnmergedRSE.py index 8f06a55586..4dfe742dc1 100644 --- a/src/python/WMCore/MicroService/MSUnmerged/MSUnmergedRSE.py +++ b/src/python/WMCore/MicroService/MSUnmerged/MSUnmergedRSE.py @@ -3,7 +3,7 @@ Description: Provides a document Template for the MSUnmerged MicroServices """ -from pymongo import ReturnDocument +from pymongo.errors import NotMasterError # from pymongo.results import results as MongoResults @@ -15,6 +15,15 @@ class MSUnmergedRSE(dict): def __init__(self, rseName, **kwargs): super(MSUnmergedRSE, self).__init__(**kwargs) + self.rseName = rseName + self.update(self.defaultDoc()) + self.mongoFilter = {'name': self['name']} + + def defaultDoc(self): + """ + Returns the data schema for a record in MongoDB. + :return: A simple dictionary populated with default values + """ # NOTE: totalNumFiles reflects the total number of files at the RSE as # fetched from the Rucio Consistency Monitor. Once the relevant # protected paths have been filtered out and the path been cut to the @@ -28,13 +37,14 @@ def __init__(self, rseName, **kwargs): # '/store/unmerged/Run2018B/TOTEM42/MINIAOD/22Feb2019-v1': , # '/store/unmerged/Run2018B/TOTEM21/AOD/22Feb2019-v1': , # '/store/unmerged/Run2018D/MuonEG/RAW-RECO/TopMuEG-12Nov2019_UL2018-v1': } - myDoc = { - "name": rseName, + defaultDoc = { + "name": self.rseName, "pfnPrefix": None, "isClean": False, - "timestamps": {'prevStartTime': 0.0, + "timestamps": {'rseConsStatTime': 0.0, + 'prevStartTime': 0.0, 'startTime': 0.0, - 'prevEndtime': 0.0, + 'prevEndTime': 0.0, 'endTime': 0.0}, "counters": {"totalNumFiles": 0, "dirsToDeleteAll": 0, @@ -51,8 +61,7 @@ def __init__(self, rseName, **kwargs): "toDelete": set(), "protected": set()} } - self.update(myDoc) - self.mongoFilter = {'name': self['name']} + return defaultDoc def buildMongoProjection(self, fullRSEToDB=False): """ @@ -74,13 +83,19 @@ def buildMongoProjection(self, fullRSEToDB=False): mongoProjection.update({"files": True}) return mongoProjection - def readRSEFromMongoDB(self, collection): + def readRSEFromMongoDB(self, collection, useProjection=False): """ A method to read the RSE object from Database and update it's fields. :param collection: The MongoDB collection to read from + :param useProjection: Use the projection returned by self.buildProjection() + while reading from MongoDB :return: True if read and update were both successful, False otherwise. """ - mongoRecord = collection.find_one(self.mongoFilter) + if useProjection: + mongoProjection = self.buildMongoProjection() + mongoRecord = collection.find_one(self.mongoFilter, projection=mongoProjection) + else: + mongoRecord = collection.find_one(self.mongoFilter) # update the list fields read from MongoDB back to strictly pythonic `set`s if mongoRecord: @@ -91,21 +106,26 @@ def readRSEFromMongoDB(self, collection): else: return False - def writeRSEToMongoDB(self, collection, fullRSEToDB=False, overwrite=False): + def writeRSEToMongoDB(self, collection, fullRSEToDB=False, overwrite=False, retryCount=0): """ A method to write/update the RSE at the Database from the current object. - :param collection: The MonogoDB collection to write on + :param collection: The MonogoDB collection to write on :param fullRSEToDB: Bool flag, used to trigger dump of the whole RSE object to - the database with the `files` section (excluding the generator objects) - NOTE: if fullRSEToDB=False and a previous record for the RSE already exists - the fields missing from the projection won't be updated - during this write operation but will preserver their values. - To completely refresh and RSE record in the database use - self.purgeRSEAtMongoDB first. - :param overwrite: A flag to note if the currently existing document into - the database is about to be completely replaced or just - fields update is to happen. - :return: True if update was successful, False otherwise. + the database with the `files` section (excluding the generator objects) + NOTE: if fullRSEToDB=False and a previous record for the RSE already exists + the fields missing from the projection won't be updated + during this write operation but will preserver their values. + To completely refresh and RSE record in the database use + self.purgeRSEAtMongoDB first. + :param overwrite: A flag to note if the currently existing document into + the database is about to be completely replaced or just + fields update is to happen. + :param retryCount: The number of retries for the write operation if failed due to + `NotMasterError. Possible values: + 0 - the write operation will be tried exactly once and no retries will happen + > 0 - the write operation will be retried this number of times + < 0 - the write operation will never be tried + :return: True if update was successful, False otherwise. """ # NOTE: The fields to be manipulated are only those which are compatible # with MongoDB (i.e. here we avoid any field holding a strictly @@ -124,24 +144,58 @@ def writeRSEToMongoDB(self, collection, fullRSEToDB=False, overwrite=False): updateFields[field] = {} for fileKey, fileSet in self[field].items(): if isinstance(self[field][fileKey], dict): - updateFields[field][fileKey] = list(self[field][fileKey]) + # Iterating through the filterNames here, and recording only empty lists for filter values + # NOTE: We can either execute the filter and write every single file in the database + # or if we need it we may simply use the filterName to recreate it. + updateFields[field][fileKey] = dict([(filterName, []) for filterName in list(self[field][fileKey])]) else: updateFields[field][fileKey] = self[field][fileKey] else: updateFields[field] = self[field] updateOps = {'$set': updateFields} - if overwrite: - result = collection.replace_one(self.mongoFilter, - updateFields, - upsert=True) - else: - result = collection.update_one(self.mongoFilter, - updateOps, - upsert=True) + # NOTE: NotMasterError is a recoverable error, caused by a session to a + # non primary backend part of a replicaset. + while retryCount >= 0: + try: + if overwrite: + result = collection.replace_one(self.mongoFilter, updateFields, upsert=True) + else: + result = collection.update_one(self.mongoFilter, updateOps, upsert=True) + break + except NotMasterError: + if retryCount: + msg = "Failed write operation to MongoDB. %s retries left." + self.logger.warning(msg, retryCount) + retryCount -= 1 + else: + msg = "Failed write operation to MongoDB. All retries exhausted." + self.logger.warning(msg, retryCount) + raise + # NOTE: If and `upsert` took place the modified_count for both operations is 0 # because no modification took place but rather an insertion. if result.modified_count or result.upserted_id: return True else: return False + + def resetRSE(self, collection, keepTimestamps=False, keepCounters=False, retryCount=0): + """ + Resets all records of the RSE object to default values and write the + document to MongoDB + :param keepTimestamps: Bool flag to keep the timestamps + :param keepCounters: Bool flag to keep the counters + :param retryCount: The number of retries for the write operation if failed due to `NotMasterError. + :return: True if operation succeeded. + """ + resetDoc = self.defaultDoc() + + if keepTimestamps: + resetDoc['timestamps'] = self['timestamps'] + if keepCounters: + resetDoc['counters'] = self['counters'] + + self.update(resetDoc) + writeResult = self.writeRSEToMongoDB(collection, fullRSEToDB=True, overwrite=True, retryCount=retryCount) + return writeResult