diff --git a/src/python/WMCore/MicroService/MSPileup/MSPileupMonitoring.py b/src/python/WMCore/MicroService/MSPileup/MSPileupMonitoring.py
new file mode 100644
--- /dev/null
+++ b/src/python/WMCore/MicroService/MSPileup/MSPileupMonitoring.py
@@ -0,0 +1,128 @@
+"""
+File       : MSPileupMonitoring.py
+Author     : Valentin Kuznetsov
+Description: MSPileupMonitoring provides bridge between MSPileup
+             service and CMS Monitoring infrastructure
+"""
+
+# system modules
+import time
+
+# WMCore modules
+from WMCore.MicroService.Tools.Common import getMSLogger
+
+# CMSMonitoring modules
+from CMSMonitoring.StompAMQ7 import StompAMQ7 as StompAMQ
+
+
+def flatDocuments(doc):
+    """
+    Helper function to flatten an MSPileup document
+
+    Every combination of campaign, current RSE and expected RSE becomes its
+    own document. A document holding an empty list under any of these keys
+    yields no flattened documents at all.
+
+    :param doc: input MSPileup document
+    :return: generator of MSPileup documents flattened from the original one
+    """
+    docs = flatKey(doc, 'campaigns')
+    docs = (f for d in docs for f in flatKey(d, 'currentRSEs'))
+    docs = (f for d in docs for f in flatKey(d, 'expectedRSEs'))
+    yield from docs
+
+
+def flatKey(doc, key):
+    """
+    Helper function to flatten the values of a given key in a document
+
+    :param doc: input MSPileup document
+    :param key: document key to use, its value must be iterable
+    :return: generator of copies of the document, one per value stored under
+        the key, with the key replaced by its singular form holding that value
+    """
+    # convert plural to singular key name, e.g. campaigns -> campaign
+    nkey = key[:-1]
+    for item in doc[key]:
+        # shallow copy, the input document is left untouched
+        ndoc = dict(doc)
+        ndoc[nkey] = item
+        del ndoc[key]
+        yield ndoc
+
+
+class MSPileupMonitoring():
+    """
+    MSPileupMonitoring represents MSPileup monitoring class
+    """
+
+    def __init__(self, msConfig=None):
+        """
+        Constructor for MSPileupMonitoring
+
+        :param msConfig: MSPileup configuration dictionary, when omitted
+            every setting falls back to its default value
+        """
+        # the signature allows None, which has no get method
+        msConfig = msConfig or {}
+        self.userAMQ = msConfig.get('user_amq', None)
+        self.passAMQ = msConfig.get('pass_amq', None)
+        # NOTE(review): postToAMQ is stored but never consulted in this module,
+        # confirm whether uploads are meant to be gated by it
+        self.postToAMQ = msConfig.get('post_to_amq', False)
+        self.topicAMQ = msConfig.get('topic_amq', None)
+        # doc_type_amg is the legacy spelling of this key (every sibling key ends
+        # in _amq), it is still read as a fallback so that existing configurations
+        # continue to work
+        self.docTypeAMQ = msConfig.get('doc_type_amq',
+                                       msConfig.get('doc_type_amg', 'cms-ms-pileup'))
+        self.hostPortAMQ = msConfig.get('host_port_amq', None)
+        self.producer = msConfig.get('producer', 'cms-ms-pileup')
+        # build the default logger only when none is supplied
+        self.logger = msConfig.get('logger') or getMSLogger(False)
+
+    def uploadToAMQ(self, docs, producer=None):
+        """
+        _uploadToAMQ_
+
+        Sends data to AMQ, which ends up in elastic search.
+        :param docs: list of documents/dicts to be posted
+        :param producer: service name that's providing this info
+        :return: {"success": ndocs, "failures": nfailures}, or {} when nothing
+            was sent (no documents, no credentials, or an error which is logged)
+        """
+        if not docs:
+            self.logger.info("There are no documents to send to AMQ")
+            return {}
+        if not self.userAMQ or not self.passAMQ:
+            self.logger.info("MSPileupMonitoring has no AMQ credentials, will skip the upload to MONIT")
+            return {}
+
+        producer = producer or self.producer
+        ts = int(time.time())
+
+        self.logger.debug("Sending %d to AMQ", len(docs))
+        try:
+            stompSvc = StompAMQ(username=self.userAMQ,
+                                password=self.passAMQ,
+                                producer=producer,
+                                topic=self.topicAMQ,
+                                validation_schema=None,
+                                host_and_ports=self.hostPortAMQ,
+                                logger=self.logger)
+
+            notifications = []
+            for doc in docs:
+                singleNotif, _, _ = stompSvc.make_notification(payload=doc, doc_type=self.docTypeAMQ,
+                                                               ts=ts, data_subfield="payload")
+                notifications.append(singleNotif)
+
+            failures = stompSvc.send(notifications)
+            nFailures = len(failures)
+            nSuccess = len(notifications) - nFailures
+            self.logger.info("%i out of %i documents successfully sent to AMQ",
+                             nSuccess, len(notifications))
+            return {"success": nSuccess, "failures": nFailures}
+        except Exception as ex:
+            # any failure is logged and reported to the caller as an empty result
+            self.logger.exception("Failed to send data to StompAMQ. Error %s", str(ex))
+            return {}
diff --git a/src/python/WMCore/MicroService/MSPileup/MSPileupTaskManager.py b/src/python/WMCore/MicroService/MSPileup/MSPileupTaskManager.py
index 71b2a5abfe..bf2c97350b 100644
--- a/src/python/WMCore/MicroService/MSPileup/MSPileupTaskManager.py
+++ b/src/python/WMCore/MicroService/MSPileup/MSPileupTaskManager.py
@@ -20,6 +20,7 @@
 from WMCore.MicroService.DataStructs.DefaultStructs import PILEUP_REPORT
 from WMCore.MicroService.MSPileup.MSPileupData import MSPileupData
 from WMCore.MicroService.MSPileup.MSPileupTasks import MSPileupTasks
+from WMCore.MicroService.MSPileup.MSPileupMonitoring import MSPileupMonitoring
 
 
 class MSPileupTaskManager(MSCore):
@@ -37,7 +38,8 @@ def __init__(self, msConfig, **kwargs):
         dryRun = msConfig.get('dryRun', False)
         self.rucioClient = self.rucio  # set in MSCore init
         self.dataManager = MSPileupData(msConfig)
-        self.mgr = MSPileupTasks(self.dataManager, self.logger,
+        self.monitManager = MSPileupMonitoring(msConfig)
+        self.mgr = MSPileupTasks(self.dataManager, self.monitManager, self.logger,
                                  self.rucioAccount, self.rucioClient, dryRun)
 
     def status(self):
@@ -61,3 +63,4 @@ def executeCycle(self):
         self.mgr.activeTask(marginSpace=self.marginSpace)
         self.mgr.inactiveTask()
         self.mgr.cleanupTask(self.cleanupDaysThreshold)
+        self.mgr.cmsMonitTask()
diff --git a/src/python/WMCore/MicroService/MSPileup/MSPileupTasks.py b/src/python/WMCore/MicroService/MSPileup/MSPileupTasks.py
--- a/src/python/WMCore/MicroService/MSPileup/MSPileupTasks.py
+++ b/src/python/WMCore/MicroService/MSPileup/MSPileupTasks.py
@@ -14,6 +14,7 @@
 from WMCore.MicroService.MSPileup.DataStructs.MSPileupReport import MSPileupReport
 from WMCore.Services.UUIDLib import makeUUID
 from WMCore.MicroService.Tools.PycurlRucio import getPileupContainerSizesRucio, getRucioToken
+from WMCore.MicroService.MSPileup.MSPileupMonitoring import flatDocuments
 
 
 class MSPileupTasks():
@@ -25,7 +26,8 @@ class MSPileupTasks():
     - active task to look-up pileup docs in active state
     """
 
-    def __init__(self, dataManager, logger, rucioAccount, rucioClient, dryRun=False):
+    def __init__(self, dataManager, monitManager, logger, rucioAccount, rucioClient, dryRun=False):
         """
         MSPileupTaskManager constructor
         :param dataManager: MSPileup Data Management layer instance
+        :param monitManager: MSPileup monitoring manager instance
@@ -35,6 +37,7 @@ def __init__(self, dataManager, logger, rucioAccount, rucioClient, dryRun=False)
         :param dryRun: dry-run mode of operations
         """
         self.mgr = dataManager
+        self.monitManager = monitManager
         self.logger = logger
         self.rucioAccount = rucioAccount
         self.rucioClient = rucioClient
@@ -88,6 +91,39 @@ def cleanupTask(self, cleanupDaysThreshold):
                 deleteDocs += 1
         self.logger.info("Cleanup task deleted %d pileup objects", deleteDocs)
 
+    def cmsMonitTask(self):
+        """
+        Execute CMS MONIT task according to the following logic:
+
+        1. Read all pileup documents from MongoDB
+        2. Flatten all docs
+        3. Submit flattened docs to CMS MONIT
+        """
+        if not self.monitManager.userAMQ or not self.monitManager.passAMQ:
+            self.logger.info("MSPileupMonitoring has no AMQ credentials, will skip the upload to MONIT")
+            return
+        startTime = time.time()
+        spec = {}
+        msPileupDocs = self.mgr.getPileup(spec)
+        docs = [flatDoc for doc in msPileupDocs for flatDoc in flatDocuments(doc)]
+        if not docs:
+            # nothing to upload is not a failure, uploadToAMQ would return an
+            # empty result here which is otherwise treated as an error below
+            self.logger.info("MSPileup CMS MONIT task found no documents to send to MONIT")
+            return
+        results = self.monitManager.uploadToAMQ(docs)
+        elapsedTime = time.time() - startTime
+        if results and isinstance(results, dict):
+            success = results['success']
+            failures = results['failures']
+            msg = f"MSPileup CMS MONIT task fetched {len(msPileupDocs)} docs from MSPileup backend DB"
+            msg += f", and sent {len(docs)} flatten docs to MONIT"
+            msg += f", number of success docs {success} and failures {failures},"
+            msg += " in %.2f secs" % elapsedTime
+            self.logger.info(msg)
+        else:
+            self.logger.error("MSPileup CMS MONIT task failed, execution time %.2f secs", elapsedTime)
+
     def monitoringTask(self):
         """
         Execute Monitoring task according to the following logic:
diff --git a/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupMonitoring_t.py b/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupMonitoring_t.py
new file mode 100644
--- /dev/null
+++ b/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupMonitoring_t.py
@@ -0,0 +1,72 @@
+"""
+Unit tests for MSPileupMonitoring.py module
+
+Author: Valentin Kuznetsov
+"""
+
+# system modules
+import unittest
+
+# WMCore modules
+from WMCore.MicroService.MSPileup.MSPileupMonitoring import MSPileupMonitoring, flatDocuments, flatKey
+
+
+class MSPileupMonitoringTest(unittest.TestCase):
+    "Unit test for MSPileupMonitoring module"
+
+    def setUp(self):
+        rses = ['rse1', 'rse2']
+        campaigns = ['c1', 'c2']
+        ruleIds = ['1', '2']
+        self.doc = {
+            'pileupName': '/klsjdfklsd/klsjdflksdj/PREMIX',
+            'pileupType': 'classic',
+            'expectedRSEs': rses,
+            'currentRSEs': rses,
+            'fullReplicas': 1,
+            'campaigns': campaigns,
+            'containerFraction': 0.0,
+            'replicationGrouping': "ALL",
+            'active': True,
+            'pileupSize': 0,
+            'ruleIds': ruleIds}
+
+    def testConfig(self):
+        "test MSPileupMonitoring configuration handling"
+        # a missing configuration falls back to the defaults
+        mgr = MSPileupMonitoring()
+        self.assertIsNone(mgr.userAMQ)
+        self.assertEqual(mgr.docTypeAMQ, 'cms-ms-pileup')
+        # the document type key and its legacy spelling are both honoured
+        mgr = MSPileupMonitoring({'doc_type_amq': 'new-type'})
+        self.assertEqual(mgr.docTypeAMQ, 'new-type')
+        mgr = MSPileupMonitoring({'doc_type_amg': 'old-type'})
+        self.assertEqual(mgr.docTypeAMQ, 'old-type')
+
+    def testFlatKey(self):
+        "test flatKey functions"
+        doc = dict(self.doc)
+        docs = list(flatKey(doc, 'campaigns'))
+        self.assertEqual(len(docs), 2)
+        key = 'campaigns'
+        nkey = key[:-1]  # new single key, e.g. campaigns -> campaign
+        self.assertNotIn(key, docs[0])
+        self.assertEqual(docs[0][nkey], self.doc[key][0])
+
+    def testFlatDocuments(self):
+        "test flatDocuments function"
+        doc = dict(self.doc)
+        docs = list(flatDocuments(doc))
+        self.assertEqual(len(docs), 8)
+        listKeys = ['campaigns', 'currentRSEs', 'expectedRSEs']
+        for flatDoc in docs:
+            for key in listKeys:
+                vals = self.doc[key]  # original key values
+                nkey = key[:-1]  # key without s, e.g. campaigns -> campaign
+                self.assertNotIn(key, flatDoc)  # original key should be gone
+                val = flatDoc[nkey]  # new value for single key
+                self.assertIn(val, vals)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupTasks_t.py b/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupTasks_t.py
index 8cc0889f18..4085b77b47 100644
--- a/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupTasks_t.py
+++ b/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupTasks_t.py
@@ -17,6 +17,7 @@
 from WMQuality.Emulators.EmulatedUnitTestCase import EmulatedUnitTestCase
 from WMCore.MicroService.MSPileup.MSPileupTasks import MSPileupTasks
 from WMCore.MicroService.MSPileup.MSPileupData import MSPileupData
+from WMCore.MicroService.MSPileup.MSPileupMonitoring import MSPileupMonitoring
 from WMCore.MicroService.Tools.Common import getMSLogger
 from WMCore.Services.Rucio.Rucio import Rucio
 
@@ -113,6 +114,7 @@ def setUp(self):
                     'mongoDBPassword': None,
                     'mockMongoDB': True}
         self.mgr = MSPileupData(msConfig, skipRucio=True)
+        self.monMgr = MSPileupMonitoring(msConfig)
 
         # setup our pileup data
         self.pname = '/primary/processed/PREMIX'
@@ -147,7 +149,7 @@ def testMSPileupTasks(self):
         """
         self.logger.info("---------- CHECK for state=OK -----------")
 
-        obj = MSPileupTasks(self.mgr, self.logger, self.rucioAccount, self.rucioClient)
+        obj = MSPileupTasks(self.mgr, self.monMgr, self.logger, self.rucioAccount, self.rucioClient)
         obj.monitoringTask()
 
         # we added three pileup documents and should have update at least one of them
@@ -180,7 +182,7 @@ def testMSPileupTasks(self):
         # now we can test non OK state in Rucio
         self.logger.info("---------- CHECK for state=STUCK -----------")
         self.rucioClient = TestRucioClient(logger=self.logger, state='STUCK')
-        obj = MSPileupTasks(self.mgr, self.logger, self.rucioAccount, self.rucioClient)
+        obj = MSPileupTasks(self.mgr, self.monMgr, self.logger, self.rucioAccount, self.rucioClient)
         obj.monitoringTask()
         # at this step the T2_XX_CERN should NOT be added to currentRSEs
         spec = {'pileupName': self.pname}
@@ -197,7 +199,7 @@ def testMSPileupTasks(self):
         # we use CustomException for state to check how our code will
         # handle Rucio API exceptions
         self.rucioClient = TestRucioClient(logger=self.logger, state='CustomException')
-        obj = MSPileupTasks(self.mgr, self.logger, self.rucioAccount, self.rucioClient)
+        obj = MSPileupTasks(self.mgr, self.monMgr, self.logger, self.rucioAccount, self.rucioClient)
         obj.monitoringTask()
 
     def testMSPileupTasksWithMockApi(self):
@@ -214,7 +216,7 @@ def testMSPileupTasksWithMockApi(self):
 
         # now create mock rucio client
         rucioClient = MockRucioApi(self.rucioAccount, hostUrl=self.hostUrl, authUrl=self.authUrl)
-        obj = MSPileupTasks(self.mgr, self.logger, self.rucioAccount, rucioClient)
+        obj = MSPileupTasks(self.mgr, self.monMgr, self.logger, self.rucioAccount, rucioClient)
         obj.monitoringTask()
         obj.activeTask()
         obj.inactiveTask()