From 9e88ab07bc91e43641332a78b8980ccc0b521d8d Mon Sep 17 00:00:00 2001 From: Alan Malta Rodrigues Date: Thu, 26 Jan 2023 13:24:46 -0500 Subject: [PATCH] Corrected GQ -> LQ data acquisition in slices use correct key names separate in different methods fix list integer comparison make parameters configurable; Todors changes use new thresholds when fetching data from parent queue --- etc/WMAgentConfig.py | 4 + src/python/WMCore/WorkQueue/WorkQueue.py | 15 +- .../WMCore/WorkQueue/WorkQueueBackend.py | 189 ++++++++++-------- 3 files changed, 123 insertions(+), 85 deletions(-) diff --git a/etc/WMAgentConfig.py b/etc/WMAgentConfig.py index 38d3e1a1ca..2c7f0464ed 100644 --- a/etc/WMAgentConfig.py +++ b/etc/WMAgentConfig.py @@ -141,6 +141,10 @@ config.WorkQueueManager.queueParams["QueueURL"] = "http://%s:5984" % (config.Agent.hostName) config.WorkQueueManager.queueParams["WorkPerCycle"] = 200 # don't pull more than this number of elements per cycle config.WorkQueueManager.queueParams["QueueDepth"] = 0.5 # pull work from GQ for only half of the resources +# number of available elements to be retrieved within a single CouchDB http request +config.WorkQueueManager.queueParams["RowsPerSlice"] = 2500 +# maximum number of available elements rows to be evaluated when acquiring GQ to LQ work +config.WorkQueueManager.queueParams["MaxRowsPerCycle"] = 50000 config.WorkQueueManager.queueParams["rucioAccount"] = "wmcore_transferor" # account for data locks diff --git a/src/python/WMCore/WorkQueue/WorkQueue.py b/src/python/WMCore/WorkQueue/WorkQueue.py index e4ddfe8455..4600f9af1c 100644 --- a/src/python/WMCore/WorkQueue/WorkQueue.py +++ b/src/python/WMCore/WorkQueue/WorkQueue.py @@ -119,6 +119,8 @@ def __init__(self, logger=None, dbi=None, **params): self.params.setdefault('QueueDepth', 1) # when less than this locally self.params.setdefault('WorkPerCycle', 100) + self.params.setdefault('RowsPerSlice', 2500) + self.params.setdefault('MaxRowsPerCycle', 50000) self.params.setdefault('LocationRefreshInterval', 600) self.params.setdefault('FullLocationRefreshInterval', 7200) self.params.setdefault('TrackLocationOrSubscription', 'location') @@ -310,13 +312,16 @@ def getWork(self, jobSlots, siteJobCounts, excludeWorkflows=None): """ excludeWorkflows = excludeWorkflows or [] results = [] - numElems = self.params['WorkPerCycle'] if not self.backend.isAvailable(): self.logger.warning('Backend busy or down: skipping fetching of work') return results + # TODO AMR: perhaps numElems limit should be removed for LQ -> WMBS acquisition matches, _ = self.backend.availableWork(jobSlots, siteJobCounts, - excludeWorkflows=excludeWorkflows, numElems=numElems) + excludeWorkflows=excludeWorkflows, + numElems=self.params['WorkPerCycle'], + rowsPerSlice=self.params['RowsPerSlice'], + maxRows=self.params['MaxRowsPerCycle']) self.logger.info('Got %i elements matching the constraints', len(matches)) if not matches: @@ -825,9 +830,11 @@ def freeResouceCheck(self): return (resources, jobCounts) def getAvailableWorkfromParent(self, resources, jobCounts, printFlag=False): - numElems = self.params['WorkPerCycle'] self.logger.info("Going to fetch work from the parent queue: %s", self.parent_queue.queueUrl) - work, _ = self.parent_queue.availableWork(resources, jobCounts, self.params['Team'], numElems=numElems) + work, _ = self.parent_queue.availableWork(resources, jobCounts, self.params['Team'], + numElems=self.params['WorkPerCycle'], + rowsPerSlice=self.params['RowsPerSlice'], + maxRows=self.params['MaxRowsPerCycle']) if not work: 
self._printLog('No available work in parent queue.', printFlag, "warning") return work diff --git a/src/python/WMCore/WorkQueue/WorkQueueBackend.py b/src/python/WMCore/WorkQueue/WorkQueueBackend.py index a23c00a180..d3d069f065 100644 --- a/src/python/WMCore/WorkQueue/WorkQueueBackend.py +++ b/src/python/WMCore/WorkQueue/WorkQueueBackend.py @@ -6,6 +6,7 @@ """ from builtins import object +from math import ceil from future.utils import viewitems @@ -411,8 +412,8 @@ def calculateAvailableWork(self, thresholds, siteJobCounts): len(elements), self.queueUrl) return elements, siteJobCounts - def availableWork(self, thresholds, siteJobCounts, team=None, - excludeWorkflows=None, numElems=9999999): + def availableWork(self, thresholds, siteJobCounts, team=None, excludeWorkflows=None, + numElems=1000, rowsPerSlice=1000, maxRows=1000): """ Get work - either from local or global queue - which is available to be run. @@ -422,117 +423,143 @@ def availableWork(self, thresholds, siteJobCounts, team=None, is a dictionary with the number of jobs running at a given priority. :param team: a string with the team name we want to pull work for :param excludeWorkflows: list of (aborted) workflows that should not be accepted - :param numElems: integer with the maximum number of elements to be accepted (default - to a very large number when pulling work from local queue, read unlimited) + :param numElems: integer with the maximum number of elements to be accepted. + :param rowsPerSlice: integer defining the amount of rows for each slice (slices + of a couchdb view request). + :param maxRows: maximum number of available elements (rows) to be considered + when pulling work down to the agent. :return: a tuple with the elements accepted and an overview of job counts per site """ excludeWorkflows = excludeWorkflows or [] - elements = [] + acceptedElems = [] # If there are no sites, punt early. if not thresholds: self.logger.error("No thresholds is set: Please check") - return elements, siteJobCounts + return acceptedElems, siteJobCounts self.logger.info("Current siteJobCounts:") for site, jobsByPrio in viewitems(siteJobCounts): self.logger.info(" %s : %s", site, jobsByPrio) + # Find out how many elements are in Available status + numAvail = self.queueLength() + self.logger.info("Current amount of WQEs in Available status: %s", numAvail) + self.logger.info("Getting up to %d available work from %s", numElems, self.queueUrl) self.logger.info(" for team name: %s", team) - self.logger.info(" with excludeWorkflows: %s", excludeWorkflows) + self.logger.info(" with excludeWorkflows count: %s", len(excludeWorkflows)) + self.logger.debug(" with excludeWorkflows: %s", excludeWorkflows) self.logger.info(" for thresholds: %s", thresholds) - # FIXME: magic numbers - docsSliceSize = 1000 options = {} options['include_docs'] = True options['descending'] = True options['resources'] = thresholds - options['limit'] = docsSliceSize + options['limit'] = rowsPerSlice # FIXME: num_elem option can likely be deprecated, but it needs synchronization # between agents and global workqueue... for now, make sure it can return the slice size - options['num_elem'] = docsSliceSize + options['num_elem'] = rowsPerSlice if team: options['team'] = team # Fetch workqueue elements in slices, using the CouchDB "limit" and "skip" # options for couch views. 
Conditions to stop this loop are: - # a) have a hard stop at 50k+1 (we might have to make this configurable) - # b) stop as soon as an empty slice is returned by Couch (thus all docs have - # already been retrieve) - # c) or, once "numElems" elements have been accepted - numSkip = 0 - breakOut = False - while True: - if breakOut: - # then we have reached the maximum number of elements to be accepted - break - self.logger.info(" with limit docs: %s, and skip first %s docs", docsSliceSize, numSkip) - options['skip'] = numSkip + # a) stop once total_rows is reached (exhausted all available elements) + # b) hit maximum allowed elements/rows to be considered for data acquisition (maxRows) + # c) or, once the targeted number of elements has been accepted (numElems) + numSlices = ceil(numAvail / rowsPerSlice) + numSlices = min(numSlices, int(maxRows / rowsPerSlice)) + for sliceNum in range(numSlices): + # documents to skip as a function of the slice number + options['skip'] = sliceNum * rowsPerSlice + self.logger.info(" for slice: %s, with rows range [%s - %s]", + sliceNum, options['skip'], options['skip'] + options['limit']) result = self.db.loadList('WorkQueue', 'workRestrictions', 'availableByPriority', options) - result = json.loads(result) - if result: - self.logger.info("Retrieved %d elements from workRestrictions list for: %s", - len(result), self.queueUrl) + # now check the remaining restrictions and priority + wqeSlots = numElems - len(acceptedElems) + elems = self._evalAvailableWork(json.loads(result), thresholds, siteJobCounts, + excludeWorkflows, wqeSlots) + acceptedElems.extend(elems) + if len(acceptedElems) >= numElems: + msg = f"Reached maximum number of elements to be accepted, " + msg += f"configured to: {numElems}, from queue: {self.queueUrl}" + self.logger.info(msg) + break + + self.logger.info("Total of %d elements passed location and siteJobCounts restrictions for: %s", + len(acceptedElems), self.queueUrl) + return acceptedElems, siteJobCounts + + def _evalAvailableWork(self, listElems, thresholds, siteJobCounts, + excludeWorkflows, numElems): + """ + Evaluate work available in workqueue and decide whether it can be + accepted or not. + + :param listElems: list of dictionaries that correspond to the workqueue elements. + :param thresholds: a dictionary key'ed by the site name, values representing the + maximum number of jobs allowed at that site. + :param siteJobCounts: a dictionary-of-dictionaries key'ed by the site name; value + is a dictionary with the number of jobs running at a given priority. + NOTE that it is updated in place. 
+ :param excludeWorkflows: list of (aborted) workflows that should not be accepted + :param numElems: integer with the maximum number of elements to be accepted (default + to a very large number when pulling work from local queue, read unlimited) + :return: a tuple with the elements accepted and an overview of job counts per site + """ + elems = [] + self.logger.info("Retrieved %d elements from workRestrictions list for: %s", + len(listElems), self.queueUrl) + # Convert python dictionary into Couch WQE objects, skipping aborted workflows + # And sort them by creation time and priority, such that highest priority and + # oldest elements come first in the list + sortedElements = [] + for i in listElems: + element = CouchWorkQueueElement.fromDocument(self.db, i) + # make sure not to acquire work for aborted or force-completed workflows + if element['RequestName'] in excludeWorkflows: + msg = "Skipping aborted/force-completed workflow: %s, work id: %s" + self.logger.info(msg, element['RequestName'], element._id) else: - self.logger.info("All the workqueue elements have been exhausted for: %s ", self.queueUrl) + sortedElements.append(element) + sortAvailableElements(sortedElements) + + for element in sortedElements: + if numElems <= 0: + # it means we accepted the configured number of elements break - # update number of documents to skip in the next cycle - numSkip += docsSliceSize - - # Convert python dictionary into Couch WQE objects, skipping aborted workflows - # And sort them by creation time and priority, such that highest priority and - # oldest elements come first in the list - sortedElements = [] - for i in result: - element = CouchWorkQueueElement.fromDocument(self.db, i) - # make sure not to acquire work for aborted or force-completed workflows - if element['RequestName'] in excludeWorkflows: - msg = "Skipping aborted/force-completed workflow: %s, work id: %s" - self.logger.info(msg, element['RequestName'], element._id) - else: - sortedElements.append(element) - sortAvailableElements(sortedElements) - - for element in sortedElements: - if numElems <= 0: - msg = "Reached maximum number of elements to be accepted, " - msg += "configured to: {}, from queue: {}".format(len(elements), self.queueUrl) - self.logger.info(msg) - breakOut = True # get out of the outer loop as well - break - commonSites = possibleSites(element) - prio = element['Priority'] - # shuffle list of common sites all the time to give everyone the same chance - random.shuffle(commonSites) - possibleSite = None - for site in commonSites: - if site in thresholds: - # Count the number of jobs currently running of greater priority, if they - # are less than the site thresholds, then accept this element - curJobCount = sum([x[1] if x[0] >= prio else 0 for x in viewitems(siteJobCounts.get(site, {}))]) - self.logger.debug( - "Job Count: %s, site: %s thresholds: %s" % (curJobCount, site, thresholds[site])) - if curJobCount < thresholds[site]: - possibleSite = site - break - - if possibleSite: - self.logger.info("Accepting workflow: %s, with prio: %s, element id: %s, for site: %s", - element['RequestName'], prio, element.id, possibleSite) - numElems -= 1 - elements.append(element) - siteJobCounts.setdefault(possibleSite, {}) - siteJobCounts[possibleSite][prio] = siteJobCounts[possibleSite].setdefault(prio, 0) + \ - element['Jobs'] * element.get('blowupFactor', 1.0) - else: - self.logger.debug("No available resources for %s with doc id %s", - element['RequestName'], element.id) + commonSites = possibleSites(element) + prio = 
element['Priority'] + # shuffle list of common sites all the time to give everyone the same chance + random.shuffle(commonSites) + possibleSite = None + for site in commonSites: + if site in thresholds: + # Count the number of jobs currently running of greater priority, if they + # are less than the site thresholds, then accept this element + curJobCount = sum([x[1] if x[0] >= prio else 0 for x in viewitems(siteJobCounts.get(site, {}))]) + self.logger.debug("Job Count: %s, site: %s thresholds: %s", + curJobCount, site, thresholds[site]) + if curJobCount < thresholds[site]: + possibleSite = site + break + + if possibleSite: + self.logger.info("Accepting workflow: %s, with prio: %s, element id: %s, for site: %s", + element['RequestName'], prio, element.id, possibleSite) + numElems -= 1 + elems.append(element) + siteJobCounts.setdefault(possibleSite, {}) + siteJobCounts[possibleSite][prio] = siteJobCounts[possibleSite].setdefault(prio, 0) + \ + element['Jobs'] * element.get('blowupFactor', 1.0) + else: + self.logger.debug("No available resources for %s with doc id %s", + element['RequestName'], element.id) self.logger.info("And %d elements passed location and siteJobCounts restrictions for: %s", - len(elements), self.queueUrl) - return elements, siteJobCounts + len(elems), self.queueUrl) + return elems def getActiveData(self): """Get data items we have work in the queue for"""
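
Note (illustrative, not part of the patch): the reworked availableWork() pages through the CouchDB view in slices of RowsPerSlice rows and never evaluates more than MaxRowsPerCycle rows in one cycle. Below is a minimal standalone sketch of that arithmetic, assuming the default values added to WMAgentConfig.py (2500 and 50000); the helper name sliceOffsets is made up for illustration.

    from math import ceil

    def sliceOffsets(numAvail, rowsPerSlice=2500, maxRows=50000):
        """Return the CouchDB 'skip' offsets used to page through the available elements."""
        numSlices = ceil(numAvail / rowsPerSlice)
        # cap the number of rows evaluated in a single cycle at maxRows
        numSlices = min(numSlices, int(maxRows / rowsPerSlice))
        return [sliceNum * rowsPerSlice for sliceNum in range(numSlices)]

    print(sliceOffsets(7000))         # [0, 2500, 5000] -> three view requests
    print(len(sliceOffsets(200000)))  # 20 -> capped at 50000 rows for this cycle

With the defaults, a backlog of 7000 available elements is fetched in three view requests, while anything beyond 50000 rows is left for the next cycle.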
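
Likewise, a minimal sketch of the per-element acceptance rule that _evalAvailableWork() applies once a slice has been fetched: a candidate site can take the element only while the jobs it already runs at the element's priority or higher stay below its threshold. The function and data below are hypothetical; the real method also shuffles the candidate sites, sorts elements by priority and age, and updates siteJobCounts in place.

    def pickSite(commonSites, prio, thresholds, siteJobCounts):
        """Return the first site that can still take work at this priority, else None."""
        for site in commonSites:
            if site not in thresholds:
                continue
            # jobs already accounted for at this priority or higher
            curJobCount = sum(jobs for p, jobs in siteJobCounts.get(site, {}).items() if p >= prio)
            if curJobCount < thresholds[site]:
                return site
        return None

    thresholds = {"T1_US_FNAL": 500, "T2_CH_CERN": 1000}
    siteJobCounts = {"T1_US_FNAL": {100000: 500}, "T2_CH_CERN": {100000: 200}}
    # FNAL is already at its threshold for priority >= 100000, so CERN is picked
    print(pickSite(["T1_US_FNAL", "T2_CH_CERN"], 100000, thresholds, siteJobCounts))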