Fix Sitelists changes propagation to local workqueue #12245

Merged
Empty file modified: bin/wmagent-component-standalone (mode 100644 → 100755)
24 changes: 9 additions & 15 deletions src/python/WMComponent/WorkflowUpdater/SiteListPoller.py
@@ -69,7 +69,7 @@ def wmstatsDict(self, requests):
{"wflow": {"SiteWhitelist":[], "SiteBlacklist": []}, ...}
"""
# get list of workflows from wmstats
outputMask = ['SiteWhiteList', 'SiteBlackList']
outputMask = ['SiteWhitelist', 'SiteBlacklist']
wdict = {}
for state in self.states:
inputConditions = {"RequestStatus": state}
@@ -113,18 +113,21 @@ def algorithm(self, parameters=None):
             # get the name of pkl file from wma spec
             pklFileName = wmaSpecs[wflow]

-            # create wrapper helper and load pickle file
+            # get the local Workqueue url for the workflow's spec
+            specUrl = self.localWQ.hostWithAuth + "/%s/%s/spec" % (self.localWQ.db.name, wflow)
+
+            # create wrapper helper and load the spec from local couch
             wHelper = WMWorkloadHelper()
-            wHelper.load(pklFileName)
+            wHelper.load(specUrl)
Contributor:

Now we seem to have a mix of workload objects from different sources:

  1. pklFileName, which I believe lives on the filesystem (probably under WorkQueueManager/cache)
  2. specUrl, from the CouchDB document.

We should pick one of those and use it consistently. This avoids confusion, and we can also make sure that we are comparing the current site lists against the correct workload spec file, hence avoiding unnecessary updates.

Contributor Author:

Well, to me it was never clear why we should switch to loading a .pkl file in the first place, since the initial idea, and, if I am not wrong, Valentin's previous implementation of this, was to load the spec from couch. Only after the PR swapping the functions to use my updateWorkloadArgs was it changed to loading the spec from the pkl, without any explanation why. And as for consistency: the same mechanism happens at central services, and we do it quite safely from the database. I do not see a reason to change this approach here at the Agent.

Something more: the actual reason is that we are not agnostic to the way the workload object is created.

  • If we load from couchDB, then the method workload.specUrl() returns:
    'http://127.0.0.1:5984/workqueue/dmapelli_ReReco_RunBlockWhite_Nvidia_test_v1_250124_095017_1803/spec'
  • If we load from .pkl, then the method workload.specUrl() returns:
    '/data/srv/wmagent/2.3.8/install/WorkQueueManager/cache/dmapelli_ReReco_RunBlockWhite_Nvidia_test_v1_250124_095017_1803/WMSandbox/WMWorkload.pkl'
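
For illustration, a minimal sketch of the two load paths (plain WMWorkloadHelper usage; the <workflow> and <install> placeholders are hypothetical stand-ins for the full names above):

    from WMCore.WMSpec.WMWorkload import WMWorkloadHelper

    wHelper = WMWorkloadHelper()
    # load the spec from local CouchDB; specUrl() then returns the couch URI
    wHelper.load("http://127.0.0.1:5984/workqueue/<workflow>/spec")
    # load the spec from the filesystem sandbox instead; specUrl() then
    # returns the filesystem path to the pkl
    wHelper.load("<install>/WorkQueueManager/cache/<workflow>/WMSandbox/WMWorkload.pkl")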

Contributor Author (@todor-ivanov, Jan 30, 2025):

BTW, we may need to investigate that, because on the other hand I could not reproduce the behaviour just a minute ago, which was strange indeed. But in any case we'd better stay consistent across Central services and WMAgent and stick to equivalent methods for obtaining the workflow spec, which would be guaranteed to always produce a spec URI related to CouchDB rather than the filesystem.


            # extract from pickle spec both white and black site lists and compare them
            # to one we received from upstream service (ReqMgr2)
            wmaWhiteList = wHelper.getSiteWhitelist()
            wmaBlackList = wHelper.getSiteBlacklist()
            if set(wmaWhiteList) != set(siteWhiteList) or set(wmaBlackList) != set(siteBlackList):
                self.logger.info("Updating %s:", wflow)
-               self.logger.info(" siteWhiteList %s => %s", wmaWhiteList, siteWhiteList)
-               self.logger.info(" siteBlackList %s => %s", wmaBlackList, siteBlackList)
+               self.logger.info(" siteWhitelist %s => %s", wmaWhiteList, siteWhiteList)
+               self.logger.info(" siteBlacklist %s => %s", wmaBlackList, siteBlackList)
                try:
                    # update local WorkQueue first
                    params = {'SiteWhitelist': siteWhiteList, 'SiteBlacklist': siteBlackList}
@@ -134,17 +137,8 @@ def algorithm(self, parameters=None):
                    logging.exception("Unexpected exception while updating elements in local workqueue Details:\n%s", str(ex))
                    continue

-               # update workload only if we updated local WorkQueue
-               # update site white/black lists together
-               if set(wmaWhiteList) != set(siteWhiteList):
-                   self.logger.info("updating site white list for workflow %s", wflow)
-                   wHelper.setWhitelist(siteWhiteList)
-               if set(wmaBlackList) != set(siteBlackList):
-                   self.logger.info("updating site black list for workflow %s", wflow)
-                   wHelper.setBlacklist(siteBlackList)
-
                try:
-                   # persist the spec in local CouchDB
+                   # persist the change at the pkl file
                    self.logger.info("Updating %s with new site lists within pkl file %s", wflow, pklFileName)
                    # save back pickle file
                    wHelper.save(pklFileName)
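
Putting the hunks above together, the new per-workflow flow is roughly the following (a sketch that follows the diff; reqMgrWhiteList and reqMgrBlackList are illustrative stand-ins for the site lists fetched upstream from ReqMgr2/WMStats):

    # load the spec from the local WorkQueue couch, not from the pkl sandbox
    specUrl = self.localWQ.hostWithAuth + "/%s/%s/spec" % (self.localWQ.db.name, wflow)
    wHelper = WMWorkloadHelper()
    wHelper.load(specUrl)

    if set(wHelper.getSiteWhitelist()) != set(reqMgrWhiteList) or \
       set(wHelper.getSiteBlacklist()) != set(reqMgrBlackList):
        # update the local WorkQueue elements and the couch spec first ...
        params = {'SiteWhitelist': reqMgrWhiteList, 'SiteBlacklist': reqMgrBlackList}
        self.localWQ.updateElementsByWorkflow(wHelper, params)
        # ... then keep the pkl sandbox copy in sync
        wHelper.save(pklFileName)
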
30 changes: 29 additions & 1 deletion src/python/WMCore/Services/WorkQueue/WorkQueue.py
@@ -6,6 +6,7 @@
 from WMCore.Lexicon import splitCouchServiceURL
 from WMCore.WMSpec.WMWorkload import WMWorkloadHelper
 from WMCore.WorkQueue.DataStructs.WorkQueueElement import STATES
+from WMCore.WorkQueue.DataStructs.CouchWorkQueueElement import CouchWorkQueueElement


def convertWQElementsStatusToWFStatus(elementsStatusSet):
Expand Down Expand Up @@ -287,7 +288,8 @@ def updateElementsByWorkflow(self, workload, updateParams, status=None):
# Update all workload parameters based on the full reqArgs dictionary
workload.updateWorkloadArgs(updateParams)
# Commit the changes of the current workload object to the database:
workload.saveCouchUrl(workload.specUrl())
metadata = {'name': wfName}
workload.saveCouch(self.hostWithAuth, self.db.name, metadata=metadata)
Contributor:

Isn't this method used by global workqueue as well? It might be that making this change for the agent will actually break global workqueue.
In addition, I think this is the first time I see this update being done with metadata (and isn't it redundant with workload.updateWorkloadArgs?)

Contributor Author:

Yes (it is used by global), No (it won't break anything), and No (it is not redundant).

Contributor:

Right, I see this pattern has already been used in the method that updates the priority:

wmspec.load(self.hostWithAuth + "/%s/%s/spec" % (self.db.name, wf))
wmspec.setPriority(priority)
dummy_values = {'name': wmspec.name()}
wmspec.saveCouch(self.hostWithAuth, self.db.name, dummy_values)
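
For comparison with the removed call, a sketch of the difference (assuming wmspec was loaded as in the snippet above; ties back to the earlier point about specUrl()):

    # old: writes back to wherever specUrl() points - which, for a pkl-loaded
    # workload, is a filesystem path rather than couch
    wmspec.saveCouchUrl(wmspec.specUrl())

    # new: always targets the local couch database explicitly
    wmspec.saveCouch(self.hostWithAuth, self.db.name, metadata={'name': wmspec.name()})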

        return

    def getWorkflowNames(self, inboxFlag=False):

@@ -325,6 +327,32 @@ def deleteWQElementsByWorkflow(self, workflowNames):
            deleted += len(ids)
        return deleted

+    def getWQElementsByWorkflow(self, workflowNames, inboxFlag=False):
Contributor:

Other than the workflow name, can't we have a method very similar to what is implemented for getWorkflowNames():

    def getWorkflowNames(self, inboxFlag=False):

?

This implementation seems to be overloaded, also with the additional data structures created and grouped by database.

Contributor Author:

I have changed the data structure returned, so it is now just a list of WQEs.
And no, we cannot do just what is done in getWorkflowNames, because it would return only the WQE ids, while we need the full content of the WQEs - hence the rest of the function, beyond the call to the couchDB view elementsByWorkflow.

Contributor:

I don't understand why you need the full element content.
Can you execute an in-place update as performed by this method:

    def updateElements(self, *elementIds, **updatedParams):

?
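
For reference, a hypothetical sketch of such an in-place update (backend, wflow, newWhiteList and newBlackList are illustrative names; the updateElements signature is as quoted above):

    # fetch only the element ids for the workflow, then patch them in place
    ids = [elem.id for elem in backend.getElementsForWorkflow(wflow)]
    backend.updateElements(*ids, SiteWhitelist=newWhiteList, SiteBlacklist=newBlackList)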

Contributor:

Additionally, if we have unit tests for this module, please create a new one for this new method.

Contributor Author:

We are not using this method for updating any WQE, but rather for listing its content, like this: [1]

In [1]: sitelistpoller.localWQ.getWQElementsByWorkflow('dmapelli_ReReco_RunBlockWhite_Nvidia_test_v1_250124_095017_1803')
Out[1]: 
[{'Inputs': {},
  'ProcessedInputs': [],
  'RejectedInputs': [],
  'PileupData': {},
  'ParentData': {},
  'ParentFlag': False,
  'Jobs': 0,
  'WMSpec': None,
  'SiteWhitelist': [],
  'SiteBlacklist': [],
  'Dbs': None,
  'Task': None,
  'ParentQueueId': None,
  'Priority': 0,
  'SubscriptionId': None,
  'Status': None,
  'EventsWritten': 0,
  'FilesProcessed': 0,
  'PercentComplete': 0,
  'PercentSuccess': 0,
  'RequestName': None,
  'TaskName': None,
  'TeamName': None,
  'StartPolicy': {},
  'EndPolicy': {},
  'ACDC': {},
  'ChildQueueUrl': None,
  'ParentQueueUrl': None,
  'WMBSUrl': None,
  'NumberOfLumis': 0,
  'NumberOfEvents': 0,
  'NumberOfFiles': 0,
  'NumOfFilesAdded': 0,
  'Mask': None,
  'OpenForNewData': False,
  'TimestampFoundNewData': 1738241180,
  'NoInputUpdate': False,
  'NoPileupUpdate': False,
  'CreationTime': 1738241180,
  '_id': '3c36d2561fce25529b81e2045327b944',
  '_rev': '7-90374a9e21a25a0843c0000ce6b29642',
  'thunker_encoded_json': True,
  'type': 'WMCore.WorkQueue.DataStructs.WorkQueueElement.WorkQueueElement',
  'WMCore.WorkQueue.DataStructs.WorkQueueElement.WorkQueueElement': {'Inputs': {'/HLTPhysicsIsolatedBunch/Run2016H-v1/RAW#92049f6e-92f9-11e6-b150-001e67abf228': ['T2_CH_CERN_P5',
     'T2_CH_CERN_HLT',
     'T2_CH_CERN']},
   'ParentFlag': False,
   'ParentData': {},
   'NumberOfLumis': 3,
   'NumberOfFiles': 3,
   'NumberOfEvents': 3008,
   'Jobs': 8,
   'OpenForNewData': False,
   'NoInputUpdate': False,
   'NoPileupUpdate': False,
   'Status': 'Running',
   'RequestName': 'dmapelli_ReReco_RunBlockWhite_Nvidia_test_v1_250124_095017_1803',
   'TaskName': 'DataProcessing',
   'Dbs': 'https://cmsweb-testbed.cern.ch/dbs/int/global/DBSReader',
   'SiteWhitelist': ['T1_DE_KIT'],
   'SiteBlacklist': ['T2_FR_IPHC',
    'T2_GR_Ioannina',
    'T2_HU_Budapest',
    'T2_IN_TIFR',
    'T2_IT_Bari',
    'T2_IT_Legnaro',
    'T2_IT_Pisa',
    'T2_IT_Rome',
    'T2_KR_KISTI'],
   'StartPolicy': 'Block',
   'EndPolicy': {'policyName': 'SingleShot'},
   'Priority': 190003,
   'PileupData': {},
   'ProcessedInputs': [],
   'RejectedInputs': [],
   'ParentQueueId': '3c36d2561fce25529b81e2045327b944',
   'SubscriptionId': 844,
   'EventsWritten': 0,
   'FilesProcessed': 0,
   'PercentComplete': 0,
   'PercentSuccess': 0,
   'TeamName': 'testbed-vocms0192',
   'ACDC': {},
   'ChildQueueUrl': None,
   'ParentQueueUrl': None,
   'WMBSUrl': None,
   'NumOfFilesAdded': 0,
   'Mask': None,
   'TimestampFoundNewData': 1737716401,
   'CreationTime': 1737716401},
  'updatetime': 1737716402.0188694,
  'timestamp': 1737716402.0188694}]

Contributor:

Ahh, you are making this process in two operations:

  1. you figure out what the document ids are for a given workflow (regardless of their status)
  2. then, for each of the document ids, you make another request to CouchDB to retrieve the actual document.

This is very inefficient; instead, you can add the following option:

        options['include_docs'] = True

in order to retrieve the actual documents together with the CouchDB view access.

The relevant unit test can go in: https://github.com/dmwm/WMCore/blob/master/test/python/WMCore_t/Services_t/WorkQueue_t/WorkQueue_t.py
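
A minimal sketch of the suggested single-round-trip query (couchdb being the service's database handle; view and option names as used in this module):

    options = {"stale": "ok", "reduce": False, "include_docs": True}
    data = couchdb.loadView("WorkQueue", "elementsByWorkflow", options, [workflowName])
    docs = [row['doc'] for row in data['rows']]  # full documents, no per-id GETs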

Contributor Author:

ok

"""
Get workqueue elements which belongs to a given workflow name(s)
:param workflowNames: The workflow name for which we try to fetch the WQE elemtns for (could be a list of names as well)
:param inboxFlag: A flag to switch quering the inboxDB as well (default: False)
:return: A list of WQEs
"""
if inboxFlag:
couchdb = self.inboxDB
else:
couchdb = self.db

if not isinstance(workflowNames, list):
workflowNames = [workflowNames]

options = {}
options["stale"] = "ok"
options["reduce"] = False
options['include_docs'] = True

data = couchdb.loadView("WorkQueue", "elementsByWorkflow", options, workflowNames)
wqeList=[]
for wqe in data['rows']:
wqeList.append(CouchWorkQueueElement.fromDocument(couchdb, wqe['doc']))
return wqeList

    def getElementsCountAndJobsByWorkflow(self, inboxFlag=False, stale=True):
        """Get the number of elements and jobs by status and workflow"""
        if inboxFlag:
14 changes: 14 additions & 0 deletions test/python/WMCore_t/Services_t/WorkQueue_t/WorkQueue_t.py
@@ -96,6 +96,20 @@ def testWorkQueueService(self):
        self.assertEqual(wqApi.getWMBSUrl(), [])
        self.assertEqual(wqApi.getWMBSUrlByRequest(), [])

+    def testGetWQElementsByWorkflow(self):
+        specName = "RerecoSpec"
+        specUrl = self.specGenerator.createReRecoSpec(specName, "file",
+                                                      assignKwargs={'SiteWhitelist': ["T2_XX_SiteA"]})
+        globalQ = globalQueue(DbName='workqueue_t',
+                              QueueURL=self.testInit.couchUrl,
+                              UnittestFlag=True, **self.queueParams)
+        globalQ.queueWork(specUrl, "RerecoSpec", "teamA")
+        wqService = WorkQueueDS(self.testInit.couchUrl, 'workqueue_t')
+
+        gqList = globalQ.backend.getElementsForWorkflow(specName)
+        wqSList = wqService.getWQElementsByWorkflow(specName)
+        self.assertListEqual(gqList, wqSList)

    def testUpdatePriorityService(self):
        """
        _testUpdatePriorityService_