From 071612c9d5cea58cafe3beec473fa04f0e471a30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Zaoral?= Date: Wed, 18 Oct 2023 13:25:48 +0200 Subject: [PATCH 1/2] tests: extend the test_create_subtask test ... with worker and task state assertions. --- tests/test_xmlrpc_worker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_xmlrpc_worker.py b/tests/test_xmlrpc_worker.py index 50bf1082..ef3b3299 100644 --- a/tests/test_xmlrpc_worker.py +++ b/tests/test_xmlrpc_worker.py @@ -897,6 +897,8 @@ def test_create_subtask(self): self.assertEqual(t_child.parent.id, t_parent.id) self.assertEqual(t_child.label, 'Label') self.assertEqual(t_child.method, 'Method') + self.assertEqual(t_child.worker, None) + self.assertEqual(t_child.state, TASK_STATES['FREE']) def test_create_subtask_if_another_worker_task(self): w = Worker.objects.create( From db85e6709b489d2f9fa4b0a5d7ed50c65be29d00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Zaoral?= Date: Thu, 19 Oct 2023 13:03:04 +0200 Subject: [PATCH 2/2] hub: support creation of subtasks with an inherited worker from parent task OpenScanHub expects the parent task and its subtasks to be executed on the same worker. However, there is a race with the `assign_task` XML-RPC call because it does not guarantee that the subtask won't be assigned to another worker by the hub in the meantime. This commit adds a support for creation of subtasks pre-assigned to the same worker executing the parent task, thus, invalidating the necessary condition for the race described above. Related: https://github.com/openscanhub/openscanhub/issues/156 --- kobo/hub/xmlrpc/worker.py | 4 +++- kobo/worker/task.py | 5 +++-- tests/test_taskbase.py | 3 ++- tests/test_xmlrpc_worker.py | 21 +++++++++++++++++++++ 4 files changed, 29 insertions(+), 4 deletions(-) diff --git a/kobo/hub/xmlrpc/worker.py b/kobo/hub/xmlrpc/worker.py index c008ff0c..6f817706 100644 --- a/kobo/hub/xmlrpc/worker.py +++ b/kobo/hub/xmlrpc/worker.py @@ -234,7 +234,8 @@ def get_awaited_tasks(request, awaited_task_list): @validate_worker -def create_subtask(request, label, method, args, parent_id, subtask_priority = None): +def create_subtask(request, label, method, args, parent_id, subtask_priority = None, + inherit_worker=False): parent_task = Task.objects.get_and_verify(task_id=parent_id, worker=request.worker) return Task.create_task( @@ -243,6 +244,7 @@ def create_subtask(request, label, method, args, parent_id, subtask_priority = N method, args=args, parent_id=parent_id, + worker_name=(request.worker.name if inherit_worker else None), arch_name=parent_task.arch.name, channel_name=parent_task.channel.name, priority=subtask_priority or parent_task.priority diff --git a/kobo/worker/task.py b/kobo/worker/task.py index 8be52220..cd060c83 100644 --- a/kobo/worker/task.py +++ b/kobo/worker/task.py @@ -90,12 +90,13 @@ def cleanup(cls, hub, conf, task_info): def notification(cls, hub, conf, task_info): pass - def spawn_subtask(self, method, args, label="", priority = None): + def spawn_subtask(self, method, args, label="", priority = None, + inherit_worker=False): """Spawn a new subtask.""" if self.foreground: raise RuntimeError("Foreground tasks can't spawn subtasks.") - subtask_id = self.hub.worker.create_subtask(label, method, args, self.task_id, priority) + subtask_id = self.hub.worker.create_subtask(label, method, args, self.task_id, priority, inherit_worker) self._running_subtask_list.append(subtask_id) return subtask_id diff --git a/tests/test_taskbase.py b/tests/test_taskbase.py index 266780a5..d2e06507 100644 --- a/tests/test_taskbase.py +++ b/tests/test_taskbase.py @@ -144,7 +144,8 @@ def test_spawn_subtask(self): ret_id = t.spawn_subtask('method', [], 'label') self.assertEqual(ret_id, subtask_id) - hub.worker.create_subtask.assert_called_once_with('label', 'method', [], task_id, None) + hub.worker.create_subtask.assert_called_once_with( + 'label', 'method', [], task_id, None, False) def test_spawn_subtask_foreground_task(self): task_info = {'id': 100} diff --git a/tests/test_xmlrpc_worker.py b/tests/test_xmlrpc_worker.py index ef3b3299..2f4ab806 100644 --- a/tests/test_xmlrpc_worker.py +++ b/tests/test_xmlrpc_worker.py @@ -900,6 +900,27 @@ def test_create_subtask(self): self.assertEqual(t_child.worker, None) self.assertEqual(t_child.state, TASK_STATES['FREE']) + def test_create_subtask_with_worker(self): + t_parent = Task.objects.create( + worker=self._worker, + arch=self._arch, + channel=self._channel, + owner=self._user, + state=TASK_STATES['FREE'], + ) + + req = _make_request(self._worker) + task_id = worker.create_subtask(req, 'Label', 'Method', None, t_parent.id, + inherit_worker=True) + self.assertTrue(task_id > 0) + + t_child = Task.objects.get(id=task_id) + self.assertEqual(t_child.parent.id, t_parent.id) + self.assertEqual(t_child.label, 'Label') + self.assertEqual(t_child.method, 'Method') + self.assertEqual(t_child.worker, self._worker) + self.assertEqual(t_child.state, TASK_STATES['ASSIGNED']) + def test_create_subtask_if_another_worker_task(self): w = Worker.objects.create( worker_key='other-worker',