Skip to content

Commit

Permalink
worker: do not repeatedly remove already awaited tasks from subtask list
Browse files Browse the repository at this point in the history
Plain TaskBase.wait() always uses the **whole** list of all subtasks as
the set of subtasks that should be awaited.  Therefore, it will try to
await even tasks that were already awaited in some previous
TaskBase.wait() call and are no longer present in self._subtask_list
which subsequently raise the ValueError shown below.

Keep the set of already awaited tasks and skip their IDs when marking
newly awaited subtasks as finished.

Fixes the following traceback:
```
Traceback (most recent call last):
  File "/src/kobo/kobo/worker/taskmanager.py", line 423, in run_task
    task.run()
  File "/src/osh/worker/tasks/task_errata_diff_build.py", line 55, in run
    self.wait()
  File "/src/kobo/kobo/worker/task.py", line 147, in wait
    self._subtask_list.remove(i)
ValueError: list.remove(x): x not in list
```
  • Loading branch information
lzaoral committed Aug 23, 2023
1 parent ce4fcff commit fd6006c
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 8 deletions.
15 changes: 11 additions & 4 deletions kobo/worker/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ def __init__(self, hub, conf, task_id, args):
self._task_info = self.hub.worker.get_task(self.task_id)
self._task_manager = None # created by taskmanager (only for foreground tasks)
self._args = args
self._subtask_list = []
self._finished_subtasks = set()
self._running_subtask_list = []
self.result = ""

@property
Expand Down Expand Up @@ -71,7 +72,7 @@ def args(self):
@property
def subtask_list(self):
# deepcopy to prevent modification
return copy.deepcopy(self._subtask_list)
return copy.deepcopy(self._running_subtask_list)

def run(self):
"""Run the task."""
Expand All @@ -95,7 +96,7 @@ def spawn_subtask(self, method, args, label="", priority = None):
raise RuntimeError("Foreground tasks can't spawn subtasks.")

subtask_id = self.hub.worker.create_subtask(label, method, args, self.task_id, priority)
self._subtask_list.append(subtask_id)
self._running_subtask_list.append(subtask_id)
return subtask_id

def wait(self, subtasks=None):
Expand Down Expand Up @@ -126,13 +127,19 @@ def wait(self, subtasks=None):
signal.pause()
# wake up on signal to check the status

# do not await subtasks that were already processed in previous wait calls
finished = [x for x in finished if x not in self._finished_subtasks]

# remove finished subtasks from the list, check results
fail = False
for i in finished:
state = self.hub.worker.get_task(i)
if state['state'] != TASK_STATES['CLOSED']:
fail = True
self._subtask_list.remove(i)
self._running_subtask_list.remove(i)

# mark finished subtasks as processed
self._finished_subtasks.update(finished)

if fail:
print("Failing because of at least one subtask hasn't closed properly.")
Expand Down
8 changes: 4 additions & 4 deletions tests/test_taskbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,15 @@ def test_subtask_list(self):
args = []

t = TaskBase(hub, conf, task_id, args)
self.assertEqual(len(t._subtask_list), 0)
self.assertEqual(len(t._running_subtask_list), 0)

# access directly
t._subtask_list.append({'id': 1})
self.assertEqual(len(t._subtask_list), 1)
t._running_subtask_list.append({'id': 1})
self.assertEqual(len(t._running_subtask_list), 1)

# copy to prevent modification
t.subtask_list.append({'id': 2})
self.assertEqual(len(t._subtask_list), 1)
self.assertEqual(len(t._running_subtask_list), 1)

def test_run(self):
task_info = {'id': 100}
Expand Down

0 comments on commit fd6006c

Please sign in to comment.