Skip to content

Commit

Permalink
Fix Sitelists changes propagation to local workqueue
Browse files Browse the repository at this point in the history
Enlarge WQE status mask && Add getWQElementsByWorkflow method && Shorten wowrflow status mask

Reduce WQE structure returned by getWQElementsByWorkflow

Reduce WQE status mask && Include docs for elementsByWorkflow couch view
  • Loading branch information
todor-ivanov committed Feb 4, 2025
1 parent d48389c commit 169f5d0
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 16 deletions.
Empty file modified bin/wmagent-component-standalone
100644 → 100755
Empty file.
24 changes: 9 additions & 15 deletions src/python/WMComponent/WorkflowUpdater/SiteListPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def wmstatsDict(self, requests):
{"wflow": {"SiteWhitelist":[], "SiteBlacklist": []}, ...}
"""
# get list of workflows from wmstats
outputMask = ['SiteWhiteList', 'SiteBlackList']
outputMask = ['SiteWhitelist', 'SiteBlacklist']
wdict = {}
for state in self.states:
inputConditions = {"RequestStatus": state}
Expand Down Expand Up @@ -113,18 +113,21 @@ def algorithm(self, parameters=None):
# get the name of pkl file from wma spec
pklFileName = wmaSpecs[wflow]

# create wrapper helper and load pickle file
# get the local Workqueue url for the workflow's spec
specUrl = self.localWQ.hostWithAuth + "/%s/%s/spec" % (self.localWQ.db.name, wflow)

# create wrapper helper and load the spec from local couch
wHelper = WMWorkloadHelper()
wHelper.load(pklFileName)
wHelper.load(specUrl)

# extract from pickle spec both white and black site lists and compare them
# to one we received from upstream service (ReqMgr2)
wmaWhiteList = wHelper.getSiteWhitelist()
wmaBlackList = wHelper.getSiteBlacklist()
if set(wmaWhiteList) != set(siteWhiteList) or set(wmaBlackList) != set(siteBlackList):
self.logger.info("Updating %s:", wflow)
self.logger.info(" siteWhiteList %s => %s", wmaWhiteList, siteWhiteList)
self.logger.info(" siteBlackList %s => %s", wmaBlackList, siteBlackList)
self.logger.info(" siteWhitelist %s => %s", wmaWhiteList, siteWhiteList)
self.logger.info(" siteBlacklist %s => %s", wmaBlackList, siteBlackList)
try:
# update local WorkQueue first
params = {'SiteWhitelist': siteWhiteList, 'SiteBlacklist': siteBlackList}
Expand All @@ -134,17 +137,8 @@ def algorithm(self, parameters=None):
logging.exception("Unexpected exception while updating elements in local workqueue Details:\n%s", str(ex))
continue

# update workload only if we updated local WorkQueue
# update site white/black lists together
if set(wmaWhiteList) != set(siteWhiteList):
self.logger.info("updating site white list for workflow %s", wflow)
wHelper.setWhitelist(siteWhiteList)
if set(wmaBlackList) != set(siteBlackList):
self.logger.info("updating site black list for workflow %s", wflow)
wHelper.setBlacklist(siteBlackList)

try:
# persist the spec in local CouchDB
# persist the change at the pkl file
self.logger.info("Updating %s with new site lists within pkl file %s", wflow, pklFileName)
# save back pickle file
wHelper.save(pklFileName)
Expand Down
29 changes: 28 additions & 1 deletion src/python/WMCore/Services/WorkQueue/WorkQueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,8 @@ def updateElementsByWorkflow(self, workload, updateParams, status=None):
# Update all workload parameters based on the full reqArgs dictionary
workload.updateWorkloadArgs(updateParams)
# Commit the changes of the current workload object to the database:
workload.saveCouchUrl(workload.specUrl())
metadata = {'name': wfName}
workload.saveCouch(self.hostWithAuth, self.db.name, metadata=metadata)
return

def getWorkflowNames(self, inboxFlag=False):
Expand Down Expand Up @@ -325,6 +326,32 @@ def deleteWQElementsByWorkflow(self, workflowNames):
deleted += len(ids)
return deleted

def getWQElementsByWorkflow(self, workflowNames, inboxFlag=False):
"""
Get workqueue elements which belongs to a given workflow name(s)
:param workflowNames: The workflow name for which we try to fetch the WQE elemtns for (could be a list of names as well)
:param inboxFlag: A flag to switch quering the inboxDB as well (default: False)
:return: A list of WQE
"""
if inboxFlag:
couchdb = self.inboxDB
else:
couchdb = self.db

if not isinstance(workflowNames, list):
workflowNames = [workflowNames]

options = {}
options["stale"] = "ok"
options["reduce"] = False
options['include_docs'] = True

data = couchdb.loadView("WorkQueue", "elementsByWorkflow", options, workflowNames)
wqeList=[]
for wqe in data['rows']:
wqeList.append(wqe['doc'])
return wqeList

def getElementsCountAndJobsByWorkflow(self, inboxFlag=False, stale=True):
"""Get the number of elements and jobs by status and workflow"""
if inboxFlag:
Expand Down

0 comments on commit 169f5d0

Please sign in to comment.