Skip to content

Commit

Permalink
New MSMonitoring module
Browse files Browse the repository at this point in the history
  • Loading branch information
vkuznet committed Mar 23, 2023
1 parent 570e0a2 commit a72f4c1
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 6 deletions.
109 changes: 109 additions & 0 deletions src/python/WMCore/MicroService/MSPileup/MSPileupMonitoring.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
"""
File : MSPileupMonitoring.py
Author : Valentin Kuznetsov <vkuznet AT gmail dot com>
Description: MSPileupMonitoring provides a bridge between the MSPileup
service and the CMS Monitoring infrastructure
"""

# system modules
import time

# WMCore modules
from WMCore.MicroService.Tools.Common import getMSLogger

# CMSMonitoring modules
from CMSMonitoring.StompAMQ7 import StompAMQ7 as StompAMQ


def flatDocuments(doc):
    """
    Flatten a single MSPileup document over its list-valued keys.

    The 'campaigns', 'currentRSEs' and 'expectedRSEs' lists are expanded
    one after another via flatKey, so every yielded document carries one
    value per expanded key (stored under the key name without its last
    character, e.g. campaigns -> campaign).

    :param doc: input MSPileup document
    :return: generator of flattened MSPileup documents
    """
    for perCampaign in flatKey(doc, 'campaigns'):
        for perCurrentRSE in flatKey(perCampaign, 'currentRSEs'):
            yield from flatKey(perCurrentRSE, 'expectedRSEs')


def flatKey(doc, key):
    """
    Helper function to flat out values of given key in a document.

    For every item found under ``key`` a shallow copy of the document is
    produced where ``key`` is removed and the item is stored under the
    singular key name (``key`` without its last character).

    If ``key`` is missing from the document, or its value is empty, a single
    document is still produced with an empty string under the singular key.
    Without that, a document with e.g. no currentRSEs would yield nothing
    here and would therefore vanish completely from the flattened output.

    :param doc: input MSPileup document (it is not modified)
    :param key: document key to use, expected to be a plural name
    :return: generator of MSPileup documents flatten from original one and given key
    """
    # convert plural to singular key name, e.g. campaigns -> campaign
    nkey = key[:-1]
    # an empty string placeholder keeps exactly one output document for
    # documents which have nothing (yet) stored under the given key
    items = doc.get(key) or [""]
    for item in items:
        ndoc = dict(doc)
        ndoc.pop(key, None)
        ndoc[nkey] = item
        yield ndoc


class MSPileupMonitoring():
    """
    MSPileupMonitoring represents MSPileup monitoring class, it keeps the
    AMQ related configuration and posts documents to AMQ via StompAMQ.
    """

    def __init__(self, msConfig=None):
        """
        Constructor for MSPileupMonitoring

        :param msConfig: dict-like service configuration; recognized keys are
            user_amq, pass_amq, post_to_amq, topic_amq, doc_type_amq,
            host_port_amq, producer and logger. None is treated as an
            empty configuration.
        """
        # the signature allows None, so fall back to an empty configuration
        # instead of failing on msConfig.get
        msConfig = msConfig or {}
        self.userAMQ = msConfig.get('user_amq', None)
        self.passAMQ = msConfig.get('pass_amq', None)
        self.postToAMQ = msConfig.get('post_to_amq', False)
        self.topicAMQ = msConfig.get('topic_amq', None)
        # 'doc_type_amg' is a misspelled variant of the key which was read
        # originally, keep honouring it for already deployed configurations
        self.docTypeAMQ = msConfig.get('doc_type_amq',
                                       msConfig.get('doc_type_amg', 'cms-ms-pileup'))
        self.hostPortAMQ = msConfig.get('host_port_amq', None)
        self.producer = msConfig.get('producer', 'cms-ms-pileup')
        # create the default logger only when the configuration has none
        self.logger = msConfig.get('logger') or getMSLogger(False)

    def uploadToAMQ(self, docs, producer=None):
        """
        _uploadToAMQ_
        Sends data to AMQ, which ends up in elastic search.

        Nothing is sent when there are no documents or when the AMQ user
        name or password is not configured. Any exception raised while
        talking to StompAMQ is logged and not propagated.

        :param docs: list of documents/dicts to be posted
        :param producer: service name that's providing this info,
            defaults to the producer taken from the configuration
        :return: {} or {"success": ndocs, "failures": nfailures}
        """
        if not docs:
            self.logger.info("There are no documents to send to AMQ")
            return {}
        if not self.userAMQ or not self.passAMQ:
            self.logger.info("MSPileupMonitoring has no AMQ credentials, will skip the upload to MONIT")
            return {}

        producer = producer or self.producer
        # all notifications of a single upload share the same timestamp
        ts = int(time.time())
        notifications = []

        self.logger.debug("Sending %d to AMQ", len(docs))
        try:
            stompSvc = StompAMQ(username=self.userAMQ,
                                password=self.passAMQ,
                                producer=producer,
                                topic=self.topicAMQ,
                                validation_schema=None,
                                host_and_ports=self.hostPortAMQ,
                                logger=self.logger)

            for doc in docs:
                singleNotif, _, _ = stompSvc.make_notification(payload=doc, doc_type=self.docTypeAMQ,
                                                               ts=ts, data_subfield="payload")
                notifications.append(singleNotif)

            failures = stompSvc.send(notifications)
            nTotal = len(notifications)
            nSuccess = nTotal - len(failures)
            self.logger.info("%i out of %i documents successfully sent to AMQ", nSuccess, nTotal)
            return {"success": nSuccess, "failures": len(failures)}
        except Exception as ex:
            # best-effort upload: monitoring problems must not break the caller
            self.logger.exception("Failed to send data to StompAMQ. Error %s", str(ex))
        return {}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from WMCore.MicroService.DataStructs.DefaultStructs import PILEUP_REPORT
from WMCore.MicroService.MSPileup.MSPileupData import MSPileupData
from WMCore.MicroService.MSPileup.MSPileupTasks import MSPileupTasks
from WMCore.MicroService.MSPileup.MSPileupMonitoring import MSPileupMonitoring


class MSPileupTaskManager(MSCore):
Expand All @@ -37,7 +38,8 @@ def __init__(self, msConfig, **kwargs):
dryRun = msConfig.get('dryRun', False)
self.rucioClient = self.rucio # set in MSCore init
self.dataManager = MSPileupData(msConfig)
self.mgr = MSPileupTasks(self.dataManager, self.logger,
self.monitManager = MSPileupMonitoring(msConfig)
self.mgr = MSPileupTasks(self.dataManager, self.monitManager, self.logger,
self.rucioAccount, self.rucioClient, dryRun)

def status(self):
Expand All @@ -61,3 +63,4 @@ def executeCycle(self):
self.mgr.activeTask(marginSpace=self.marginSpace)
self.mgr.inactiveTask()
self.mgr.cleanupTask(self.cleanupDaysThreshold)
self.mgr.cmsMonitTask()
36 changes: 35 additions & 1 deletion src/python/WMCore/MicroService/MSPileup/MSPileupTasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from WMCore.MicroService.MSPileup.DataStructs.MSPileupReport import MSPileupReport
from WMCore.Services.UUIDLib import makeUUID
from WMCore.MicroService.Tools.PycurlRucio import getPileupContainerSizesRucio, getRucioToken
from WMCore.MicroService.MSPileup.MSPileupMonitoring import flatDocuments


class MSPileupTasks():
Expand All @@ -25,7 +26,7 @@ class MSPileupTasks():
- active task to look-up pileup docs in active state
"""

def __init__(self, dataManager, logger, rucioAccount, rucioClient, dryRun=False):
def __init__(self, dataManager, monitManager, logger, rucioAccount, rucioClient, dryRun=False):
"""
MSPileupTaskManager constructor
:param dataManager: MSPileup Data Management layer instance
Expand All @@ -35,6 +36,7 @@ def __init__(self, dataManager, logger, rucioAccount, rucioClient, dryRun=False)
:param dryRun: dry-run mode of operations
"""
self.mgr = dataManager
self.monitManager = monitManager
self.logger = logger
self.rucioAccount = rucioAccount
self.rucioClient = rucioClient
Expand Down Expand Up @@ -88,6 +90,38 @@ def cleanupTask(self, cleanupDaysThreshold):
deleteDocs += 1
self.logger.info("Cleanup task deleted %d pileup objects", deleteDocs)

def cmsMonitTask(self):
    """
    Execute CMS MONIT task according to the following logic:
    1. Read all pileup documents from MongoDB
    2. Flatten all docs
    3. Submit flattened docs to CMS MONIT

    The task is skipped when the monitoring manager has no AMQ
    credentials, and it finishes without an upload attempt when there
    are no documents to send.
    """
    if not self.monitManager.userAMQ or not self.monitManager.passAMQ:
        self.logger.info("MSPileupMonitoring has no AMQ credentials, will skip the upload to MONIT")
        return
    startTime = time.time()
    # empty spec is used to fetch all pileup documents
    spec = {}
    msPileupDocs = self.mgr.getPileup(spec)
    docs = [flatDoc for doc in msPileupDocs for flatDoc in flatDocuments(doc)]
    if not docs:
        # uploadToAMQ returns an empty dict for an empty input, which would
        # be reported below as a failure even though nothing went wrong
        self.logger.info("MSPileup CMS MONIT task fetched %d docs from MSPileup backend DB, nothing to send to MONIT",
                         len(msPileupDocs))
        return
    results = self.monitManager.uploadToAMQ(docs)
    elapsedTime = time.time() - startTime
    if results and isinstance(results, dict):
        success = results['success']
        failures = results['failures']
        msg = f"MSPileup CMS MONIT task fetched {len(msPileupDocs)} docs from MSPileup backend DB"
        msg += f", and sent {len(docs)} flatten docs to MONIT"
        msg += f", number of success docs {success} and failures {failures},"
        msg += f" in {elapsedTime:.2f} secs"
        self.logger.info(msg)
    else:
        self.logger.error("MSPileup CMS MONIT task failed, execution time %.2f secs", elapsedTime)

def monitoringTask(self):
"""
Execute Monitoring task according to the following logic:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""
Unit tests for MSPileupMonitoring.py module
Author: Valentin Kuznetsov <vkuznet [AT] gmail [DOT] com>
"""

# system modules
import unittest

# WMCore modules
from WMCore.MicroService.MSPileup.MSPileupMonitoring import flatDocuments, flatKey


class MSPileupMonitoringTest(unittest.TestCase):
    """Unit test for MSPileupMonitoring module"""

    def setUp(self):
        """Prepare a pileup document with two campaigns and two RSEs per RSE list"""
        rses = ['rse1', 'rse2']
        self.doc = dict(
            pileupName='/klsjdfklsd/klsjdflksdj/PREMIX',
            pileupType='classic',
            expectedRSEs=rses,
            currentRSEs=rses,
            fullReplicas=1,
            campaigns=['c1', 'c2'],
            containerFraction=0.0,
            replicationGrouping="ALL",
            active=True,
            pileupSize=0,
            ruleIds=['1', '2'])

    def testFlatKey(self):
        """test flatKey function"""
        pluralKey = 'campaigns'
        # new single key, e.g. campaigns -> campaign
        singleKey = pluralKey[:-1]
        flatDocs = list(flatKey(dict(self.doc), pluralKey))
        self.assertEqual(len(flatDocs), 2)
        firstDoc = flatDocs[0]
        self.assertNotIn(pluralKey, firstDoc)
        self.assertEqual(firstDoc[singleKey], self.doc[pluralKey][0])

    def testFlatDocuments(self):
        """test flatDocuments function"""
        flatDocs = list(flatDocuments(dict(self.doc)))
        # 2 campaigns x 2 currentRSEs x 2 expectedRSEs
        self.assertEqual(len(flatDocs), 8)
        for flatDoc in flatDocs:
            for pluralKey in ('campaigns', 'currentRSEs', 'expectedRSEs'):
                # original (plural) key should be gone
                self.assertNotIn(pluralKey, flatDoc)
                # value of the singular key must be one of the original values
                self.assertIn(flatDoc[pluralKey[:-1]], self.doc[pluralKey])


# run the unit tests when this module is executed as a script
if __name__ == '__main__':
    unittest.main()
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from WMQuality.Emulators.EmulatedUnitTestCase import EmulatedUnitTestCase
from WMCore.MicroService.MSPileup.MSPileupTasks import MSPileupTasks
from WMCore.MicroService.MSPileup.MSPileupData import MSPileupData
from WMCore.MicroService.MSPileup.MSPileupMonitoring import MSPileupMonitoring
from WMCore.MicroService.Tools.Common import getMSLogger
from WMCore.Services.Rucio.Rucio import Rucio

Expand Down Expand Up @@ -113,6 +114,7 @@ def setUp(self):
'mongoDBPassword': None,
'mockMongoDB': True}
self.mgr = MSPileupData(msConfig, skipRucio=True)
self.monMgr = MSPileupMonitoring(msConfig)

# setup our pileup data
self.pname = '/primary/processed/PREMIX'
Expand Down Expand Up @@ -147,7 +149,7 @@ def testMSPileupTasks(self):
"""
self.logger.info("---------- CHECK for state=OK -----------")

obj = MSPileupTasks(self.mgr, self.logger, self.rucioAccount, self.rucioClient)
obj = MSPileupTasks(self.mgr, self.monMgr, self.logger, self.rucioAccount, self.rucioClient)
obj.monitoringTask()

# we added three pileup documents and should have update at least one of them
Expand Down Expand Up @@ -180,7 +182,7 @@ def testMSPileupTasks(self):
# now we can test non OK state in Rucio
self.logger.info("---------- CHECK for state=STUCK -----------")
self.rucioClient = TestRucioClient(logger=self.logger, state='STUCK')
obj = MSPileupTasks(self.mgr, self.logger, self.rucioAccount, self.rucioClient)
obj = MSPileupTasks(self.mgr, self.monMgr, self.logger, self.rucioAccount, self.rucioClient)
obj.monitoringTask()
# at this step the T2_XX_CERN should NOT be added to currentRSEs
spec = {'pileupName': self.pname}
Expand All @@ -197,7 +199,7 @@ def testMSPileupTasks(self):
# we use CustomException for state to check how our code will
# handle Rucio API exceptions
self.rucioClient = TestRucioClient(logger=self.logger, state='CustomException')
obj = MSPileupTasks(self.mgr, self.logger, self.rucioAccount, self.rucioClient)
obj = MSPileupTasks(self.mgr, self.monMgr, self.logger, self.rucioAccount, self.rucioClient)
obj.monitoringTask()

def testMSPileupTasksWithMockApi(self):
Expand All @@ -214,7 +216,7 @@ def testMSPileupTasksWithMockApi(self):

# now create mock rucio client
rucioClient = MockRucioApi(self.rucioAccount, hostUrl=self.hostUrl, authUrl=self.authUrl)
obj = MSPileupTasks(self.mgr, self.logger, self.rucioAccount, rucioClient)
obj = MSPileupTasks(self.mgr, self.monMgr, self.logger, self.rucioAccount, rucioClient)
obj.monitoringTask()
obj.activeTask()
obj.inactiveTask()
Expand Down

0 comments on commit a72f4c1

Please sign in to comment.