Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
asvetlov committed Dec 16, 2024
1 parent 5fd9c50 commit 6ddc01d
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 13 deletions.
1 change: 1 addition & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ addopts =
# coverage reports
--cov=aiojobs/ --cov=tests/ --cov-report term
asyncio_mode = auto
asyncio_default_fixture_loop_scope = function
filterwarnings =
error
testpaths = tests/
Expand Down
38 changes: 25 additions & 13 deletions tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,36 +47,47 @@ async def coro() -> None:
job = await scheduler.spawn(coro())
assert not job.closed

assert scheduler._failed_task is not None
assert scheduler._failed_task.get_loop() is asyncio.get_running_loop()

assert len(scheduler) == 1
assert list(scheduler) == [job]
assert job in scheduler

await scheduler.close()


def test_spawn_with_different_loop() -> None:
async def coro() -> None:
async def func() -> None:
await asyncio.sleep(1)

scheduler = Scheduler()

async def spawn() -> Job[None]:
job = await scheduler.spawn(coro())
async def spawn1() -> Job[None]:
job = await scheduler.spawn(func())
assert not job.closed

assert len(scheduler) == 1
assert list(scheduler) == [job]
assert job in scheduler

return job

loop1 = asyncio.new_event_loop()
job = loop1.run_until_complete(spawn())
async def spawn2() -> None:
coro = func()
with pytest.raises(RuntimeError, match=" is bound to a different event loop"):
await scheduler.spawn(coro)

assert len(scheduler) == 1
assert list(scheduler) == [job]
assert job in scheduler
await coro # suppress a warning about non-awaited coroutine

Check notice

Code scanning / CodeQL

Statement has no effect Note test

This statement has no effect.

loop2 = asyncio.new_event_loop()
with pytest.raises(RuntimeError, match=" is bound to a different event loop"):
loop2.run_until_complete(spawn())
assert len(scheduler) == 1
assert list(scheduler) == [job]

assert len(scheduler) == 1
assert list(scheduler) == [job]
loop1 = asyncio.new_event_loop()
job = loop1.run_until_complete(spawn1())

loop2 = asyncio.new_event_loop()
loop2.run_until_complete(spawn2())

loop2.close()
loop1.run_until_complete(scheduler.close())
Expand Down Expand Up @@ -621,6 +632,7 @@ async def coro() -> None:
assert another_spawned and another_done # type: ignore[unreachable]


@pytest.mark.skipif(sys.version_info >= (3, 10), reason="Requires Python 3.10+")
def test_scheduler_must_be_created_within_running_loop() -> None:
with pytest.raises(RuntimeError) as exc_info:
Scheduler(close_timeout=0, limit=0, pending_limit=0, exception_handler=None)
Expand Down

0 comments on commit 6ddc01d

Please sign in to comment.