Skip to content

Commit

Permalink
Fix integration tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Sep 26, 2023
1 parent 6b0446e commit a0fc1cc
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 12 deletions.
4 changes: 2 additions & 2 deletions tests/integration/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,8 @@ async def test_restart_timeout(
schd = scheduler(id_)
async with start(schd):
for itask in schd.pool.get_tasks():
itask.state_reset(TASK_OUTPUT_SUCCEEDED)
schd.pool.spawn_on_output(itask, TASK_OUTPUT_SUCCEEDED)
schd.pool.task_events_mgr.process_message(
itask, 1, TASK_OUTPUT_SUCCEEDED)

# restart the completed workflow
schd = scheduler(id_)
Expand Down
22 changes: 12 additions & 10 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -678,9 +678,8 @@ async def test_restart_prereqs(
assert list_tasks(schd) == expected_1

# Mark 1/a as succeeded and spawn 1/z
schd.pool.get_tasks()[0].state_reset('succeeded')

schd.pool.spawn_on_output(schd.pool.get_tasks()[0], 'succeeded')
task_a = schd.pool.get_tasks()[0]
schd.pool.task_events_mgr.process_message(task_a, 1, 'succeeded')
assert list_tasks(schd) == expected_2

# Save our progress
Expand Down Expand Up @@ -802,8 +801,8 @@ async def test_reload_prereqs(
assert list_tasks(schd) == expected_1

# Mark 1/a as succeeded and spawn 1/z
schd.pool.get_tasks()[0].state_reset('succeeded')
schd.pool.spawn_on_output(schd.pool.get_tasks()[0], 'succeeded')
task_a = schd.pool.get_tasks()[0]
schd.pool.task_events_mgr.process_message(task_a, 1, 'succeeded')
assert list_tasks(schd) == expected_2

# Modify flow.cylc to add a new dependency on "z"
Expand Down Expand Up @@ -842,8 +841,7 @@ async def _test_restart_prereqs_sat():

# Mark both as succeeded and spawn 1/c
for itask in schd.pool.get_tasks():
itask.state_reset('succeeded')
schd.pool.spawn_on_output(itask, 'succeeded')
schd.pool.task_events_mgr.process_message(itask, 1, 'succeeded')
schd.workflow_db_mgr.put_update_task_outputs(itask)
schd.pool.remove_if_complete(itask)
schd.workflow_db_mgr.process_queued_ops()
Expand Down Expand Up @@ -1017,7 +1015,7 @@ async def test_db_update_on_removal(
task_a = schd.pool.get_tasks()[0]

# set the task to running
task_a.state_reset('running')
schd.pool.task_events_mgr.process_message(task_a, 1, 'started')

# update the db
await schd.update_data_structure()
Expand All @@ -1029,7 +1027,7 @@ async def test_db_update_on_removal(
]

# mark the task as succeeded and allow it to be removed from the pool
task_a.state_reset('succeeded')
schd.pool.task_events_mgr.process_message(task_a, 1, 'succeeded')
schd.pool.remove_if_complete(task_a)

# update the DB, note no new tasks have been added to the pool
Expand Down Expand Up @@ -1067,7 +1065,10 @@ async def test_no_flow_tasks_dont_spawn(
async with start(schd):
# mark task 1/a as succeeded
task_a = schd.pool.get_tasks()[0]
task_a.state_reset(TASK_OUTPUT_SUCCEEDED)
# TODO: actually for convenience state reset should set outputs?
# now state and outputs are divorced, unless use process_message.
task_a.state_reset('succeeded')
task_a.state.outputs.set_completion('succeeded', True)

for flow_nums, force, pool in (
# outputs yielded from a no-flow task should not spawn downstreams
Expand All @@ -1090,6 +1091,7 @@ async def test_no_flow_tasks_dont_spawn(
TASK_OUTPUT_SUCCEEDED,
forced=force,
)

schd.pool.spawn_on_all_outputs(task_a)

# ensure the pool is as expected
Expand Down

0 comments on commit a0fc1cc

Please sign in to comment.