Skip to content

Commit

Permalink
Canary tw implementation (dmwm#8808)
Browse files Browse the repository at this point in the history
* tw_canary implementation based on 5% probability

* add runCanary function and fix logic

* adding comments and changing variable names

* small fix

* ensure float value

* adding logging

* final fix

* novel_try

* revert novel_try changes and fix arguments in runCanary config

* adding polling to canary tw and requested cleanup

* requested changes
  • Loading branch information
aspiringmind-code authored Nov 28, 2024
1 parent 8bca36f commit ddd6ce1
Showing 1 changed file with 56 additions and 9 deletions.
65 changes: 56 additions & 9 deletions src/python/TaskWorker/MasterWorker.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,8 @@ def _lockWork(self, limit, getstatus, setstatus):
* the server call succeeded or
* the server could not find anything to update or
* the server has an internal error"""

configreq = {'subresource': 'process', 'workername': self.config.TaskWorker.name, 'getstatus': getstatus, 'limit': limit, 'status': setstatus}

try:
#self.server.post(self.restURInoAPI + '/workflowdb', data=urlencode(configreq))
self.crabserver.post(api='workflowdb', data=urlencode(configreq))
Expand All @@ -392,6 +392,40 @@ def _lockWork(self, limit, getstatus, setstatus):

return True

def runCanary(self, limit):
    """Randomly reassign HOLDING tasks from this TaskWorker to the canary one.

    With probability ``config.TaskWorker.canary_fraction`` a ``process``
    request is POSTed to the ``workflowdb`` API with ``workername`` set to
    ``config.TaskWorker.canary_name``. Both ``getstatus`` and ``status`` are
    'HOLDING', so only the worker name changes in the request.

    :param limit: value forwarded as the ``limit`` field of the request.
    :return: True if the canary was not drawn this time or the POST
        succeeded; False if the POST raised (the error is logged).
    """
    # canary_fraction is a float value in the range [0.0-1.0]; float() keeps
    # the comparison valid when the configuration stores it as a string.
    use_canary = random.random() < float(self.config.TaskWorker.canary_fraction)
    if not use_canary:
        return True

    workername = self.config.TaskWorker.canary_name

    # Arguments that are not changed still have to be sent, they can't be skipped.
    configreq = {
        'subresource': 'process',
        'workername': workername,
        'getstatus': 'HOLDING',
        'limit': limit,
        'status': 'HOLDING',
    }

    try:
        self.crabserver.post(api='workflowdb', data=urlencode(configreq))
    except HTTPException as hte:
        # A plain HTTPException has no 'headers' attribute; read it defensively
        # so that the error handler itself cannot raise AttributeError.
        headers = getattr(hte, 'headers', None)
        self.logger.error(
            "HTTP Error while trying to change TW name in runCanary step: %s\n"
            "HTTP Headers are %s: ",
            str(hte), headers)
        return False
    except Exception:  # pylint: disable=broad-except
        self.logger.exception("Error trying to change TW name in runCanary step")
        return False

    self.logger.info("TW changed from %s to %s during runCanary",
                     self.config.TaskWorker.name, workername)
    return True


def getWork(self, limit, getstatus, ignoreTWName=False):
configreq = {'limit': limit, 'workername': self.config.TaskWorker.name, 'getstatus': getstatus}
Expand Down Expand Up @@ -524,23 +558,36 @@ def algorithm(self):
self.restartQueuedTasks()
self.logger.debug("Master Worker Starting Main Cycle.")
while not self.STOP:
selection_limit = self.config.TaskWorker.task_scheduling_limit
if not self._selectWork(limit=selection_limit):
self.logger.warning("No tasks selected.")
else:
self.logger.info("Work selected successfully.")
is_canary = self.config.TaskWorker.is_canary
limit = self.slaves.queueableTasks()
if not self._lockWork(limit=limit, getstatus='NEW', setstatus='HOLDING'):
time.sleep(self.config.TaskWorker.polling)
continue

# _selectWork, _lockWork and runCanary steps are run only if TW is master (not canary)
if is_canary:
self.logger.info("This is canary TW %s running.", self.config.TaskWorker.name)
else:
canary_name = self.config.TaskWorker.canary_name
self.logger.info("This is master TW %s running.", self.config.TaskWorker.name)
selection_limit = self.config.TaskWorker.task_scheduling_limit
if not self._selectWork(limit=selection_limit):
self.logger.warning("No tasks selected.")
else:
self.logger.info("Work selected successfully.")
if not self._lockWork(limit=limit, getstatus='NEW', setstatus='HOLDING'):
time.sleep(self.config.TaskWorker.polling)
continue
if canary_name.startswith('crab'):
self.runCanary(limit=limit)

# getWork is run by both master TW and canary TW
pendingwork = self.getWork(limit=limit, getstatus='HOLDING')

if pendingwork:
keys = ['tm_task_command', 'tm_taskname']
tasksInfo = [{k:v for k, v in task.items() if k in keys} for task in pendingwork]
self.logger.info("Retrieved a total of %d works", len(pendingwork))
self.logger.debug("Retrieved the following works: \n%s", str(tasksInfo))
else:
time.sleep(self.config.TaskWorker.polling)

toInject = []
for task in pendingwork:
Expand Down

0 comments on commit ddd6ce1

Please sign in to comment.