Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pubsub): fix unhandled exception in subscriber task #815

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

w-miller
Copy link

@w-miller w-miller commented Sep 19, 2024

Summary

We have seen unhandled exceptions making their way to the root unhandled exception handler of the event loop in our application that uses pubsub subscribe(), as in the below Traceback. Whilst our application catches the relevant aiohttp errors when awaiting the subscribe() call, this exception remained unhandled. This is because subscribe was ignoring the tasks returned from the calls to asyncio.wait(), meaning any exception results on these tasks would not be handled and the tasks would get deleted when subscribe() returned.

Fix this by gathering the results of all worker tasks before returning from subscribe(). Given that we're about to raise
asyncio.CancelledError() and return, there's no point in raising the internal errors here, so just log them.

Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/gcloud/aio/pubsub/subscriber.py", line 403, in producer
    new_messages = await asyncio.shield(pull_task)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/gcloud/aio/pubsub/subscriber.py", line 426, in producer
    new_messages += await asyncio.wait_for(pull_task, 5)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/asyncio/tasks.py", line 479, in wait_for
    return fut.result()
           ^^^^^^^^^^^^
  File "/usr/lib/python3/dist-packages/gcloud/aio/pubsub/subscriber_client.py", line 157, in pull
    resp = await s.post(
           ^^^^^^^^^^^^^
  File "/usr/lib/python3/dist-packages/gcloud/aio/auth/session.py", line 190, in post
    resp = await self.session.post(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3/dist-packages/aiohttp/client.py", line 560, in _request
    await resp.start(conn)
  File "/usr/lib/python3/dist-packages/aiohttp/client_reqrep.py", line 899, in start
    message, payload = await protocol.read()  # type: ignore[union-attr]
                       ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3/dist-packages/aiohttp/streams.py", line 616, in read
    await self._waiter
aiohttp.client_exceptions.ServerDisconnectedError: Server disconnected

We have seen unhandled exceptions making their way to the root unhandled
exception handler of the event loop in our application that uses pubsub
subscribe(), as in the below Traceback. Whilst our application catches
the relevant aiohttp errors when awaiting the subscribe() call, this
exception remained unhandled. This is because subscribe was ignoring the
tasks returned from the calls to asyncio.wait(), meaning any exception
results on these tasks would not be handled and the tasks would get
deleted when subscribe() returned.

Fix this by gathering the results of all worker tasks before returning
from subscribe(). Given that we're about to raise
asyncio.CancelledError() and return, there's no point in raising the
internal errors here, so just log them.

```
Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/gcloud/aio/pubsub/subscriber.py", line 403, in producer
    new_messages = await asyncio.shield(pull_task)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/gcloud/aio/pubsub/subscriber.py", line 426, in producer
    new_messages += await asyncio.wait_for(pull_task, 5)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/asyncio/tasks.py", line 479, in wait_for
    return fut.result()
           ^^^^^^^^^^^^
  File "/usr/lib/python3/dist-packages/gcloud/aio/pubsub/subscriber_client.py", line 157, in pull
    resp = await s.post(
           ^^^^^^^^^^^^^
  File "/usr/lib/python3/dist-packages/gcloud/aio/auth/session.py", line 190, in post
    resp = await self.session.post(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3/dist-packages/aiohttp/client.py", line 560, in _request
    await resp.start(conn)
  File "/usr/lib/python3/dist-packages/aiohttp/client_reqrep.py", line 899, in start
    message, payload = await protocol.read()  # type: ignore[union-attr]
                       ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3/dist-packages/aiohttp/streams.py", line 616, in read
    await self._waiter
aiohttp.client_exceptions.ServerDisconnectedError: Server disconnected
```
@w-miller w-miller marked this pull request as ready for review September 19, 2024 14:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

1 participant