From 494fcd82c243604d7da110f3859c21e4bc9536d1 Mon Sep 17 00:00:00 2001 From: Todor Ivanov Date: Wed, 24 Nov 2021 13:51:30 +0100 Subject: [PATCH] Adapt MSUnmerged to sync RSE data with MongoDB properly Add resetRSE method && resetRSE before starting a fresh cleanup && Implement readRSEFromMongoDB with projections. ResetRSE only if a new record in RucioConMon is present. Fix some typos and missing logs fixes from reviews Fix bug with field type transformation during write to DB for fields of gen/filter type. Try to recover from NonMasterError during write operation to MongoDB. Move retries of MongoDB write operation in the RSE writeRSEToMongoDB method. Fix retry logic condition and log messages. Fix missing argument for updateRSETimestamps. --- .../MicroService/MSUnmerged/MSUnmerged.py | 54 +++++++-- .../MicroService/MSUnmerged/MSUnmergedRSE.py | 113 +++++++++++++----- 2 files changed, 130 insertions(+), 37 deletions(-) diff --git a/src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py b/src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py index 6ff8c9bc6f..4a3607d7e5 100644 --- a/src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py +++ b/src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py @@ -29,6 +29,7 @@ gfal2 = None from pymongo import IndexModel +from pymongo.errors import NotMasterError # WMCore modules from WMCore.MicroService.DataStructs.DefaultStructs import UNMERGED_REPORT @@ -109,6 +110,7 @@ def __init__(self, msConfig, logger=None): self.msConfig.setdefault("mongoDBUrl", 'mongodb://localhost') self.msConfig.setdefault("mongoDBPort", 27017) self.msConfig.setdefault("mongoDB", 'msUnmergedDB') + self.msConfig.setdefault("mongoDBRetryCount", 3) self.msConfig.setdefault("mongoDBReplicaset", None) msUnmergedIndex = IndexModel('name', unique=True) @@ -164,6 +166,9 @@ def __init__(self, msConfig, logger=None): self.regStoreUnmergedLfn = re.compile("^/store/unmerged/.*$") self.regStoreUnmergedPfn = re.compile("^.+/store/unmerged/.*$") + # log msConfig 
+ self.logger.info("msConfig: %s", pformat(self.msConfig)) + # @profile def execute(self): """ @@ -415,7 +420,7 @@ def _checkClean(self, rse): def consRecordAge(self, rse): """ - A method to heck the duration of the consistency record for the RSE + A method to check the duration of the consistency record for the RSE :param rse: The RSE to be checked :return: rse or raises MSUnmergedPlineExit """ @@ -431,15 +436,38 @@ def consRecordAge(self, rse): isConsNewer = self.rseConsStats[rseName]['end_time'] > self.rseTimestamps[rseName]['prevStartTime'] if not isConsNewer: msg = "RSE: %s With old consistency record in Rucio Consistency Monitor. " % rseName - msg += "Skipping it in the current run." - self.logger.info(msg) - raise MSUnmergedPlineExit(msg) + if 'isClean' in rse and rse['isClean']: + msg += "And the RSE has been cleaned during the last Rucio Consistency Monitor polling cycle." + msg += "Skipping it in the current run." + self.logger.info(msg) + self.updateRSETimestamps(rse, start=False, end=True) + raise MSUnmergedPlineExit(msg) + else: + msg += "But the RSE has NOT been cleaned during the last Rucio Consistency Monitor polling cycle." + msg += "Retrying cleanup in the current run." + self.logger.info(msg) if not isConsDone: msg = "RSE: %s In non-final state in Rucio Consistency Monitor. " % rseName msg += "Skipping it in the current run." self.logger.warning(msg) + self.updateRSETimestamps(rse, start=False, end=True) raise MSUnmergedPlineExit(msg) + if isConsNewer and isConsDone: + # NOTE: If we've got to this point then we have a brand new record for + # the RSE in RucioConMOn and we are then about to start a new RSE cleanup + # so we will need to wipe out all but the timestamps from both + # the current object and the MongoDB record for the object. + msg = "RSE: %s With new consistency record in Rucio Consistency Monitor. " % rseName + msg += "Resetting RSE and starting a fresh cleanup process in the current run." 
+ self.logger.info(msg) + try: + rse.resetRSE(self.msUnmergedColl, keepTimestamps=True, retryCount=self.msConfig['mongoDBRetryCount']) + except NotMasterError: + msg = "Could not reset RSE to MongoDB for the maximum of %s mongoDBRetryCounts configured." % self.msConfig['mongoDBRetryCount'] + msg += "Giving up now. The whole cleanup process will be retried for this RSE on the next run." + msg += "Duplicate deletion retries may cause error messages from false positives and wrong counters during next polling cycle." + raise MSUnmergedPlineExit(msg) return rse # @profile @@ -554,6 +582,7 @@ def genFunc(pattern, iterable): rse['counters']['dirsToDeleteAll'] = len(rse['files']['toDelete']) return rse + self.logger.info("rse: %s", twFormat(rse, maxLength=8)) # If we are here, then there are service filters... for dirName in rse['dirs']['toDelete']: # apply exclusion filter @@ -617,7 +646,7 @@ def purgeRseObj(self, rse, dumpRSEtoLog=False): if dumpRSEtoLog: self.logger.debug(msg, pformat(rse)) else: - self.logger.debug(msg, twFormat(rse, maxLength=6)) + self.logger.debug(msg, twFormat(rse, maxLength=8)) rse.clear() return rse @@ -639,13 +668,16 @@ def updateRSETimestamps(self, rse, start=True, end=True): # that the RSE object has been updated from the database. 
self.rseTimestamps[rseName] = rse['timestamps'] + # # Read last RucioConMon stat time for this RSE: + # self.rseTimestamps[rseName]['rseConsStatsTime'] = self.rseConsStats[rseName]['startTime'] + # Update the timestamps: if start: self.rseTimestamps[rseName]['prevStartTime'] = self.rseTimestamps[rseName]['startTime'] self.rseTimestamps[rseName]['startTime'] = currTime if end: - self.rseTimestamps[rseName]['prevEndtime'] = self.rseTimestamps[rseName]['endTime'] - self.rseTimestamps[rseName]['endtime'] = currTime + self.rseTimestamps[rseName]['prevEndTime'] = self.rseTimestamps[rseName]['endTime'] + self.rseTimestamps[rseName]['endTime'] = currTime rse['timestamps'] = self.rseTimestamps[rseName] return rse @@ -730,7 +762,13 @@ def uploadRSEToMongoDB(self, rse, fullRSEToDB=False, overwrite=True): :param rse: The RSE object to work on :return: rse """ - rse.writeRSEToMongoDB(self.msUnmergedColl, fullRSEToDB=fullRSEToDB, overwrite=overwrite) + try: + rse.writeRSEToMongoDB(self.msUnmergedColl, fullRSEToDB=fullRSEToDB, overwrite=overwrite, retryCount=self.msConfig['mongoDBRetryCount']) + except NotMasterError: + msg = "Could not write RSE to MongoDB for the maximum of %s mongoDBRetryCounts configured." % self.msConfig['mongoDBRetryCount'] + msg += "Giving up now. The whole cleanup process will be retried for this RSE on the next run." + msg += "Duplicate deletion retries may cause error messages from false positives and wrong counters during next polling cycle." 
+ raise MSUnmergedPlineExit(msg) return rse # @profile diff --git a/src/python/WMCore/MicroService/MSUnmerged/MSUnmergedRSE.py b/src/python/WMCore/MicroService/MSUnmerged/MSUnmergedRSE.py index 8f06a55586..7ef3f6189e 100644 --- a/src/python/WMCore/MicroService/MSUnmerged/MSUnmergedRSE.py +++ b/src/python/WMCore/MicroService/MSUnmerged/MSUnmergedRSE.py @@ -4,6 +4,7 @@ """ from pymongo import ReturnDocument +from pymongo.errors import NotMasterError # from pymongo.results import results as MongoResults @@ -15,6 +16,15 @@ class MSUnmergedRSE(dict): def __init__(self, rseName, **kwargs): super(MSUnmergedRSE, self).__init__(**kwargs) + self.rseName = rseName + self.update(self.defaultDoc()) + self.mongoFilter = {'name': self['name']} + + def defaultDoc(self): + """ + Returns the data schema for a record in MongoDB. + :return: A simple dictionary populated with default values + """ # NOTE: totalNumFiles reflects the total number of files at the RSE as # fetched from the Rucio Consistency Monitor. 
Once the relevant # protected paths have been filtered out and the path been cut to the @@ -28,13 +38,14 @@ def __init__(self, rseName, **kwargs): # '/store/unmerged/Run2018B/TOTEM42/MINIAOD/22Feb2019-v1': , # '/store/unmerged/Run2018B/TOTEM21/AOD/22Feb2019-v1': , # '/store/unmerged/Run2018D/MuonEG/RAW-RECO/TopMuEG-12Nov2019_UL2018-v1': } - myDoc = { - "name": rseName, + defaultDoc = { + "name": self.rseName, "pfnPrefix": None, "isClean": False, - "timestamps": {'prevStartTime': 0.0, + "timestamps": {'rseConsStatTime': 0.0, + 'prevStartTime': 0.0, 'startTime': 0.0, - 'prevEndtime': 0.0, + 'prevEndTime': 0.0, 'endTime': 0.0}, "counters": {"totalNumFiles": 0, "dirsToDeleteAll": 0, @@ -51,8 +62,7 @@ def __init__(self, rseName, **kwargs): "toDelete": set(), "protected": set()} } - self.update(myDoc) - self.mongoFilter = {'name': self['name']} + return defaultDoc def buildMongoProjection(self, fullRSEToDB=False): """ @@ -74,13 +84,19 @@ def buildMongoProjection(self, fullRSEToDB=False): mongoProjection.update({"files": True}) return mongoProjection - def readRSEFromMongoDB(self, collection): + def readRSEFromMongoDB(self, collection, useProjection=False): """ A method to read the RSE object from Database and update it's fields. :param collection: The MongoDB collection to read from + :param useProjection: Use the projection returned by self.buildMongoProjection() + while reading from MongoDB :return: True if read and update were both successful, False otherwise. 
""" - mongoRecord = collection.find_one(self.mongoFilter) + if useProjection: + mongoProjection = self.buildMongoProjection() + mongoRecord = collection.find_one(self.mongoFilter, projection=mongoProjection) + else: + mongoRecord = collection.find_one(self.mongoFilter) # update the list fields read from MongoDB back to strictly pythonic `set`s if mongoRecord: @@ -91,21 +107,26 @@ def readRSEFromMongoDB(self, collection): else: return False - def writeRSEToMongoDB(self, collection, fullRSEToDB=False, overwrite=False): + def writeRSEToMongoDB(self, collection, fullRSEToDB=False, overwrite=False, retryCount=0): """ A method to write/update the RSE at the Database from the current object. - :param collection: The MonogoDB collection to write on + :param collection: The MonogoDB collection to write on :param fullRSEToDB: Bool flag, used to trigger dump of the whole RSE object to - the database with the `files` section (excluding the generator objects) - NOTE: if fullRSEToDB=False and a previous record for the RSE already exists - the fields missing from the projection won't be updated - during this write operation but will preserver their values. - To completely refresh and RSE record in the database use - self.purgeRSEAtMongoDB first. - :param overwrite: A flag to note if the currently existing document into - the database is about to be completely replaced or just - fields update is to happen. - :return: True if update was successful, False otherwise. + the database with the `files` section (excluding the generator objects) + NOTE: if fullRSEToDB=False and a previous record for the RSE already exists + the fields missing from the projection won't be updated + during this write operation but will preserver their values. + To completely refresh and RSE record in the database use + self.purgeRSEAtMongoDB first. + :param overwrite: A flag to note if the currently existing document into + the database is about to be completely replaced or just + fields update is to happen. 
+ :param retryCount: The number of retries for the write operation if failed due to + `NotMasterError`. Possible values: + 0 - the write operation will be tried exactly once and no retries will happen + > 0 - the write operation will be retried this number of times + < 0 - the write operation will never be tried + :return: True if update was successful, False otherwise. """ # NOTE: The fields to be manipulated are only those which are compatible # with MongoDB (i.e. here we avoid any field holding a strictly @@ -124,24 +145,58 @@ def writeRSEToMongoDB(self, collection, fullRSEToDB=False, overwrite=False): updateFields[field] = {} for fileKey, fileSet in self[field].items(): if isinstance(self[field][fileKey], dict): - updateFields[field][fileKey] = list(self[field][fileKey]) + # Iterating through the filterNames here, and recording only empty lists for filter values + # NOTE: We can either execute the filter and write every single file in the database + # or if we need it we may simply use the filterName to recreate it. + updateFields[field][fileKey] = dict([(filterName, []) for filterName in list(self[field][fileKey])]) else: updateFields[field][fileKey] = self[field][fileKey] else: updateFields[field] = self[field] updateOps = {'$set': updateFields} - if overwrite: - result = collection.replace_one(self.mongoFilter, - updateFields, - upsert=True) - else: - result = collection.update_one(self.mongoFilter, - updateOps, - upsert=True) + # NOTE: NotMasterError is a recoverable error, caused by a session to a + # non primary backend part of a replicaset. + while retryCount >= 0: + try: + if overwrite: + result = collection.replace_one(self.mongoFilter, updateFields, upsert=True) + else: + result = collection.update_one(self.mongoFilter, updateOps, upsert=True) + break + except NotMasterError: + if retryCount: + msg = "Failed write operation to MongoDB. %s retries left." 
+ self.logger.warning(msg, retryCount) + retryCount -= 1 + else: + msg = "Failed write operation to MongoDB. All retries exhausted." + self.logger.warning(msg, retryCount) + raise + # NOTE: If and `upsert` took place the modified_count for both operations is 0 # because no modification took place but rather an insertion. if result.modified_count or result.upserted_id: return True else: return False + + def resetRSE(self, collection, keepTimestamps=False, keepCounters=False, retryCount=0): + """ + Resets all records of the RSE object to default values and writes the + document to MongoDB + :param keepTimestamps: Bool flag to keep the timestamps + :param keepCounters: Bool flag to keep the counters + :param retryCount: The number of retries for the write operation if failed due to `NotMasterError`. + :return: True if operation succeeded. + """ + resetDoc = self.defaultDoc() + + if keepTimestamps: + resetDoc['timestamps'] = self['timestamps'] + if keepCounters: + resetDoc['counters'] = self['counters'] + + self.update(resetDoc) + writeResult = self.writeRSEToMongoDB(collection, fullRSEToDB=True, overwrite=True, retryCount=retryCount) + return writeResult