Corrected GQ -> LQ data acquisition in slices
use correct key names

separate into different methods

fix list integer comparison

make parameters configurable; Todor's changes

use new thresholds when fetching data from parent queue
amaltaro committed Jan 30, 2023
1 parent 81c7d43 commit 9e88ab0
Showing 3 changed files with 123 additions and 85 deletions.
4 changes: 4 additions & 0 deletions etc/WMAgentConfig.py
@@ -141,6 +141,10 @@
config.WorkQueueManager.queueParams["QueueURL"] = "http://%s:5984" % (config.Agent.hostName)
config.WorkQueueManager.queueParams["WorkPerCycle"] = 200 # don't pull more than this number of elements per cycle
config.WorkQueueManager.queueParams["QueueDepth"] = 0.5 # pull work from GQ for only half of the resources
# number of available elements to be retrieved within a single CouchDB http request
config.WorkQueueManager.queueParams["RowsPerSlice"] = 2500
# maximum number of available element rows to be evaluated when acquiring work from GQ to LQ
config.WorkQueueManager.queueParams["MaxRowsPerCycle"] = 50000
config.WorkQueueManager.queueParams["rucioAccount"] = "wmcore_transferor" # account for data locks


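The two new parameters above bound how much of the available-work backlog is scanned per component cycle. A minimal sketch of overriding them in an agent configuration (illustrative values, not recommendations):

# illustrative overrides; the defaults shipped with this commit are 2500 and 50000
config.WorkQueueManager.queueParams["RowsPerSlice"] = 1000      # rows fetched per CouchDB view request
config.WorkQueueManager.queueParams["MaxRowsPerCycle"] = 20000  # cap on rows evaluated per cycle

With these values the slicing loop in WorkQueueBackend.availableWork() issues at most MaxRowsPerCycle / RowsPerSlice = 20 CouchDB view requests per cycle.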
15 changes: 11 additions & 4 deletions src/python/WMCore/WorkQueue/WorkQueue.py
@@ -119,6 +119,8 @@ def __init__(self, logger=None, dbi=None, **params):

self.params.setdefault('QueueDepth', 1) # when less than this locally
self.params.setdefault('WorkPerCycle', 100)
self.params.setdefault('RowsPerSlice', 2500)
self.params.setdefault('MaxRowsPerCycle', 50000)
self.params.setdefault('LocationRefreshInterval', 600)
self.params.setdefault('FullLocationRefreshInterval', 7200)
self.params.setdefault('TrackLocationOrSubscription', 'location')
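The setdefault() calls above only take effect when the corresponding key is missing from the agent's queueParams; a minimal sketch of that fallback, assuming plain dictionary semantics:

params = {"WorkPerCycle": 200}               # e.g. only WorkPerCycle set in WMAgentConfig.py
params.setdefault("RowsPerSlice", 2500)      # key missing -> default 2500 is used
params.setdefault("MaxRowsPerCycle", 50000)  # key missing -> default 50000 is used
params.setdefault("WorkPerCycle", 100)       # key present -> the configured 200 wins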
@@ -310,13 +312,16 @@ def getWork(self, jobSlots, siteJobCounts, excludeWorkflows=None):
"""
excludeWorkflows = excludeWorkflows or []
results = []
numElems = self.params['WorkPerCycle']
if not self.backend.isAvailable():
self.logger.warning('Backend busy or down: skipping fetching of work')
return results

# TODO AMR: perhaps numElems limit should be removed for LQ -> WMBS acquisition
matches, _ = self.backend.availableWork(jobSlots, siteJobCounts,
excludeWorkflows=excludeWorkflows, numElems=numElems)
excludeWorkflows=excludeWorkflows,
numElems=self.params['WorkPerCycle'],
rowsPerSlice=self.params['RowsPerSlice'],
maxRows=self.params['MaxRowsPerCycle'])

self.logger.info('Got %i elements matching the constraints', len(matches))
if not matches:
@@ -825,9 +830,11 @@ def freeResouceCheck(self):
return (resources, jobCounts)

def getAvailableWorkfromParent(self, resources, jobCounts, printFlag=False):
numElems = self.params['WorkPerCycle']
self.logger.info("Going to fetch work from the parent queue: %s", self.parent_queue.queueUrl)
work, _ = self.parent_queue.availableWork(resources, jobCounts, self.params['Team'], numElems=numElems)
work, _ = self.parent_queue.availableWork(resources, jobCounts, self.params['Team'],
numElems=self.params['WorkPerCycle'],
rowsPerSlice=self.params['RowsPerSlice'],
maxRows=self.params['MaxRowsPerCycle'])
if not work:
self._printLog('No available work in parent queue.', printFlag, "warning")
return work
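Both call sites, LQ -> WMBS in getWork() and GQ -> LQ in getAvailableWorkfromParent(), now forward the same three configurable limits. A small runnable sketch of the shared keyword arguments (the call sites are shown as comments and are not part of the commit text):

params = {'WorkPerCycle': 200, 'RowsPerSlice': 2500, 'MaxRowsPerCycle': 50000}
sliceKwargs = dict(numElems=params['WorkPerCycle'],      # accept at most this many elements
                   rowsPerSlice=params['RowsPerSlice'],  # rows per CouchDB view request
                   maxRows=params['MaxRowsPerCycle'])    # row budget per acquisition cycle
# these kwargs are then forwarded by both:
#   self.backend.availableWork(jobSlots, siteJobCounts, excludeWorkflows=excludeWorkflows, **sliceKwargs)
#   self.parent_queue.availableWork(resources, jobCounts, self.params['Team'], **sliceKwargs)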
189 changes: 108 additions & 81 deletions src/python/WMCore/WorkQueue/WorkQueueBackend.py
@@ -6,6 +6,7 @@
"""

from builtins import object
from math import ceil

from future.utils import viewitems

@@ -411,8 +412,8 @@ def calculateAvailableWork(self, thresholds, siteJobCounts):
len(elements), self.queueUrl)
return elements, siteJobCounts

def availableWork(self, thresholds, siteJobCounts, team=None,
excludeWorkflows=None, numElems=9999999):
def availableWork(self, thresholds, siteJobCounts, team=None, excludeWorkflows=None,
numElems=1000, rowsPerSlice=1000, maxRows=1000):
"""
Get work - either from local or global queue - which is available to be run.
@@ -422,117 +423,143 @@ def availableWork(self, thresholds, siteJobCounts, team=None, excludeWorkflows=None,
is a dictionary with the number of jobs running at a given priority.
:param team: a string with the team name we want to pull work for
:param excludeWorkflows: list of (aborted) workflows that should not be accepted
:param numElems: integer with the maximum number of elements to be accepted (default
to a very large number when pulling work from local queue, read unlimited)
:param numElems: integer with the maximum number of elements to be accepted.
:param rowsPerSlice: integer defining the number of rows in each slice (each
slice corresponds to a single CouchDB view request).
:param maxRows: maximum number of available elements (rows) to be considered
when pulling work down to the agent.
:return: a tuple with the elements accepted and an overview of job counts per site
"""
excludeWorkflows = excludeWorkflows or []
elements = []
acceptedElems = []
# If there are no sites, punt early.
if not thresholds:
self.logger.error("No thresholds is set: Please check")
return elements, siteJobCounts
return acceptedElems, siteJobCounts

self.logger.info("Current siteJobCounts:")
for site, jobsByPrio in viewitems(siteJobCounts):
self.logger.info(" %s : %s", site, jobsByPrio)

# Find out how many elements are in Available status
numAvail = self.queueLength()
self.logger.info("Current amount of WQEs in Available status: %s", numAvail)

self.logger.info("Getting up to %d available work from %s", numElems, self.queueUrl)
self.logger.info(" for team name: %s", team)
self.logger.info(" with excludeWorkflows: %s", excludeWorkflows)
self.logger.info(" with excludeWorkflows count: %s", len(excludeWorkflows))
self.logger.debug(" with excludeWorkflows: %s", excludeWorkflows)
self.logger.info(" for thresholds: %s", thresholds)

# FIXME: magic numbers
docsSliceSize = 1000
options = {}
options['include_docs'] = True
options['descending'] = True
options['resources'] = thresholds
options['limit'] = docsSliceSize
options['limit'] = rowsPerSlice
# FIXME: num_elem option can likely be deprecated, but it needs synchronization
# between agents and global workqueue... for now, make sure it can return the slice size
options['num_elem'] = docsSliceSize
options['num_elem'] = rowsPerSlice
if team:
options['team'] = team

# Fetch workqueue elements in slices, using the CouchDB "limit" and "skip"
# options for couch views. Conditions to stop this loop are:
# a) have a hard stop at 50k+1 (we might have to make this configurable)
# b) stop as soon as an empty slice is returned by Couch (thus all docs have
# already been retrieve)
# c) or, once "numElems" elements have been accepted
numSkip = 0
breakOut = False
while True:
if breakOut:
# then we have reached the maximum number of elements to be accepted
break
self.logger.info(" with limit docs: %s, and skip first %s docs", docsSliceSize, numSkip)
options['skip'] = numSkip
# a) stop once total_rows is reached (exhausted all available elements)
# b) hit maximum allowed elements/rows to be considered for data acquisition (maxRows)
# c) or, once the targeted number of elements has been accepted (numElems)
numSlices = ceil(numAvail / rowsPerSlice)
numSlices = min(numSlices, int(maxRows / rowsPerSlice))
for sliceNum in range(numSlices):
# documents to skip as a function of the slice number
options['skip'] = sliceNum * rowsPerSlice
self.logger.info(" for slice: %s, with rows range [%s - %s]",
sliceNum, options['skip'], options['skip'] + options['limit'])

result = self.db.loadList('WorkQueue', 'workRestrictions', 'availableByPriority', options)
result = json.loads(result)
if result:
self.logger.info("Retrieved %d elements from workRestrictions list for: %s",
len(result), self.queueUrl)
# now check the remaining restrictions and priority
wqeSlots = numElems - len(acceptedElems)
elems = self._evalAvailableWork(json.loads(result), thresholds, siteJobCounts,
excludeWorkflows, wqeSlots)
acceptedElems.extend(elems)
if len(acceptedElems) >= numElems:
msg = f"Reached maximum number of elements to be accepted, "
msg += f"configured to: {numElems}, from queue: {self.queueUrl}"
self.logger.info(msg)
break

self.logger.info("Total of %d elements passed location and siteJobCounts restrictions for: %s",
len(acceptedElems), self.queueUrl)
return acceptedElems, siteJobCounts

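A worked example of the slice arithmetic above, with illustrative numbers (7,200 available elements, the new defaults of 2,500 rows per slice and a 50,000-row budget per cycle):

from math import ceil

numAvail, rowsPerSlice, maxRows = 7200, 2500, 50000
numSlices = min(ceil(numAvail / rowsPerSlice), int(maxRows / rowsPerSlice))  # min(3, 20) -> 3
for sliceNum in range(numSlices):
    skip = sliceNum * rowsPerSlice
    print("slice %d covers rows [%d - %d]" % (sliceNum, skip, skip + rowsPerSlice))
# slice 0 covers rows [0 - 2500]
# slice 1 covers rows [2500 - 5000]
# slice 2 covers rows [5000 - 7500]

The loop can still exit earlier than numSlices, as soon as numElems elements have been accepted.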
def _evalAvailableWork(self, listElems, thresholds, siteJobCounts,
excludeWorkflows, numElems):
"""
Evaluate work available in workqueue and decide whether it can be
accepted or not.
:param listElems: list of dictionaries that correspond to the workqueue elements.
:param thresholds: a dictionary key'ed by the site name, values representing the
maximum number of jobs allowed at that site.
:param siteJobCounts: a dictionary-of-dictionaries key'ed by the site name; value
is a dictionary with the number of jobs running at a given priority.
NOTE that it is updated in place.
:param excludeWorkflows: list of (aborted) workflows that should not be accepted
:param numElems: integer with the maximum number of elements to be accepted (default
to a very large number when pulling work from local queue, read unlimited)
:return: a tuple with the elements accepted and an overview of job counts per site
"""
elems = []
self.logger.info("Retrieved %d elements from workRestrictions list for: %s",
len(listElems), self.queueUrl)
# Convert python dictionary into Couch WQE objects, skipping aborted workflows
# And sort them by creation time and priority, such that highest priority and
# oldest elements come first in the list
sortedElements = []
for i in listElems:
element = CouchWorkQueueElement.fromDocument(self.db, i)
# make sure not to acquire work for aborted or force-completed workflows
if element['RequestName'] in excludeWorkflows:
msg = "Skipping aborted/force-completed workflow: %s, work id: %s"
self.logger.info(msg, element['RequestName'], element._id)
else:
self.logger.info("All the workqueue elements have been exhausted for: %s ", self.queueUrl)
sortedElements.append(element)
sortAvailableElements(sortedElements)

for element in sortedElements:
if numElems <= 0:
# it means we accepted the configured number of elements
break
# update number of documents to skip in the next cycle
numSkip += docsSliceSize

# Convert python dictionary into Couch WQE objects, skipping aborted workflows
# And sort them by creation time and priority, such that highest priority and
# oldest elements come first in the list
sortedElements = []
for i in result:
element = CouchWorkQueueElement.fromDocument(self.db, i)
# make sure not to acquire work for aborted or force-completed workflows
if element['RequestName'] in excludeWorkflows:
msg = "Skipping aborted/force-completed workflow: %s, work id: %s"
self.logger.info(msg, element['RequestName'], element._id)
else:
sortedElements.append(element)
sortAvailableElements(sortedElements)

for element in sortedElements:
if numElems <= 0:
msg = "Reached maximum number of elements to be accepted, "
msg += "configured to: {}, from queue: {}".format(len(elements), self.queueUrl)
self.logger.info(msg)
breakOut = True # get out of the outer loop as well
break
commonSites = possibleSites(element)
prio = element['Priority']
# shuffle list of common sites all the time to give everyone the same chance
random.shuffle(commonSites)
possibleSite = None
for site in commonSites:
if site in thresholds:
# Count the number of jobs currently running of greater priority, if they
# are less than the site thresholds, then accept this element
curJobCount = sum([x[1] if x[0] >= prio else 0 for x in viewitems(siteJobCounts.get(site, {}))])
self.logger.debug(
"Job Count: %s, site: %s thresholds: %s" % (curJobCount, site, thresholds[site]))
if curJobCount < thresholds[site]:
possibleSite = site
break

if possibleSite:
self.logger.info("Accepting workflow: %s, with prio: %s, element id: %s, for site: %s",
element['RequestName'], prio, element.id, possibleSite)
numElems -= 1
elements.append(element)
siteJobCounts.setdefault(possibleSite, {})
siteJobCounts[possibleSite][prio] = siteJobCounts[possibleSite].setdefault(prio, 0) + \
element['Jobs'] * element.get('blowupFactor', 1.0)
else:
self.logger.debug("No available resources for %s with doc id %s",
element['RequestName'], element.id)
commonSites = possibleSites(element)
prio = element['Priority']
# shuffle list of common sites all the time to give everyone the same chance
random.shuffle(commonSites)
possibleSite = None
for site in commonSites:
if site in thresholds:
# Count the number of jobs currently running of greater priority, if they
# are less than the site thresholds, then accept this element
curJobCount = sum([x[1] if x[0] >= prio else 0 for x in viewitems(siteJobCounts.get(site, {}))])
self.logger.debug("Job Count: %s, site: %s thresholds: %s",
curJobCount, site, thresholds[site])
if curJobCount < thresholds[site]:
possibleSite = site
break

if possibleSite:
self.logger.info("Accepting workflow: %s, with prio: %s, element id: %s, for site: %s",
element['RequestName'], prio, element.id, possibleSite)
numElems -= 1
elems.append(element)
siteJobCounts.setdefault(possibleSite, {})
siteJobCounts[possibleSite][prio] = siteJobCounts[possibleSite].setdefault(prio, 0) + \
element['Jobs'] * element.get('blowupFactor', 1.0)
else:
self.logger.debug("No available resources for %s with doc id %s",
element['RequestName'], element.id)

self.logger.info("And %d elements passed location and siteJobCounts restrictions for: %s",
len(elements), self.queueUrl)
return elements, siteJobCounts
len(elems), self.queueUrl)
return elems

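A simplified, runnable sketch of the per-element acceptance test in the loop above; the helper name is hypothetical, and commonSites stands for the output of possibleSites(element):

from random import shuffle

def pickSite(prio, commonSites, thresholds, siteJobCounts):
    """Return a site that can still take work at this priority, or None."""
    shuffle(commonSites)  # shuffle so every site gets the same chance
    for site in commonSites:
        if site not in thresholds:
            continue
        # jobs already accounted for at this site with priority >= prio
        curJobCount = sum(n for p, n in siteJobCounts.get(site, {}).items() if p >= prio)
        if curJobCount < thresholds[site]:
            return site
    return None

When a site is returned, the element is accepted and siteJobCounts[site][prio] is increased by element['Jobs'] * element.get('blowupFactor', 1.0), which is why later, lower-priority elements become progressively harder to place.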
def getActiveData(self):
"""Get data items we have work in the queue for"""
