Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker: do not repeatedly remove already awaited tasks from subtask list #220

Merged
merged 1 commit into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions kobo/worker/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ def __init__(self, hub, conf, task_id, args):
self._task_info = self.hub.worker.get_task(self.task_id)
self._task_manager = None # created by taskmanager (only for foreground tasks)
self._args = args
self._subtask_list = []
self._finished_subtasks = set()
self._running_subtask_list = []
self.result = ""

@property
Expand Down Expand Up @@ -71,7 +72,7 @@ def args(self):
@property
def subtask_list(self):
# deepcopy to prevent modification
return copy.deepcopy(self._subtask_list)
return copy.deepcopy(self._running_subtask_list)

def run(self):
"""Run the task."""
Expand All @@ -95,7 +96,7 @@ def spawn_subtask(self, method, args, label="", priority = None):
raise RuntimeError("Foreground tasks can't spawn subtasks.")

subtask_id = self.hub.worker.create_subtask(label, method, args, self.task_id, priority)
self._subtask_list.append(subtask_id)
self._running_subtask_list.append(subtask_id)
return subtask_id

def wait(self, subtasks=None):
Expand Down Expand Up @@ -126,13 +127,19 @@ def wait(self, subtasks=None):
signal.pause()
# wake up on signal to check the status

# do not await subtasks that were already processed in previous wait calls
finished = [x for x in finished if x not in self._finished_subtasks]

# remove finished subtasks from the list, check results
fail = False
for i in finished:
state = self.hub.worker.get_task(i)
if state['state'] != TASK_STATES['CLOSED']:
fail = True
self._subtask_list.remove(i)
self._running_subtask_list.remove(i)

# mark finished subtasks as processed
self._finished_subtasks.update(finished)

if fail:
print("Failing because of at least one subtask hasn't closed properly.")
Expand Down
8 changes: 4 additions & 4 deletions tests/test_taskbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,15 @@ def test_subtask_list(self):
args = []

t = TaskBase(hub, conf, task_id, args)
self.assertEqual(len(t._subtask_list), 0)
self.assertEqual(len(t._running_subtask_list), 0)

# access directly
t._subtask_list.append({'id': 1})
self.assertEqual(len(t._subtask_list), 1)
t._running_subtask_list.append({'id': 1})
self.assertEqual(len(t._running_subtask_list), 1)

# copy to prevent modification
t.subtask_list.append({'id': 2})
self.assertEqual(len(t._subtask_list), 1)
self.assertEqual(len(t._running_subtask_list), 1)

def test_run(self):
task_info = {'id': 100}
Expand Down
Loading