From fd6006c5a9a0387bca48540bd4d92c04b462bbb9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Zaoral?= Date: Wed, 23 Aug 2023 09:26:45 +0200 Subject: [PATCH] worker: do not repeatedly remove already awaited tasks from subtask list Plain TaskBase.wait() always uses the **whole** list of all subtasks as the set of subtasks that should be awaited. Therefore, it will try to await even tasks that were already awaited in some previous TaskBase.wait() call and are no longer present in self._subtask_list, so removing them again raises the ValueError shown below. Keep the set of already awaited tasks and skip their IDs when marking newly awaited subtasks as finished. Fixes the following traceback: ``` Traceback (most recent call last): File "/src/kobo/kobo/worker/taskmanager.py", line 423, in run_task task.run() File "/src/osh/worker/tasks/task_errata_diff_build.py", line 55, in run self.wait() File "/src/kobo/kobo/worker/task.py", line 147, in wait self._subtask_list.remove(i) ValueError: list.remove(x): x not in list ``` --- kobo/worker/task.py | 15 +++++++++++---- tests/test_taskbase.py | 8 ++++---- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/kobo/worker/task.py b/kobo/worker/task.py index 891b08ff..8be52220 100644 --- a/kobo/worker/task.py +++ b/kobo/worker/task.py @@ -32,7 +32,8 @@ def __init__(self, hub, conf, task_id, args): self._task_info = self.hub.worker.get_task(self.task_id) self._task_manager = None # created by taskmanager (only for foreground tasks) self._args = args - self._subtask_list = [] + self._finished_subtasks = set() + self._running_subtask_list = [] self.result = "" @property @@ -71,7 +72,7 @@ def args(self): @property def subtask_list(self): # deepcopy to prevent modification - return copy.deepcopy(self._subtask_list) + return copy.deepcopy(self._running_subtask_list) def run(self): """Run the task.""" @@ -95,7 +96,7 @@ def spawn_subtask(self, method, args, label="", priority = None): raise RuntimeError("Foreground tasks 
can't spawn subtasks.") subtask_id = self.hub.worker.create_subtask(label, method, args, self.task_id, priority) - self._subtask_list.append(subtask_id) + self._running_subtask_list.append(subtask_id) return subtask_id def wait(self, subtasks=None): @@ -126,13 +127,19 @@ def wait(self, subtasks=None): signal.pause() # wake up on signal to check the status + # do not await subtasks that were already processed in previous wait calls + finished = [x for x in finished if x not in self._finished_subtasks] + # remove finished subtasks from the list, check results fail = False for i in finished: state = self.hub.worker.get_task(i) if state['state'] != TASK_STATES['CLOSED']: fail = True - self._subtask_list.remove(i) + self._running_subtask_list.remove(i) + + # mark finished subtasks as processed + self._finished_subtasks.update(finished) if fail: print("Failing because of at least one subtask hasn't closed properly.") diff --git a/tests/test_taskbase.py b/tests/test_taskbase.py index 8a04fde6..266780a5 100644 --- a/tests/test_taskbase.py +++ b/tests/test_taskbase.py @@ -93,15 +93,15 @@ def test_subtask_list(self): args = [] t = TaskBase(hub, conf, task_id, args) - self.assertEqual(len(t._subtask_list), 0) + self.assertEqual(len(t._running_subtask_list), 0) # access directly - t._subtask_list.append({'id': 1}) - self.assertEqual(len(t._subtask_list), 1) + t._running_subtask_list.append({'id': 1}) + self.assertEqual(len(t._running_subtask_list), 1) # copy to prevent modification t.subtask_list.append({'id': 2}) - self.assertEqual(len(t._subtask_list), 1) + self.assertEqual(len(t._running_subtask_list), 1) def test_run(self): task_info = {'id': 100}