Skip to content

Commit

Permalink
Enlarge WQE status mask && Add getWQElementsByWorkflow method && Shor…
Browse files Browse the repository at this point in the history
…ten wowrflow status mask
  • Loading branch information
todor-ivanov committed Jan 29, 2025
1 parent 6c42ce4 commit 1e9398e
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 2 deletions.
Empty file modified bin/wmagent-component-standalone
100644 → 100755
Empty file.
4 changes: 2 additions & 2 deletions src/python/WMComponent/WorkflowUpdater/SiteListPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def __init__(self, config):
# get wmstats parameters
self.wmstatsUrl = getattr(config.WorkflowUpdater, "wmstatsUrl")
self.wmstatsSrv = WMStatsServer(self.wmstatsUrl, logger=self.logger)
self.states = getattr(config.WorkflowUpdater, "states", ['running-closed', 'running-open', 'acquired'])
self.states = getattr(config.WorkflowUpdater, "states", ['running-open', 'acquired'])

# provide access to WMBS in local WMAgent
self.daoFactory = DAOFactory(package="WMCore.WMBS",
Expand Down Expand Up @@ -131,7 +131,7 @@ def algorithm(self, parameters=None):
try:
# update local WorkQueue first
params = {'SiteWhitelist': siteWhiteList, 'SiteBlacklist': siteBlackList}
self.localWQ.updateElementsByWorkflow(wHelper, params, status=['Available'])
self.localWQ.updateElementsByWorkflow(wHelper, params, status=['Available', 'Negotiating', 'Acquired', 'Running'])
self.logger.info("successfully updated workqueue elements for workflow %s", wflow)
except Exception as ex:
logging.exception("Unexpected exception while updating elements in local workqueue Details:\n%s", str(ex))
Expand Down
35 changes: 35 additions & 0 deletions src/python/WMCore/Services/WorkQueue/WorkQueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,41 @@ def deleteWQElementsByWorkflow(self, workflowNames):
deleted += len(ids)
return deleted

def getWQElementsByWorkflow(self, workflowNames, inboxFlag=False):
"""
Get workqueue elements which belongs to a given workflow name(s)
:param workflowNames: The workflow name for which we try to fetch the WQE elemtns for (could be a list of names as well)
:param inboxFlag: A flag to switch quering the inboxDB as well (default: False)
:returnt: A dictionary with pairs of the type:
{dbName1: [List of WQEs...],
dbName2: [List of WQEs...]}
"""
if inboxFlag:
couchdb = self.inboxDB
else:
couchdb = self.db

dbName = couchdb.info()['db_name']

if not isinstance(workflowNames, list):
workflowNames = [workflowNames]

options = {}
options["stale"] = "ok"
options["reduce"] = False

wqeList = {}
wqeList[dbName] = []
result = couchdb.loadView("WorkQueue", "elementsByWorkflow", options, workflowNames)

ids = []
for entry in result["rows"]:
ids.append(entry["id"])

for id in ids:
wqeList[dbName].append(couchdb.get(uri=f"/workqueue/_design/WorkQueue/_rewrite/element/{id}" ))
return wqeList

def getElementsCountAndJobsByWorkflow(self, inboxFlag=False, stale=True):
"""Get the number of elements and jobs by status and workflow"""
if inboxFlag:
Expand Down

0 comments on commit 1e9398e

Please sign in to comment.