diff --git a/tests/integration/test_scheduler.py b/tests/integration/test_scheduler.py index 2d570f14132..9fe52aa2995 100644 --- a/tests/integration/test_scheduler.py +++ b/tests/integration/test_scheduler.py @@ -351,8 +351,8 @@ async def test_restart_timeout( schd = scheduler(id_) async with start(schd): for itask in schd.pool.get_tasks(): - itask.state_reset(TASK_OUTPUT_SUCCEEDED) - schd.pool.spawn_on_output(itask, TASK_OUTPUT_SUCCEEDED) + schd.pool.task_events_mgr.process_message( + itask, 1, TASK_OUTPUT_SUCCEEDED) # restart the completed workflow schd = scheduler(id_) diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index c7fc3c976c5..3ce7df5d2e7 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -678,9 +678,8 @@ async def test_restart_prereqs( assert list_tasks(schd) == expected_1 # Mark 1/a as succeeded and spawn 1/z - schd.pool.get_tasks()[0].state_reset('succeeded') - - schd.pool.spawn_on_output(schd.pool.get_tasks()[0], 'succeeded') + task_a = schd.pool.get_tasks()[0] + schd.pool.task_events_mgr.process_message(task_a, 1, 'succeeded') assert list_tasks(schd) == expected_2 # Save our progress @@ -802,8 +801,8 @@ async def test_reload_prereqs( assert list_tasks(schd) == expected_1 # Mark 1/a as succeeded and spawn 1/z - schd.pool.get_tasks()[0].state_reset('succeeded') - schd.pool.spawn_on_output(schd.pool.get_tasks()[0], 'succeeded') + task_a = schd.pool.get_tasks()[0] + schd.pool.task_events_mgr.process_message(task_a, 1, 'succeeded') assert list_tasks(schd) == expected_2 # Modify flow.cylc to add a new dependency on "z" @@ -842,8 +841,7 @@ async def _test_restart_prereqs_sat(): # Mark both as succeeded and spawn 1/c for itask in schd.pool.get_tasks(): - itask.state_reset('succeeded') - schd.pool.spawn_on_output(itask, 'succeeded') + schd.pool.task_events_mgr.process_message(itask, 1, 'succeeded') schd.workflow_db_mgr.put_update_task_outputs(itask) schd.pool.remove_if_complete(itask) schd.workflow_db_mgr.process_queued_ops() @@ -1017,7 +1015,7 @@ async def test_db_update_on_removal( task_a = schd.pool.get_tasks()[0] # set the task to running - task_a.state_reset('running') + schd.pool.task_events_mgr.process_message(task_a, 1, 'started') # update the db await schd.update_data_structure() @@ -1029,7 +1027,7 @@ async def test_db_update_on_removal( ] # mark the task as succeeded and allow it to be removed from the pool - task_a.state_reset('succeeded') + schd.pool.task_events_mgr.process_message(task_a, 1, 'succeeded') schd.pool.remove_if_complete(task_a) # update the DB, note no new tasks have been added to the pool @@ -1067,7 +1065,10 @@ async def test_no_flow_tasks_dont_spawn( async with start(schd): # mark task 1/a as succeeded task_a = schd.pool.get_tasks()[0] - task_a.state_reset(TASK_OUTPUT_SUCCEEDED) + # TODO: actually for convenience state reset should set outputs? + # now state and outputs are divorced, unless use process_message. + task_a.state_reset('succeeded') + task_a.state.outputs.set_completion('succeeded', True) for flow_nums, force, pool in ( # outputs yielded from a no-flow task should not spawn downstreams @@ -1090,6 +1091,7 @@ async def test_no_flow_tasks_dont_spawn( TASK_OUTPUT_SUCCEEDED, forced=force, ) + schd.pool.spawn_on_all_outputs(task_a) # ensure the pool is as expected