Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RuntimeError: Set changed size during iteration (in aio-pika 9.3.0) #589

Open
dvarrazzo opened this issue Oct 16, 2023 · 2 comments
Open

Comments

@dvarrazzo
Copy link
Contributor

Hello,

I received Sentry issues from a live server, which, if I understand correctly, happened in a moment in which a misconfigured firewall closed the door in front of an aio-pika client, and the k8s health check was trying to connect.

The error is RuntimeError: Set changed size during iteration on this line:

for channel in self.__channels:

Unfortunately I can't work more on it in order to provide anything reproducible. However maybe you want to make a copy of this set before iterating it?

--- aio_pika/robust_connection.py.bak	2023-10-16 20:55:57.867441312 +0200
+++ aio_pika/robust_connection.py    	2023-10-16 20:56:12.163364473 +0200
@@ -95,7 +95,7 @@
             raise RuntimeError("No active transport for connection %r", self)
 
         try:
-            for channel in self.__channels:
+            for channel in list(self.__channels):
                 try:
                     await channel.restore()
                 except Exception:

aio-pika version: 9.3.0, Python version: 3.10.12. I found #388 but it claims to having been fixed in >=8, so this might be a different instance.

Full stack trace
CancelledError: null
  File "apps_mq/mq.py", line 265, in on_message
    await message.ack()
  File "aio_pika/message.py", line 476, in ack
    await self.channel.basic_ack(
  File "aiormq/channel.py", line 556, in basic_ack
    await drain_future

CancelledError: null
  File "aio_pika/queue.py", line 37, in consumer
    return await create_task(callback, message)

CancelledError: null
  File "aiormq/abc.py", line 44, in __inner
    return await self.task

RuntimeError: Set changed size during iteration
  File "apps_mq/mq.py", line 279, in check_health
    await asyncio.gather(test_sender(), test_receiver())
  File "asyncio/tasks.py", line 650, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "aiormq/abc.py", line 46, in __inner
    raise self._exception from e
  File "asyncio/tasks.py", line 650, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "aiormq/abc.py", line 46, in __inner
    raise self._exception from e
  File "apps_mq/mq.py", line 268, in test_receiver
    await queue.consume(on_message)
  File "aio_pika/robust_queue.py", line 120, in consume
    consumer_tag = await super().consume(
  File "aio_pika/queue.py", line 223, in consume
    consume_result = await channel.basic_consume(
  File "aiormq/channel.py", line 526, in basic_consume
    return await self.rpc(
  File "aiormq/base.py", line 164, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "aiormq/abc.py", line 46, in __inner
    raise self._exception from e
  File "apps_mq/mq.py", line 268, in test_receiver
    await queue.consume(on_message)
  File "aio_pika/robust_queue.py", line 120, in consume
    consumer_tag = await super().consume(
  File "aio_pika/queue.py", line 223, in consume
    consume_result = await channel.basic_consume(
  File "aiormq/channel.py", line 526, in basic_consume
    return await self.rpc(
  File "aiormq/base.py", line 164, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "aiormq/abc.py", line 46, in __inner
    raise self._exception from e
  File "apps_mq/mq.py", line 268, in test_receiver
    await queue.consume(on_message)
  File "aio_pika/robust_queue.py", line 120, in consume
    consumer_tag = await super().consume(
  File "aio_pika/queue.py", line 223, in consume
    consume_result = await channel.basic_consume(
  File "aiormq/channel.py", line 526, in basic_consume
    return await self.rpc(
  File "aiormq/base.py", line 164, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "aiormq/abc.py", line 46, in __inner
    raise self._exception from e
  File "apps_mq/mq.py", line 268, in test_receiver
    await queue.consume(on_message)
  File "aio_pika/robust_queue.py", line 120, in consume
    consumer_tag = await super().consume(
  File "aio_pika/queue.py", line 223, in consume
    consume_result = await channel.basic_consume(
  File "aiormq/channel.py", line 526, in basic_consume
    return await self.rpc(
  File "aiormq/base.py", line 164, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "aiormq/abc.py", line 46, in __inner
    raise self._exception from e
  File "apps_mq/mq.py", line 268, in test_receiver
    await queue.consume(on_message)
  File "aio_pika/robust_queue.py", line 120, in consume
    consumer_tag = await super().consume(
  File "aio_pika/queue.py", line 223, in consume
    consume_result = await channel.basic_consume(
  File "aiormq/channel.py", line 526, in basic_consume
    return await self.rpc(
  File "aiormq/base.py", line 164, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "aiormq/abc.py", line 46, in __inner
    raise self._exception from e
  File "aio_pika/channel.py", line 172, in _open
    await self._on_open()
  File "aio_pika/robust_channel.py", line 126, in _on_open
    await channel.basic_qos(
  File "aiormq/channel.py", line 709, in basic_qos
    return await self.rpc(
  File "aiormq/base.py", line 164, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "aiormq/abc.py", line 46, in __inner
    raise self._exception from e
  File "***/routers/tools.py", line 70, in rabbitmq_check
    await mq.mq.check_health()
  File "apps_mq/mq.py", line 281, in check_health
    await queue.delete(if_unused=False, if_empty=False)
  File "aio_pika/queue.py", line 346, in delete
    return await channel.queue_delete(
  File "aiormq/channel.py", line 879, in queue_delete
    return await self.rpc(
  File "aiormq/base.py", line 164, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "aiormq/abc.py", line 46, in __inner
    raise self._exception from e
  File "apps_mq/mq.py", line 268, in test_receiver
    await queue.consume(on_message)
  File "aio_pika/robust_queue.py", line 120, in consume
    consumer_tag = await super().consume(
  File "aio_pika/queue.py", line 223, in consume
    consume_result = await channel.basic_consume(
  File "aiormq/channel.py", line 526, in basic_consume
    return await self.rpc(
  File "aiormq/base.py", line 164, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "aiormq/abc.py", line 46, in __inner
    raise self._exception from e
  File "asyncio/tasks.py", line 650, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "aiormq/abc.py", line 46, in __inner
    raise self._exception from e
  File "asyncio/tasks.py", line 650, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "aiormq/abc.py", line 46, in __inner
    raise self._exception from e
  File "asyncio/tasks.py", line 650, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "aiormq/abc.py", line 46, in __inner
    raise self._exception from e
  File "asyncio/tasks.py", line 650, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "aiormq/abc.py", line 46, in __inner
    raise self._exception from e
  File "asyncio/tasks.py", line 650, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "aiormq/abc.py", line 46, in __inner
    raise self._exception from e
  File "asyncio/tasks.py", line 650, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "aiormq/abc.py", line 46, in __inner
    raise self._exception from e
  File "asyncio/tasks.py", line 650, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "aiormq/abc.py", line 46, in __inner
    raise self._exception from e
  File "asyncio/tasks.py", line 650, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "aiormq/abc.py", line 46, in __inner
    raise self._exception from e
  File "asyncio/tasks.py", line 650, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "aiormq/abc.py", line 46, in __inner
    raise self._exception from e
  File "aio_pika/robust_connection.py", line 98, in _on_connected
    for channel in self.__channels:
  File "_weakrefset.py", line 65, in __iter__
    for itemref in self.data:
@mosquito
Copy link
Owner

Don’t you want to make a PR?

@dvarrazzo
Copy link
Contributor Author

As I said, I can't reproduce it, and I don't know the project well enough to know if that's the best thing to do. If you think that my changeset is right, sure, I can make a PR.

dvarrazzo added a commit to dvarrazzo/aio-pika that referenced this issue Oct 16, 2023
… update

Fix for the crash reported in mosquito#589 unfortunately not tested to verify if
the fix is complete.
dvarrazzo added a commit to dvarrazzo/aio-pika that referenced this issue Oct 17, 2023
… update

Fix for the crash reported in mosquito#589 unfortunately not tested to verify if
the fix is complete.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants