From ddd6ce1ded4d1b49cf92406b39d9a47321bd6776 Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Thu, 28 Nov 2024 14:20:18 +0100 Subject: [PATCH] Canary tw implementation (#8808) * tw_canary implementation based on 5% probability * add runCanary function and fix logic * adding comments and changing variable names * small fix * ensure float value * adding logging * final fix * novel_try * revert novel_try changes and fix arguments in runCanary config * adding polling to canary tw and requested cleanup * requested changes --- src/python/TaskWorker/MasterWorker.py | 65 +++++++++++++++++++++++---- 1 file changed, 56 insertions(+), 9 deletions(-) diff --git a/src/python/TaskWorker/MasterWorker.py b/src/python/TaskWorker/MasterWorker.py index 2d302b4163..20996f873d 100644 --- a/src/python/TaskWorker/MasterWorker.py +++ b/src/python/TaskWorker/MasterWorker.py @@ -376,8 +376,8 @@ def _lockWork(self, limit, getstatus, setstatus): * the server call succeeded or * the server could not find anything to update or * the server has an internal error""" - configreq = {'subresource': 'process', 'workername': self.config.TaskWorker.name, 'getstatus': getstatus, 'limit': limit, 'status': setstatus} + try: #self.server.post(self.restURInoAPI + '/workflowdb', data=urlencode(configreq)) self.crabserver.post(api='workflowdb', data=urlencode(configreq)) @@ -392,6 +392,40 @@ def _lockWork(self, limit, getstatus, setstatus): return True + def runCanary(self, limit): + # Decide whether to use canary_name based on the canary_fraction value. + # canary_fraction is a float value in the range [0.0-1.0] + use_canary = random.random() < float(self.config.TaskWorker.canary_fraction) + if not use_canary: + return True + + # Build the configreq dictionary + #This changes just the workername, status remains 'HOLDING' + #The arguments that are not changed can't be skipped + workername = self.config.TaskWorker.canary_name + + configreq = { + 'subresource': 'process', + 'workername': workername, + 'getstatus': 'HOLDING', + 'limit': limit, + 'status': 'HOLDING' + } + + try: + self.crabserver.post(api='workflowdb', data=urlencode(configreq)) + except HTTPException as hte: + msg = "HTTP Error while trying to change TW name in runCanary step: %s\n" % str(hte) + msg += "HTTP Headers are %s: " % hte.headers + self.logger.error(msg) + return False + except Exception: #pylint: disable=broad-except + self.logger.exception("Error trying to change TW name in runCanary step") + return False + + self.logger.info("TW changed from %s to %s during runCanary", self.config.TaskWorker.name, workername) + return True + def getWork(self, limit, getstatus, ignoreTWName=False): configreq = {'limit': limit, 'workername': self.config.TaskWorker.name, 'getstatus': getstatus} @@ -524,16 +558,27 @@ def algorithm(self): self.restartQueuedTasks() self.logger.debug("Master Worker Starting Main Cycle.") while not self.STOP: - selection_limit = self.config.TaskWorker.task_scheduling_limit - if not self._selectWork(limit=selection_limit): - self.logger.warning("No tasks selected.") - else: - self.logger.info("Work selected successfully.") + is_canary = self.config.TaskWorker.is_canary limit = self.slaves.queueableTasks() - if not self._lockWork(limit=limit, getstatus='NEW', setstatus='HOLDING'): - time.sleep(self.config.TaskWorker.polling) - continue + # _selectWork, _lockWork and runCanary steps are run only if TW is master (not canary) + if is_canary: + self.logger.info("This is canary TW %s running.", self.config.TaskWorker.name) + else: + canary_name = self.config.TaskWorker.canary_name + self.logger.info("This is master TW %s running.", self.config.TaskWorker.name) + selection_limit = self.config.TaskWorker.task_scheduling_limit + if not self._selectWork(limit=selection_limit): + self.logger.warning("No tasks selected.") + else: + self.logger.info("Work selected successfully.") + if not self._lockWork(limit=limit, getstatus='NEW', setstatus='HOLDING'): + time.sleep(self.config.TaskWorker.polling) + continue + if canary_name.startswith('crab'): + self.runCanary(limit=limit) + + # getWork is run by both master TW and canary TW pendingwork = self.getWork(limit=limit, getstatus='HOLDING') if pendingwork: @@ -541,6 +586,8 @@ def algorithm(self): tasksInfo = [{k:v for k, v in task.items() if k in keys} for task in pendingwork] self.logger.info("Retrieved a total of %d works", len(pendingwork)) self.logger.debug("Retrieved the following works: \n%s", str(tasksInfo)) + else: + time.sleep(self.config.TaskWorker.polling) toInject = [] for task in pendingwork: