Impossible to handle Unexpected connection close #599

Open
tayp1n opened this issue Nov 15, 2023 · 9 comments

@tayp1n

tayp1n commented Nov 15, 2023

How can I handle an "Unexpected connection close" and raise an exception?

I've got the logic below:

class AMQPHandler:
    def __init__(self) -> None:
        self.connection: AbstractRobustConnection | None = None
        self.channel: AbstractChannel | None = None

    async def init(self) -> None:
        import settings

        logger.info("Initializing AMQP handler")

        config = settings.CorePublisherSettings

        connection = await aio_pika.connect_robust(
            config.get_dsn(),
            loop=asyncio.get_event_loop(),
            timeout=config.CONNECTION_TIMEOUT,
        )
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=10)

        exchange = await channel.declare_exchange(
            config.EXCHANGE_NAME,
            config.EXCHANGE_TYPE,
            auto_delete=config.EXCHANGE_AUTO_DELETE,
            durable=True,
        )
        for key in config.BINDING_KEYS:
            q_name = (
                f"{key}.{config.PREFIX_BINDING_KEYS}"
                if config.PREFIX_BINDING_KEYS
                else key
            )
            queue = await channel.declare_queue(name=q_name)
            await queue.bind(exchange, q_name)
            await queue.consume(self.handle_message)
            logger.info("Queue declared", extra={"queue": q_name})

        self.connection = connection
        self.channel = channel

        logger.info("AMQP handler initialized")

Now, when RabbitMQ goes down, the connection tries to reconnect every 5 seconds. How can I make it raise an exception that I can handle in order to stop the microservice?

@Alviner
Collaborator

Alviner commented Nov 15, 2023

You can use a regular connection instead of a robust one.

Also, the connection.connected event can be checked in a healthcheck handler.
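
A minimal sketch of the healthcheck idea, assuming the connection object exposes an `asyncio.Event` attribute named `connected` as described above. The `HasConnectedEvent` protocol and the `is_healthy` helper are hypothetical names used only for illustration; they are not part of aio-pika.

```python
import asyncio
from typing import Optional, Protocol


class HasConnectedEvent(Protocol):
    # Assumption: the connection exposes an asyncio.Event named `connected`,
    # which is set while the link to the broker is up.
    connected: asyncio.Event


def is_healthy(connection: Optional[HasConnectedEvent]) -> bool:
    """Return True only if a connection exists and reports itself connected."""
    return connection is not None and connection.connected.is_set()
```

A healthcheck endpoint could return a failing status whenever `is_healthy(handler.connection)` is false, so that an orchestrator restarts the service instead of the service exiting on its own.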

@tayp1n
Author

tayp1n commented Nov 15, 2023

What I also did is

    def on_connection_closed(self, *args, **kwargs):
        sys.exit(1)

    async def init(self) -> None:
        import settings

        logger.info("Initializing AMQP handler")

        config = settings.BaseMessageBrokerSettings

        conn = await aiormq.connect(
            config.get_dsn(),
            loop=asyncio.get_event_loop(),
        )
        channel = await conn.channel()

        conn.closing.add_done_callback(self.on_connection_closed)
        channel.closing.add_done_callback(self.on_connection_closed)

        await channel.exchange_declare(
            exchange=config.EXCHANGE_NAME,
            exchange_type=config.EXCHANGE_TYPE,
            auto_delete=config.EXCHANGE_AUTO_DELETE,
            durable=True,
        )
        for key in config.BINDING_KEYS:
            q_name = (
                f"{key}.{config.PREFIX_BINDING_KEYS}"
                if config.PREFIX_BINDING_KEYS
                else key
            )
            queue = await channel.queue_declare(queue=q_name, durable=True)
            self.queues.append(queue)

            await channel.queue_bind(exchange=config.EXCHANGE_NAME, queue=q_name)
            await channel.basic_consume(queue.queue, self.handle_message, no_ack=True)
            logger.info("Queue declared", extra={"queue": q_name})

        self.conn = conn
        self.channel = channel

        logger.info("AMQP handler initialized")
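
An alternative to calling `sys.exit(1)` inside the callback is to turn the close notification into an exception that the main coroutine can catch. The sketch below uses only asyncio and assumes, as in the snippet above, that the connection exposes a `closing` future. `ConnectionLost`, `wait_until_closed` and `demo` are hypothetical names, and a plain future stands in for `conn.closing`.

```python
import asyncio


class ConnectionLost(Exception):
    """Hypothetical exception signalling that the broker connection closed."""


async def wait_until_closed(closing: asyncio.Future) -> None:
    """Block until `closing` completes, then raise ConnectionLost."""
    closed = asyncio.Event()
    # Same hook as in the snippet above, but the callback only sets a flag.
    closing.add_done_callback(lambda _fut: closed.set())
    await closed.wait()
    raise ConnectionLost("AMQP connection closed")


async def demo() -> str:
    loop = asyncio.get_running_loop()
    closing = loop.create_future()  # stand-in for conn.closing
    # Simulate the broker dropping the connection shortly after startup.
    loop.call_later(0.01, closing.set_result, None)
    try:
        await wait_until_closed(closing)
    except ConnectionLost:
        return "shutting down"  # run cleanup here, then exit the service
    return "still running"
```

Handling the exception in the main coroutine keeps shutdown logic in one place and lets `finally` blocks run before the process exits.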
       

@Dreamsorcerer

Your original code uses connect_robust(), i.e. the function used to create a connection that automatically reconnects...
Sounds like you don't want to use that.

@gaby

gaby commented Mar 23, 2024

I have the same issue. With connect_robust, if the connection to RabbitMQ is lost, the consumer stops receiving messages after the connection is re-established.

The logs show the consumer trying to reconnect every 5 seconds. Once RabbitMQ becomes reachable again, the logs stop and no new messages arrive. The RabbitMQ UI shows 0 consumers for that queue.

I'm using the exact code from here (Master/Worker) but with a dict instead of task_id: https://aio-pika.readthedocs.io/en/latest/patterns.html

@mosquito Any idea?

I'm using aio-pika 9.4.0 with Python 3.10.

@Darsstar
Contributor

This might be fixed by #622 in 9.4.1. It sounds similar enough to what I encountered debugging #615.

@yaelmi3

yaelmi3 commented Jul 11, 2024

We're currently using aio-pika 9.4.2 and hit the same issue as @gaby with connect_robust in our production environment. Once RabbitMQ becomes reachable again, the client is stuck on await self.__connection_close_event.wait().

@gaby

gaby commented Jul 11, 2024

@yaelmi3 I fixed my issue using this #231 (comment)

@RomaLash

@yaelmi3 I have the same issue: the connection gets stuck after RabbitMQ becomes available again.

I had code like

self.connection = await aio_pika.connect_robust(config.RABBIT_URI)

and then declared a queue, etc.

I added the timeout argument to the connect_robust call, and it works!

self.connection = await aio_pika.connect_robust(config.RABBIT_URI, timeout=5)

Now, after RabbitMQ comes back up, I get a TimeoutError (it seems the reconnect task gets stuck if RabbitMQ becomes available during the reconnection process), and the next reconnection attempt succeeds.
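
The effect described above can be illustrated with plain asyncio: a per-attempt timeout turns a hung attempt into an error, so the retry loop can move on to the next attempt. This is only a sketch of the general pattern, not aio-pika's reconnect logic; `connect_with_retry`, its parameters and `demo` are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


async def connect_with_retry(
    connect: Callable[[], Awaitable[T]],
    attempt_timeout: float,
    retry_delay: float,
    max_attempts: int,
) -> T:
    """Call `connect` until it succeeds, bounding each attempt by a timeout."""
    last_error: Optional[Exception] = None
    for _ in range(max_attempts):
        try:
            # Without the timeout, a hung attempt would block the loop forever.
            return await asyncio.wait_for(connect(), timeout=attempt_timeout)
        except (asyncio.TimeoutError, ConnectionError) as exc:
            last_error = exc
            await asyncio.sleep(retry_delay)
    raise RuntimeError("all connection attempts failed") from last_error


async def demo() -> tuple:
    calls = 0

    async def flaky_connect() -> str:
        nonlocal calls
        calls += 1
        if calls == 1:
            await asyncio.Event().wait()  # first attempt hangs indefinitely
        return "connected"

    result = await connect_with_retry(flaky_connect, 0.05, 0.01, 3)
    return result, calls
```

In `demo`, the first attempt never returns, the timeout cancels it, and the second attempt succeeds.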

@yaelmi3

yaelmi3 commented Jul 14, 2024

@RomaLash: I did try adding timeout to the connect_robust call, but it did not resolve my issue in the case of "Unexpected connection close from remote". I assume this is because the client is stuck on "Waiting for connection close event" when the server shuts down abruptly and is then started again.

My solution was to change the code as follows, inspired by this from @gaby:

Before:

        connection = await self._get_connection()
        queue = await self._get_queue(connection=connection, queue_name=queue_name,
                                      qos_prefetch_count=qos_prefetch_count, set_qos=set_qos)
        await queue.consume(partial(self._handle_message, message_handler, error_handler, fail_strategy))
        future = Future()
        future.add_done_callback(lambda _: connection.close())
        await future

After:

        connection = await self._get_connection()
        queue = await self._get_queue(connection=connection, queue_name=queue_name,
                                      qos_prefetch_count=qos_prefetch_count, set_qos=set_qos)
        await queue.consume(partial(self._handle_message, message_handler, error_handler, fail_strategy))
        try:
            await Future()
        finally:
            await queue.close()
            await connection.close()

In both cases, the connect_robust call remained the same:

    async def _get_connection(self):
        return await aio_pika.connect_robust(
            login=self.username,
            password=self.password,
            host=self.host,
            port=self.port,
            virtualhost=self.vhost,
            timeout=self.timeout
        )
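
The "After" pattern can be reproduced with plain asyncio to show that the `finally` block runs when the consuming task is cancelled. One plausible reading of the difference from the "Before" version: there the callback calls `connection.close()` without awaiting it, so if `close()` is a coroutine function the close never actually runs. In this sketch, `consume_forever` and `demo` are hypothetical, and list appends stand in for `await queue.close()` and `await connection.close()`.

```python
import asyncio


async def consume_forever(cleanup_log: list) -> None:
    """Wait indefinitely and run cleanup when the task is cancelled."""
    try:
        await asyncio.Future()  # never completes; only cancellation ends it
    finally:
        # Stand-ins for `await queue.close()` and `await connection.close()`.
        cleanup_log.append("queue closed")
        cleanup_log.append("connection closed")


async def demo() -> list:
    log: list = []
    task = asyncio.create_task(consume_forever(log))
    await asyncio.sleep(0)  # let the task reach its await point
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the task was cancelled on purpose
    return log
```

Because the cleanup sits in `finally`, it runs in order and is awaited inside the task, regardless of how the wait ends.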
