Is there a way to fail a test if an explicitly created background task fails after yielding from the fixture that created it? #196

Open
zoopp opened this issue Nov 3, 2020 · 8 comments

Comments

@zoopp

zoopp commented Nov 3, 2020

I'll start with an example to provide some context:

import asyncio
import contextlib
import itertools

import pytest


class Consumer:
    def __init__(self, queue: asyncio.Queue):
        self._queue = queue

    async def run(self):
        for i in itertools.count(0):
            item = await self._queue.get()
            print(item)

            if i == 3:
                raise ValueError('oups')

            self._queue.task_done()


@pytest.fixture
async def queue() -> asyncio.Queue:
    return asyncio.Queue()


@pytest.fixture
async def consumer_service(queue):
    consumer = Consumer(queue)
    background_task = asyncio.create_task(consumer.run())

    yield

    background_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await background_task


@pytest.mark.asyncio
@pytest.mark.usefixtures('consumer_service')
async def test_consumer_consumes(queue: asyncio.Queue):
    queue.put_nowait('I')
    queue.put_nowait('Will')
    queue.put_nowait('Not')
    queue.put_nowait('Hang')

    await queue.join()

Here, the consumer_service fixture creates a background task that runs the consumer's loop. When processing the last item in the queue, the consumer raises a ValueError, so the background task finishes with an exception before marking the last item as done. In this state, queue.join() never returns, so the test hangs with no indication that something went wrong.

Ideally, there would be some way to react to the background task failing by cancelling and failing the test, but I have not found one. The closest I got to a solution was to attach a done callback to the background task and check for an exception there, but I haven't come across an API that would let me recover from this, or a way to signal the error (without it being captured by pytest).

@seifertm
Contributor

seifertm commented Nov 4, 2020

I get it: queue.join blocks until all queued tasks are marked as done, but the last task is never marked done due to the ValueError.

But this behaviour is totally unrelated to pytest-asyncio, right? Are you proposing a new feature to detect those hangs?

@zoopp
Author

zoopp commented Nov 4, 2020

You are right, the behavior is not directly related to pytest-asyncio.

What I want to point out, however, is that, at least to my knowledge, there is currently no graceful way to handle these situations in the middle of a running test. The detection itself can be trivial to implement, but could I recover gracefully once the condition has been detected? For now I've resorted to the following hack which, while not pretty, prints a stack trace and aborts the process 😅:

import asyncio
import contextlib
import itertools
import traceback
from os import abort

import pytest


class Consumer:
    def __init__(self, queue: asyncio.Queue):
        self._queue = queue

    async def run(self):
        for i in itertools.count(0):
            item = await self._queue.get()
            print(item)

            if i == 3:
                raise ValueError('oups')

            self._queue.task_done()


@pytest.fixture
async def queue() -> asyncio.Queue:
    return asyncio.Queue()


@pytest.fixture
async def consumer(queue, capsys):
    consumer = Consumer(queue)
    run_task = asyncio.create_task(consumer.run())

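    def panic_crash_clbk(task: asyncio.Task):
        # This will crash the whole process but it's better than hanging
        # indefinitely with no indication of what went wrong.
        if task.cancelled():
            # Cancelled during teardown: task.exception() would raise
            # CancelledError here, so there is nothing to report.
            return
        exception = task.exception()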
        if exception is not None:
            with capsys.disabled():
                traceback.print_exception(
                    None, exception, exception.__traceback__
                )
                abort()

    run_task.add_done_callback(panic_crash_clbk)

    yield

    run_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await run_task


@pytest.mark.asyncio
@pytest.mark.usefixtures('consumer')
async def test_consumer_consumes(queue: asyncio.Queue):
    queue.put_nowait('I')
    queue.put_nowait('Will')
    queue.put_nowait('Not')
    queue.put_nowait('Hang')

    await queue.join()

(Some other things that I looked at were calling pytest.exit and pytest.fail inside the callback, but they both just raise an exception that appears to get lost.)
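One likely reason the exception appears to get lost: asyncio runs done callbacks from the event loop, and an exception raised inside one is handed to the loop's exception handler rather than to the task that is running the test. The following is a minimal sketch of that behaviour, using a plain RuntimeError as a stand-in for whatever pytest.fail or pytest.exit would raise. The names run_demo, work and failing_callback are made up for illustration.

```python
import asyncio


async def run_demo() -> list:
    loop = asyncio.get_running_loop()
    captured = []
    # Record whatever the loop reports instead of letting it be logged.
    loop.set_exception_handler(lambda _loop, context: captured.append(context))

    async def work() -> int:
        return 1

    def failing_callback(task: asyncio.Task) -> None:
        # Stand-in for calling pytest.fail()/pytest.exit() in a done callback.
        raise RuntimeError("raised inside a done callback")

    task = asyncio.create_task(work())
    task.add_done_callback(failing_callback)

    result = await task     # the awaiting code still sees a normal result
    await asyncio.sleep(0)  # give the loop a chance to run pending callbacks
    assert result == 1
    return captured


if __name__ == "__main__":
    for context in asyncio.run(run_demo()):
        print(context["message"], repr(context["exception"]))
```

The code awaiting the task never sees the RuntimeError; it only shows up in the context dictionary passed to the exception handler.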

In general I think it would be pretty good to have a way to recover from these kinds of situations gracefully (e.g. fail the test, tear down fixtures, carry on and report the error at the end).

Now that I think about it, if at all feasible, should this kind of functionality be provided by pytest itself instead?

@Tinche
Member

Tinche commented Nov 4, 2020

This is a tough cookie. I don't think you should use a fixture for this. If it can fail a test, it should be part of the test itself.

Also I would put a timeout on the join and let that fail the test too.
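As a rough sketch of the timeout idea, the join can be wrapped in asyncio.wait_for so that a queue that never drains raises asyncio.TimeoutError instead of hanging. The helper name join_with_timeout and the timeout values are assumptions chosen for illustration, not something from this thread.

```python
import asyncio


async def join_with_timeout(queue: asyncio.Queue, timeout: float) -> None:
    """Wait for the queue to drain; raise asyncio.TimeoutError if it does not."""
    await asyncio.wait_for(queue.join(), timeout=timeout)


async def demo() -> str:
    queue = asyncio.Queue()
    # Nothing ever calls task_done() for this item, so join() alone would hang.
    queue.put_nowait("never marked done")
    try:
        await join_with_timeout(queue, timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"
    return "joined"


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the test from the original post this would amount to replacing `await queue.join()` with something like `await asyncio.wait_for(queue.join(), timeout=1)`, so the test fails with a timeout rather than blocking forever.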

@zoopp
Author

zoopp commented Nov 8, 2020

Mhm, good point about the timeout on join, it would work in this case. Adding a per-test timeout would actually solve this issue in general, though arguably it can become finicky to figure out the right amount of time to wait for each individual test.

This makes me wonder though.. asyncio.wait_for raises an asyncio.TimeoutError in the context of the task that is running the test function. This reminds me that in the context of the completion callback attached to a task we do have access to all tasks. So, actually, the better approach would be to cancel all tasks instead of aborting the program:

def cancel_tasks_on_failure(task: asyncio.Task):

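    def maybe_cancel_clbk(t: asyncio.Task):
        # A cancelled task has no exception to inspect; calling
        # t.exception() on it would raise CancelledError.
        if t.cancelled():
            return
        exception = t.exception()
        if exception is None:
            return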

        for task in asyncio.all_tasks():
            task.cancel()

    task.add_done_callback(maybe_cancel_clbk)

Then I would just call cancel_tasks_on_failure for every background task that must not fail during test execution. This definitely is an improvement over the previous approach but I feel that I should really be canceling only the test and not all tasks. Actually, I think I figured it out:

@pytest.fixture
def task_watchdog(request):
    def cancel_test_on_exception(task: asyncio.Task):
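        def maybe_cancel_clbk(t: asyncio.Task):
            # A cancelled task (e.g. during fixture teardown) has no exception
            # to inspect; t.exception() would raise CancelledError.
            if t.cancelled():
                return
            exception = t.exception()
            if exception is None:
                return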

            for task in asyncio.all_tasks():
                coro = task.get_coro()
                if coro.__qualname__ == request.function.__qualname__:
                    task.cancel()
                    return

        task.add_done_callback(maybe_cancel_clbk)

    return cancel_test_on_exception

Then I can just change the consumer fixture as follows:

@pytest.fixture
async def consumer(queue, task_watchdog):
    consumer = Consumer(queue)
    run_task = asyncio.create_task(consumer.run())

    task_watchdog(run_task)

    yield

    run_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await run_task

Apart from the fixture name (for which I'll have to figure out a better name), I'm actually pretty happy with how it works. Would this be something worth adding to pytest-asyncio? If not then I think we can close the issue.

@zoopp
Author

zoopp commented Nov 8, 2020

Also, @Tinche, I got eager and overlooked this part of your comment:

I don't think you should use a fixture for this. If it can fail a test, it should be part of the test itself.

I agree, but in my experience it tends to get ugly to handle it directly in the test. In my particular case, I have tests which depend on having a couple of control loops up and running. The alternative that I see right now would be to start the loops inside the tests and, every time I await something, also wait on the loops (and check for exceptions when I regain control).

It works okay for 1-2 tests but it quickly devolves into a DRY problem if you have a lot of tests with similar requirements.
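One way to keep the "wait on the loops every time I await something" approach from being repeated in every test is to factor it into a helper. The sketch below is only an illustration: await_unless_failed and failing_consumer are hypothetical names, and the helper assumes the background loops are already running as asyncio tasks.

```python
import asyncio


async def await_unless_failed(awaitable, *background_tasks):
    """Await `awaitable`, but raise early if a background task fails first."""
    main = asyncio.ensure_future(awaitable)
    pending = {main, *background_tasks}
    try:
        while main in pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done - {main}:
                # Re-raise the first background failure in the caller.
                if not task.cancelled() and task.exception() is not None:
                    raise task.exception()
        return main.result()
    finally:
        # Do not leave the awaited operation running if we bail out early.
        if not main.done():
            main.cancel()


async def failing_consumer(queue: asyncio.Queue) -> None:
    await queue.get()
    raise ValueError("oups")  # fails before calling task_done()


async def demo() -> str:
    queue = asyncio.Queue()
    consumer = asyncio.create_task(failing_consumer(queue))
    queue.put_nowait("item")
    try:
        await await_unless_failed(queue.join(), consumer)
    except ValueError as exc:
        return str(exc)
    return "joined"


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A test would then call `await await_unless_failed(queue.join(), run_task)` instead of `await queue.join()`, which requires the fixture to hand the background task to the test.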

@Tinche
Member

Tinche commented Nov 9, 2020

I'm not the pytest police, you can do whatever works for you :) It's just my experience pytest fixtures aren't really suited to failing tests, and I don't mean only in an asyncio context.

@tmewett

tmewett commented Dec 1, 2020

I'm running into a similar problem trying to test code which uses asyncio.start_server: exceptions in callbacks aren't actually propagated to the main .serve_forever coroutine, so failing tests will just hang forever. This probably shouldn't be fixed by this plugin, as you say; I feel like there is something missing from the asyncio API, especially in this case.

@diefans

diefans commented Jan 28, 2021

@zoopp you should run the consumer and queue.join() as background tasks (futures) within the test, and register a callback which stops the queue.join() future on failure.
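A minimal sketch of how this suggestion might look, assuming "stops" means cancelling the queue.join() future. The names join_or_raise, stop_join_on_failure and failing_consumer are hypothetical and only serve the illustration.

```python
import asyncio


async def join_or_raise(queue: asyncio.Queue, consumer_task: asyncio.Task) -> None:
    """Wait for queue.join(), re-raising the consumer's error if it fails first."""
    join_task = asyncio.ensure_future(queue.join())

    def stop_join_on_failure(task: asyncio.Task) -> None:
        # Only react to real failures, not to cancellation during teardown.
        if not task.cancelled() and task.exception() is not None:
            join_task.cancel()

    consumer_task.add_done_callback(stop_join_on_failure)
    try:
        await join_task
    except asyncio.CancelledError:
        failed = (
            consumer_task.done()
            and not consumer_task.cancelled()
            and consumer_task.exception() is not None
        )
        if failed:
            # Surface the consumer's error instead of the cancellation.
            raise consumer_task.exception() from None
        raise
    finally:
        consumer_task.remove_done_callback(stop_join_on_failure)


async def failing_consumer(queue: asyncio.Queue) -> None:
    await queue.get()
    raise ValueError("oups")  # fails before calling task_done()


async def demo() -> str:
    queue = asyncio.Queue()
    consumer = asyncio.create_task(failing_consumer(queue))
    queue.put_nowait("item")
    try:
        await join_or_raise(queue, consumer)
    except ValueError as exc:
        return str(exc)
    return "joined"


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The callback only cancels the join future; the actual error is raised from the code that awaits it, which is where pytest can see it and fail the test.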
