diff --git a/kobo/worker/task.py b/kobo/worker/task.py index 891b08ff..8be52220 100644 --- a/kobo/worker/task.py +++ b/kobo/worker/task.py @@ -32,7 +32,8 @@ def __init__(self, hub, conf, task_id, args): self._task_info = self.hub.worker.get_task(self.task_id) self._task_manager = None # created by taskmanager (only for foreground tasks) self._args = args - self._subtask_list = [] + self._finished_subtasks = set() + self._running_subtask_list = [] self.result = "" @property @@ -71,7 +72,7 @@ def args(self): @property def subtask_list(self): # deepcopy to prevent modification - return copy.deepcopy(self._subtask_list) + return copy.deepcopy(self._running_subtask_list) def run(self): """Run the task.""" @@ -95,7 +96,7 @@ def spawn_subtask(self, method, args, label="", priority = None): raise RuntimeError("Foreground tasks can't spawn subtasks.") subtask_id = self.hub.worker.create_subtask(label, method, args, self.task_id, priority) - self._subtask_list.append(subtask_id) + self._running_subtask_list.append(subtask_id) return subtask_id def wait(self, subtasks=None): @@ -126,13 +127,19 @@ def wait(self, subtasks=None): signal.pause() # wake up on signal to check the status + # do not await subtasks that were already processed in previous wait calls + finished = [x for x in finished if x not in self._finished_subtasks] + # remove finished subtasks from the list, check results fail = False for i in finished: state = self.hub.worker.get_task(i) if state['state'] != TASK_STATES['CLOSED']: fail = True - self._subtask_list.remove(i) + self._running_subtask_list.remove(i) + + # mark finished subtasks as processed + self._finished_subtasks.update(finished) if fail: print("Failing because of at least one subtask hasn't closed properly.") diff --git a/tests/test_taskbase.py b/tests/test_taskbase.py index 8a04fde6..266780a5 100644 --- a/tests/test_taskbase.py +++ b/tests/test_taskbase.py @@ -93,15 +93,15 @@ def test_subtask_list(self): args = [] t = TaskBase(hub, conf, task_id, args) - self.assertEqual(len(t._subtask_list), 0) + self.assertEqual(len(t._running_subtask_list), 0) # access directly - t._subtask_list.append({'id': 1}) - self.assertEqual(len(t._subtask_list), 1) + t._running_subtask_list.append({'id': 1}) + self.assertEqual(len(t._running_subtask_list), 1) # copy to prevent modification t.subtask_list.append({'id': 2}) - self.assertEqual(len(t._subtask_list), 1) + self.assertEqual(len(t._running_subtask_list), 1) def test_run(self): task_info = {'id': 100}