Merge pull request #10906 from todor-ivanov/feature_MSUnmerged_MoveStateful_fix-10797

Adapt MSUnmerged to sync RSE data with MongoDB properly
todor-ivanov authored Dec 2, 2021
2 parents d2b9dd7 + fa051d0 commit e0a8394
Showing 2 changed files with 130 additions and 38 deletions.
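For context, a minimal usage sketch of the MongoDB sync path this change introduces. The connection settings, collection name, and RSE name below are illustrative assumptions, not values taken from the diff:

# Illustrative sketch only; connection details, collection and RSE names are assumptions.
from pymongo import MongoClient
from WMCore.MicroService.MSUnmerged.MSUnmergedRSE import MSUnmergedRSE

client = MongoClient("mongodb://localhost", 27017)
coll = client["msUnmergedDB"]["msUnmergedColl"]  # hypothetical collection name

rse = MSUnmergedRSE("T2_XX_Example")             # hypothetical RSE name
rse["counters"]["totalNumFiles"] = 42

# Write the lightweight document (without the 'files' section) and retry up to
# 3 times if the replica set primary steps down (NotMasterError).
rse.writeRSEToMongoDB(coll, retryCount=3)

# Later, refresh the in-memory object from the stored record.
rse.readRSEFromMongoDB(coll)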
54 changes: 46 additions & 8 deletions src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py
@@ -29,6 +29,7 @@
gfal2 = None

from pymongo import IndexModel
from pymongo.errors import NotMasterError

# WMCore modules
from WMCore.MicroService.DataStructs.DefaultStructs import UNMERGED_REPORT
@@ -109,6 +110,7 @@ def __init__(self, msConfig, logger=None):
self.msConfig.setdefault("mongoDBUrl", 'mongodb://localhost')
self.msConfig.setdefault("mongoDBPort", 27017)
self.msConfig.setdefault("mongoDB", 'msUnmergedDB')
self.msConfig.setdefault("mongoDBRetryCount", 3)
self.msConfig.setdefault("mongoDBReplicaset", None)

msUnmergedIndex = IndexModel('name', unique=True)
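The MongoDB-related defaults in this hunk amount to the following configuration block; the values are exactly those set via setdefault above, and a real deployment would be expected to override at least the URL and the replica set name:

msConfig = {
    "mongoDBUrl": "mongodb://localhost",
    "mongoDBPort": 27017,
    "mongoDB": "msUnmergedDB",
    "mongoDBRetryCount": 3,     # new in this change: write retries on NotMasterError
    "mongoDBReplicaset": None,  # replica set name, if MongoDB runs as a replica set
}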
@@ -164,6 +166,9 @@ def __init__(self, msConfig, logger=None):
self.regStoreUnmergedLfn = re.compile("^/store/unmerged/.*$")
self.regStoreUnmergedPfn = re.compile("^.+/store/unmerged/.*$")

# log msConfig
self.logger.info("msConfig: %s", pformat(self.msConfig))

# @profile
def execute(self):
"""
@@ -415,7 +420,7 @@ def _checkClean(self, rse):

def consRecordAge(self, rse):
"""
A method to heck the duration of the consistency record for the RSE
A method to check the duration of the consistency record for the RSE
:param rse: The RSE to be checked
:return: rse or raises MSUnmergedPlineExit
"""
@@ -431,15 +436,38 @@ def consRecordAge(self, rse):
isConsNewer = self.rseConsStats[rseName]['end_time'] > self.rseTimestamps[rseName]['prevStartTime']
if not isConsNewer:
msg = "RSE: %s With old consistency record in Rucio Consistency Monitor. " % rseName
msg += "Skipping it in the current run."
self.logger.info(msg)
raise MSUnmergedPlineExit(msg)
if 'isClean' in rse and rse['isClean']:
msg += "And the RSE has been cleaned during the last Rucio Consistency Monitor polling cycle."
msg += "Skipping it in the current run."
self.logger.info(msg)
self.updateRSETimestamps(rse, start=False, end=True)
raise MSUnmergedPlineExit(msg)
else:
msg += "But the RSE has NOT been cleaned during the last Rucio Consistency Monitor polling cycle."
msg += "Retrying cleanup in the current run."
self.logger.info(msg)
if not isConsDone:
msg = "RSE: %s In non-final state in Rucio Consistency Monitor. " % rseName
msg += "Skipping it in the current run."
self.logger.warning(msg)
self.updateRSETimestamps(rse, start=False, end=True)
raise MSUnmergedPlineExit(msg)

if isConsNewer and isConsDone:
# NOTE: If we've got to this point then we have a brand new record for
# the RSE in RucioConMon and we are about to start a new RSE cleanup
# so we will need to wipe out all but the timestamps from both
# the current object and the MongoDB record for the object.
msg = "RSE: %s With new consistency record in Rucio Consistency Monitor. " % rseName
msg += "Resetting RSE and starting a fresh cleanup process in the current run."
self.logger.info(msg)
try:
rse.resetRSE(self.msUnmergedColl, keepTimestamps=True, retryCount=self.msConfig['mongoDBRetryCount'])
except NotMasterError:
msg = "Could not reset RSE to MongoDB for the maximum of %s mongoDBRetryCounts configured." % self.msConfig['mongoDBRetryCount']
msg += "Giving up now. The whole cleanup process will be retried for this RSE on the next run."
msg += "Duplicate deletion retries may cause error messages from false positives and wrong counters during next polling cycle."
raise MSUnmergedPlineExit(msg)
return rse
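To summarise the branching above, here is a compact sketch of the possible outcomes, with the three flags taken as already computed (how isConsDone and isConsNewer are derived lies partly outside this hunk):

def decideRSEAction(isConsDone, isConsNewer, isClean):
    # Illustration of the consRecordAge() branches; not part of the service code.
    if not isConsNewer and isClean:
        return "skip"             # stale record and the RSE was already cleaned last cycle
    if not isConsDone:
        return "skip"             # RucioConMon record is still in a non-final state
    if isConsNewer:
        return "reset-and-clean"  # fresh record: resetRSE() and start a new cleanup
    return "retry-cleanup"        # stale record, cleanup unfinished: keep current state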

# @profile
@@ -554,6 +582,7 @@ def genFunc(pattern, iterable):
rse['counters']['dirsToDeleteAll'] = len(rse['files']['toDelete'])
return rse

self.logger.info("rse: %s", twFormat(rse, maxLength=8))
# If we are here, then there are service filters...
for dirName in rse['dirs']['toDelete']:
# apply exclusion filter
@@ -617,7 +646,7 @@ def purgeRseObj(self, rse, dumpRSEtoLog=False):
if dumpRSEtoLog:
self.logger.debug(msg, pformat(rse))
else:
self.logger.debug(msg, twFormat(rse, maxLength=6))
self.logger.debug(msg, twFormat(rse, maxLength=8))
rse.clear()
return rse

@@ -639,13 +668,16 @@ def updateRSETimestamps(self, rse, start=True, end=True):
# that the RSE object has been updated from the database.
self.rseTimestamps[rseName] = rse['timestamps']

# # Read last RucioConMon stat time for this RSE:
# self.rseTimestamps[rseName]['rseConsStatsTime'] = self.rseConsStats[rseName]['startTime']

# Update the timestamps:
if start:
self.rseTimestamps[rseName]['prevStartTime'] = self.rseTimestamps[rseName]['startTime']
self.rseTimestamps[rseName]['startTime'] = currTime
if end:
self.rseTimestamps[rseName]['prevEndtime'] = self.rseTimestamps[rseName]['endTime']
self.rseTimestamps[rseName]['endtime'] = currTime
self.rseTimestamps[rseName]['prevEndTime'] = self.rseTimestamps[rseName]['endTime']
self.rseTimestamps[rseName]['endTime'] = currTime
rse['timestamps'] = self.rseTimestamps[rseName]
return rse
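The renamed keys (prevEndTime/endTime) make the rolling update explicit; with illustrative values it behaves as follows:

# Illustration of the rolling update performed by updateRSETimestamps().
timestamps = {'prevStartTime': 0.0, 'startTime': 100.0,
              'prevEndTime': 0.0, 'endTime': 150.0}
currTime = 200.0
timestamps['prevStartTime'] = timestamps['startTime']  # -> 100.0
timestamps['startTime'] = currTime                     # -> 200.0
timestamps['prevEndTime'] = timestamps['endTime']      # -> 150.0
timestamps['endTime'] = currTime                       # -> 200.0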

@@ -730,7 +762,13 @@ def uploadRSEToMongoDB(self, rse, fullRSEToDB=False, overwrite=True):
:param rse: The RSE object to work on
:return: rse
"""
rse.writeRSEToMongoDB(self.msUnmergedColl, fullRSEToDB=fullRSEToDB, overwrite=overwrite)
try:
rse.writeRSEToMongoDB(self.msUnmergedColl, fullRSEToDB=fullRSEToDB, overwrite=overwrite, retryCount=self.msConfig['mongoDBRetryCount'])
except NotMasterError:
msg = "Could not write RSE to MongoDB for the maximum of %s mongoDBRetryCounts configured." % self.msConfig['mongoDBRetryCount']
msg += "Giving up now. The whole cleanup process will be retried for this RSE on the next run."
msg += "Duplicate deletion retries may cause error messages from false positives and wrong counters during next polling cycle."
raise MSUnmergedPlineExit(msg)
return rse

# @profile
114 changes: 84 additions & 30 deletions src/python/WMCore/MicroService/MSUnmerged/MSUnmergedRSE.py
@@ -3,7 +3,7 @@
Description: Provides a document Template for the MSUnmerged MicroServices
"""

from pymongo import ReturnDocument
from pymongo.errors import NotMasterError
# from pymongo.results import results as MongoResults


@@ -15,6 +15,15 @@ class MSUnmergedRSE(dict):
def __init__(self, rseName, **kwargs):
super(MSUnmergedRSE, self).__init__(**kwargs)

self.rseName = rseName
self.update(self.defaultDoc())
self.mongoFilter = {'name': self['name']}

def defaultDoc(self):
"""
Returns the data schema for a record in MongoDB.
:return: A simple dictionary populated with default values
"""
# NOTE: totalNumFiles reflects the total number of files at the RSE as
# fetched from the Rucio Consistency Monitor. Once the relevant
# protected paths have been filtered out and the path been cut to the
@@ -28,13 +37,14 @@ def __init__(self, rseName, **kwargs):
# '/store/unmerged/Run2018B/TOTEM42/MINIAOD/22Feb2019-v1': <filter at 0x7f3699d93208>,
# '/store/unmerged/Run2018B/TOTEM21/AOD/22Feb2019-v1': <filter at 0x7f3699d93128>,
# '/store/unmerged/Run2018D/MuonEG/RAW-RECO/TopMuEG-12Nov2019_UL2018-v1': <filter at 0x7f3699d93668>}
myDoc = {
"name": rseName,
defaultDoc = {
"name": self.rseName,
"pfnPrefix": None,
"isClean": False,
"timestamps": {'prevStartTime': 0.0,
"timestamps": {'rseConsStatTime': 0.0,
'prevStartTime': 0.0,
'startTime': 0.0,
'prevEndtime': 0.0,
'prevEndTime': 0.0,
'endTime': 0.0},
"counters": {"totalNumFiles": 0,
"dirsToDeleteAll": 0,
@@ -51,8 +61,7 @@ def __init__(self, rseName, **kwargs):
"toDelete": set(),
"protected": set()}
}
self.update(myDoc)
self.mongoFilter = {'name': self['name']}
return defaultDoc

def buildMongoProjection(self, fullRSEToDB=False):
"""
@@ -74,13 +83,19 @@
mongoProjection.update({"files": True})
return mongoProjection

def readRSEFromMongoDB(self, collection):
def readRSEFromMongoDB(self, collection, useProjection=False):
"""
A method to read the RSE object from the database and update its fields.
:param collection: The MongoDB collection to read from
:param useProjection: Use the projection returned by self.buildMongoProjection()
while reading from MongoDB
:return: True if read and update were both successful, False otherwise.
"""
mongoRecord = collection.find_one(self.mongoFilter)
if useProjection:
mongoProjection = self.buildMongoProjection()
mongoRecord = collection.find_one(self.mongoFilter, projection=mongoProjection)
else:
mongoRecord = collection.find_one(self.mongoFilter)

# update the list fields read from MongoDB back to strictly pythonic `set`s
if mongoRecord:
@@ -91,21 +106,26 @@ def readRSEFromMongoDB(self, collection):
else:
return False

def writeRSEToMongoDB(self, collection, fullRSEToDB=False, overwrite=False):
def writeRSEToMongoDB(self, collection, fullRSEToDB=False, overwrite=False, retryCount=0):
"""
A method to write/update the RSE in the database from the current object.
:param collection: The MongoDB collection to write on
:param collection: The MongoDB collection to write on
:param fullRSEToDB: Bool flag, used to trigger dump of the whole RSE object to
the database with the `files` section (excluding the generator objects)
NOTE: if fullRSEToDB=False and a previous record for the RSE already exists
the fields missing from the projection won't be updated
during this write operation but will preserver their values.
To completely refresh and RSE record in the database use
self.purgeRSEAtMongoDB first.
:param overwrite: A flag to note if the currently existing document into
the database is about to be completely replaced or just
fields update is to happen.
:return: True if update was successful, False otherwise.
the database with the `files` section (excluding the generator objects)
NOTE: if fullRSEToDB=False and a previous record for the RSE already exists
the fields missing from the projection won't be updated
during this write operation but will preserve their values.
To completely refresh an RSE record in the database use
self.purgeRSEAtMongoDB first.
:param overwrite: A flag denoting whether the currently existing document in
the database is to be completely replaced or only have
its fields updated.
:param retryCount: The number of retries for the write operation if failed due to
`NotMasterError`. Possible values:
0 - the write operation will be tried exactly once and no retries will happen
> 0 - the write operation will be retried this number of times
< 0 - the write operation will never be tried
:return: True if update was successful, False otherwise.
"""
# NOTE: The fields to be manipulated are only those which are compatible
# with MongoDB (i.e. here we avoid any field holding a strictly
@@ -124,24 +144,58 @@ def writeRSEToMongoDB(self, collection, fullRSEToDB=False, overwrite=False):
updateFields[field] = {}
for fileKey, fileSet in self[field].items():
if isinstance(self[field][fileKey], dict):
updateFields[field][fileKey] = list(self[field][fileKey])
# Iterating through the filterNames here, and recording only empty lists for filter values
# NOTE: We can either execute the filter and write every single file in the database
# or if we need it we may simply use the filterName to recreate it.
updateFields[field][fileKey] = dict([(filterName, []) for filterName in list(self[field][fileKey])])
else:
updateFields[field][fileKey] = self[field][fileKey]
else:
updateFields[field] = self[field]

updateOps = {'$set': updateFields}
if overwrite:
result = collection.replace_one(self.mongoFilter,
updateFields,
upsert=True)
else:
result = collection.update_one(self.mongoFilter,
updateOps,
upsert=True)
# NOTE: NotMasterError is a recoverable error, raised when the session talks to a
# non-primary member of the replica set.
while retryCount >= 0:
try:
if overwrite:
result = collection.replace_one(self.mongoFilter, updateFields, upsert=True)
else:
result = collection.update_one(self.mongoFilter, updateOps, upsert=True)
break
except NotMasterError:
if retryCount:
msg = "Failed write operation to MongoDB. %s retries left."
self.logger.warning(msg, retryCount)
retryCount -= 1
else:
msg = "Failed write operation to MongoDB. All retries exhausted."
self.logger.warning(msg)
raise

# NOTE: If an `upsert` took place, the modified_count for both operations is 0
# because no modification took place but rather an insertion.
if result.modified_count or result.upserted_id:
return True
else:
return False

def resetRSE(self, collection, keepTimestamps=False, keepCounters=False, retryCount=0):
"""
Resets all records of the RSE object to default values and writes the
document to MongoDB.
:param keepTimestamps: Bool flag to keep the timestamps
:param keepCounters: Bool flag to keep the counters
:param retryCount: The number of retries for the write operation if failed due to `NotMasterError`.
:return: True if operation succeeded.
"""
resetDoc = self.defaultDoc()

if keepTimestamps:
resetDoc['timestamps'] = self['timestamps']
if keepCounters:
resetDoc['counters'] = self['counters']

self.update(resetDoc)
writeResult = self.writeRSEToMongoDB(collection, fullRSEToDB=True, overwrite=True, retryCount=retryCount)
return writeResult
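Continuing the connection sketch from the top of this page (`coll` as defined there), a short usage sketch of the new read and reset paths added in this file; the RSE name is hypothetical, and the default projection is expected, per the docstrings above, to leave the heavy `files` section out of a lightweight read:

# Sketch only: 'coll' is the collection from the earlier sketch; the RSE name is an assumption.
rse = MSUnmergedRSE("T2_XX_Example")

# Read back the lightweight fields, using the projection from buildMongoProjection():
rse.readRSEFromMongoDB(coll, useProjection=True)

# Reset everything but the timestamps and persist the clean document,
# retrying the write up to 3 times on NotMasterError before giving up:
rse.resetRSE(coll, keepTimestamps=True, retryCount=3)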
