Skip to content

Commit

Permalink
Merge pull request #12123 from vkuznet/fix-issue-12039
Browse files Browse the repository at this point in the history
Module to update site lists for WMAgents
  • Loading branch information
amaltaro authored Dec 5, 2024
2 parents c4a3916 + aea8c96 commit beefc74
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 3 deletions.
152 changes: 152 additions & 0 deletions src/python/WMComponent/WorkflowUpdater/SiteListPoller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
#!/usr/bin/env python
"""
File : SiteListPoller
Author : Valentin Kuznetsov <vkuznet AT gmail dot com>
Description: module to update of site lists within a WMAgent
"""

# system modules
import logging
import threading

# WMCore modules
from Utils.Timers import timeFunction
from WMCore.DAOFactory import DAOFactory
from WMCore.Services.WorkQueue.WorkQueue import WorkQueue
from WMCore.Services.WMStatsServer.WMStatsServer import WMStatsServer
from WMCore.WMSpec.WMWorkload import WMWorkloadHelper
from WMCore.WorkerThreads.BaseWorkerThread import BaseWorkerThread


class SiteListPoller(BaseWorkerThread):
def __init__(self, config):
"""
Initialize SiteListPoller object
:param config: a Configuration object with the component configuration
"""
BaseWorkerThread.__init__(self)
myThread = threading.currentThread()
self.logger = myThread.logger

# get wmstats parameters
self.wmstatsUrl = getattr(config.WorkflowUpdater, "wmstatsUrl")
self.wmstatsSrv = WMStatsServer(self.wmstatsUrl, logger=self.logger)
self.states = getattr(config.WorkflowUpdater, "states", ['running-open', 'acquired'])

# provide access to WMBS in local WMAgent
self.daoFactory = DAOFactory(package="WMCore.WMBS",
logger=myThread.logger,
dbinterface=myThread.dbi)
# DB function to retrieve active workflows
self.listActiveWflows = self.daoFactory(classname="Workflow.GetUnfinishedWorkflows")

# local WorkQueue service
self.localCouchUrl = config.WorkQueueManager.couchurl
self.localWQ = WorkQueue(self.localCouchUrl,
config.WorkQueueManager.dbname)

def getActiveWorkflows(self):
"""
Provide list of active requests within WMAgent
:return: dict of workflows names vs pickle files
"""
# get list of active workflows in WMAgent
wflowSpecs = self.listActiveWflows.execute()

# construct dictionary of workflow names and their pickle files
wmaSpecs = {}
for wflowSpec in wflowSpecs:
name = wflowSpec['name'] # this is the name of workflow
pklFileName = wflowSpec['spec'] # the "spec" in WMBS table (wmbs_workflow.spec) is pkl file name
wmaSpecs[name] = pklFileName
return wmaSpecs

def wmstatsDict(self, requests):
"""
Return list of requests specs from WMStats for provided list of request names
:param requests: list of workflow requests names
:return: dict of workflow records obtained from wmstats server:
{"wflow": {"SiteWhitelist":[], "SiteBlacklist": []}, ...}
"""
# get list of workflows from wmstats
outputMask = ['SiteWhiteList', 'SiteBlackList']
wdict = {}
for state in self.states:
inputConditions = {"RequestStatus": state}
self.logger.info("Fetch site info from WMStats for condition: %s and mask %s", inputConditions, outputMask)
data = self.wmstatsSrv.getFilteredActiveData(inputConditions, outputMask)
for rdict in data:
# rdict here has the following structure:
# {"RequestName": "bla", "SiteWhitelist":[], "SiteBlacklist": []}
wflow = rdict.pop('RequestName')
# check that our workflow is in our requests list
if wflow in requests:
wdict[wflow] = rdict
return wdict

@timeFunction
def algorithm(self, parameters=None):
"""
Perform the following logic:
- obtain list of current active workflows from the agent
- requests their specs from upstream wmstats server
- update site lists of active workflows
- push new specs to the agent local WorkQueue and update pickle spec file
:return: none
"""
# get list of active workflows from the agent, the returned dict
# is composed by workflow names and associated pickle file (data comes from WMBS)
wmaSpecs = self.getActiveWorkflows()
wflows = wmaSpecs.keys()

# obtain workflow records from wmstats server
wdict = self.wmstatsDict(wflows)

# iterate over the list of active workflows which is smaller than list from wmstats
for wflow in wflows:
if wflow not in wdict.keys():
continue
siteWhiteList = wdict[wflow]['SiteWhitelist']
siteBlackList = wdict[wflow]['SiteBlacklist']

# get the name of pkl file from wma spec
pklFileName = wmaSpecs[wflow]

# create wrapper helper and load pickle file
wHelper = WMWorkloadHelper()
wHelper.load(pklFileName)

# extract from pickle spec both white and black site lists and compare them
# to one we received from upstream service (ReqMgr2)
wmaWhiteList = wHelper.getSiteWhitelist()
wmaBlackList = wHelper.getSiteBlacklist()
if set(wmaWhiteList) != set(siteWhiteList) or set(wmaBlackList) != set(siteBlackList):
self.logger.info("Updating %s:", wflow)
self.logger.info(" siteWhiteList %s => %s", wmaWhiteList, siteWhiteList)
self.logger.info(" siteBlackList %s => %s", wmaBlackList, siteBlackList)
try:
# update local WorkQueue first
self.localWQ.updateSiteLists(wflow, siteWhiteList, siteBlackList)
self.logger.info("successfully updated workqueue elements for workflow %s", wflow)
except Exception as ex:
logging.exception("Unexpected exception while updating elements in local workqueue Details:\n%s", str(ex))
continue

# update workload only if we updated local WorkQueue
# update site white/black lists together
if set(wmaWhiteList) != set(siteWhiteList):
self.logger.info("updating site white list for workflow %s", wflow)
wHelper.setWhitelist(siteWhiteList)
if set(wmaBlackList) != set(siteBlackList):
self.logger.info("updating site black list for workflow %s", wflow)
wHelper.setBlacklist(siteBlackList)

try:
# persist the spec in local CouchDB
self.logger.info("Updating %s with new site lists within pkl file %s", wflow, pklFileName)
# save back pickle file
wHelper.save(pklFileName)
except Exception as ex:
logging.exception("Caught unexpected exception in SiteListPoller. Details:\n%s", str(ex))
continue
5 changes: 5 additions & 0 deletions src/python/WMComponent/WorkflowUpdater/WorkflowUpdater.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pprint import pformat

from WMComponent.WorkflowUpdater.WorkflowUpdaterPoller import WorkflowUpdaterPoller
from WMComponent.WorkflowUpdater.SiteListPoller import SiteListPoller
from WMCore.Agent.Harness import Harness


Expand Down Expand Up @@ -35,3 +36,7 @@ def preInitialization(self):
myThread = threading.currentThread()
myThread.workerThreadManager.addWorker(WorkflowUpdaterPoller(self.config),
pollInterval)

myThread = threading.currentThread()
myThread.workerThreadManager.addWorker(SiteListPoller(self.config),
pollInterval)
32 changes: 29 additions & 3 deletions src/python/WMCore/Services/WorkQueue/WorkQueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def updateElements(self, *elementIds, **updatedParams):
eleParams[self.eleKey] = updatedParams
conflictIDs = self.db.updateBulkDocumentsWithConflictHandle(elementIds, eleParams, maxConflictLimit=20)
if conflictIDs:
raise CouchConflictError("WQ update failed with conflict", data=updatedParams, result=conflictIDs)
raise CouchConflictError("WQ update failed with conflict", data=updatedParams, result=conflictIDs, status=409)
return

def getAvailableWorkflows(self):
Expand Down Expand Up @@ -237,6 +237,34 @@ def cancelWorkflow(self, wf):
elements = [x['id'] for x in data.get('rows', []) if x['key'][1] not in nonCancelableElements]
return self.updateElements(*elements, Status='CancelRequested')

def updateSiteLists(self, wf, siteWhiteList=None, siteBlackList=None):
"""
Update site list parameters in elements matching a given workflow and a list of element statuse
:param wf: workflow name
:param siteWhiteList: optional list of strings, new site white list
:param siteBlackList: optional list of strings, new site black list
:return: None
"""
# Update elements in Available status
data = self.db.loadView('WorkQueue', 'jobStatusByRequest',
{'reduce': False})
states = ['Available']
elementsToUpdate = [x['id'] for x in data.get('rows', []) if x['key'][-1] in states and wf in x['key']]
if elementsToUpdate:
self.logger.info("Updating %d elements in status %s for workflow %s", len(elementsToUpdate), states, wf)
self.updateElements(*elementsToUpdate, SiteWhiteList=siteWhiteList, SiteBlackList=siteBlackList)
# Update the spec, if it exists
if self.db.documentExists(wf):
wmspec = WMWorkloadHelper()
# update local workqueue couchDB
wmspec.load(self.hostWithAuth + "/%s/%s/spec" % (self.db.name, wf))
wmspec.setSiteWhiteList(siteWhiteList)
wmspec.setSiteBlackList(siteBlackList)
dummy_values = {'name': wmspec.name()}
wmspec.saveCouch(self.hostWithAuth, self.db.name, dummy_values)
return

def updatePriority(self, wf, priority):
"""Update priority of a workflow, this implies
updating the spec and the priority of the Available elements"""
Expand Down Expand Up @@ -307,7 +335,6 @@ def getElementsCountAndJobsByWorkflow(self, inboxFlag=False, stale=True):
'Jobs': x['value']['sum']}
return result


def _retrieveWorkflowStatus(self, data):
workflowsStatus = {}

Expand All @@ -318,7 +345,6 @@ def _retrieveWorkflowStatus(self, data):
workflowsStatus[workflow] = status
return workflowsStatus


def getWorkflowStatusFromWQE(self, stale=True):
"""
only checks workqueue db not inbox db.
Expand Down
26 changes: 26 additions & 0 deletions src/python/WMCore/WMSpec/WMWorkload.py
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,19 @@ def removeTask(self, taskName):
self.data.tasks.tasklist.remove(taskName)
return

def getSiteWhitelist(self):
"""
Get the site white list for the workflow
:return: site white list
"""
# loop over tasks to see if there is white lists
taskIterator = self.taskIterator()
siteList = []
for task in taskIterator:
for site in task.siteWhitelist():
siteList.append(site)
return list(set(siteList))

def setSiteWhitelist(self, siteWhitelist):
"""
_setSiteWhitelist_
Expand All @@ -689,6 +702,19 @@ def setSiteWhitelist(self, siteWhitelist):

return

def getSiteBlacklist(self):
"""
Get the site black list for the workflow
:return: site black list
"""
# loop over tasks to see if there is black lists
taskIterator = self.getAllTasks(cpuOnly=False)
siteList = []
for task in taskIterator:
for site in task.siteBlacklist():
siteList.append(site)
return list(set(siteList))

def setSiteBlacklist(self, siteBlacklist):
"""
_setSiteBlacklist_
Expand Down
31 changes: 31 additions & 0 deletions test/python/WMCore_t/WMSpec_t/WMWorkload_t.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,37 @@ def testDbsUrl(self):
self.assertEqual(url, "https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader")
return

def testGetSiteWhitelist(self):
"""
Test getSiteWhitelist and getSiteBlackList functionality of the task.
"""
testWorkload = WMWorkloadHelper(WMWorkload("TestWorkload"))

procTestTask = testWorkload.newTask("ProcessingTask")
procTestTaskCMSSW = procTestTask.makeStep("cmsRun1")
procTestTaskCMSSW.setStepType("CMSSW")

procTestTask.addInputDataset(name="/PrimaryDataset/ProcessedDataset/DATATIER",
primary="PrimaryDataset",
processed="ProcessedDataset",
tier="DATATIER",
block_whitelist=["Block1", "Block2"],
black_blacklist=["Block3"],
run_whitelist=[1, 2],
run_blacklist=[3])

newSiteWhiteList = ["T1_US_FNAL", "T0_CH_CERN"]
newSiteBlackList = ["T1_DE_KIT"]
testWorkload.setSiteWhitelist(newSiteWhiteList)
testWorkload.setSiteBlacklist(newSiteBlackList)

siteWhiteList = testWorkload.getSiteWhitelist()
siteBlackList = testWorkload.getSiteBlacklist()
self.assertTrue(set(newSiteWhiteList) == set(siteWhiteList),
"Error: Site white list mismatch")
self.assertTrue(set(newSiteBlackList) == set(siteBlackList),
"Error: Site black list mismatch")

def testWhiteBlacklists(self):
"""
_testWhiteBlacklists_
Expand Down

0 comments on commit beefc74

Please sign in to comment.