
Producer Needs Safely Closed #121

Closed
garvenlee opened this issue Sep 19, 2023 · 7 comments · Fixed by #131
Labels: bug (Something isn't working)

Comments

@garvenlee

garvenlee commented Sep 19, 2023

(three image attachments omitted)

If the RabbitMQ broker goes down, the rstream library triggers the connection_closed callback, but producer.close still calls delete_publisher synchronously over the dead connection, which blocks or times out and prevents the internal state from being cleaned up. That is the behavior under the standard asyncio event loop; under uvloop, write_frame instead raises a RuntimeError directly, which is not a socket OSError.
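A minimal sketch of the kind of guard being described here, using only the public producer.close() call; the safe_close name and the timeout value are illustrative, not rstream API.

import asyncio

async def safe_close(producer, timeout: float = 5.0) -> None:
    try:
        # close() sends delete_publisher over the (possibly dead) socket,
        # so cap the wait instead of letting it block forever.
        await asyncio.wait_for(producer.close(), timeout)
    except (asyncio.TimeoutError, ConnectionError, RuntimeError) as exc:
        # under uvloop the failure surfaces as a RuntimeError from write_frame
        print("producer close failed after broker loss:", exc)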

@Gsantomaggio
Member

Thank you for reporting the issue @garvenlee.
We will look into it.

@DanielePalaia DanielePalaia self-assigned this Sep 19, 2023
@DanielePalaia
Collaborator

Hi @garvenlee, can you also copy the code here? Thanks!

@garvenlee
Author

from asyncio import Future
from rstream import RouteType, SuperStreamProducer, ConfirmationStatus
from rstream.amqp import _MessageProtocol

from google.protobuf.message import Message
# project-specific protobuf message; any serializable payload reproduces this
from chat.proto.services.push.push_pb2 import PubEventToUser

SUPER_STREAM = "message_stream"
MESSAGES = 10000000


class PubEvent(_MessageProtocol):
    __slots__ = "data"

    def __init__(self, data: Message):
        self.data = data

    def __bytes__(self):
        return self.data.SerializeToString()


async def route_extractor(message: PubEvent):
    return str(message.data.uid)


def cb(confirmation: ConfirmationStatus):
    if not confirmation.is_confirmed:
        print(confirmation)


async def publish():
    async def on_closed(exc):
        print("triggered on_closed.", exc)
        nonlocal needs_pause
        if not needs_pause:
            needs_pause = True
            try:
                await super_stream_producer.close()
            except Exception as e:
                print(e)  # Timeout
            finally:
                closed_waiter.set_result(None)

    needs_pause = False
    # Create the waiter before the producer so on_closed can safely
    # reference it even if the connection drops during startup.
    closed_waiter = Future()
    super_stream_producer = SuperStreamProducer(
        "localhost",
        username="guest",
        password="guest",
        routing_extractor=route_extractor,
        routing=RouteType.Hash,
        super_stream=SUPER_STREAM,
        connection_closed_handler=on_closed,
    )
    await super_stream_producer.start()

    for i in range(MESSAGES):
        if needs_pause:
            break
        await super_stream_producer.send(
            PubEvent(
                PubEventToUser(channel_number=i, evt_data=bytes(f"hello, {i}", "ascii"))
            ),
            on_publish_confirm=cb,
        )
    await closed_waiter
    await Future()  # never completes; keeps the loop alive to observe the outcome


if __name__ == "__main__":
    # import uvloop

    # uvloop.install()
    import asyncio

    asyncio.run(publish())

@Gsantomaggio
Member

We are working on it.
Plus:

  • we will add more info to on_closed, such as the stream source
  • we will provide a function to reconnect the producer to the partition

We don't want to provide an auto-reconnect feature at the Producer/Consumer and SuperStreamProducer/Consumer level. Maybe we can consider extending the classes.
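A rough sketch of the "extend the classes" idea against the public methods shown in this thread; ReconnectingProducer, the delay, and the retry behavior are hypothetical, not rstream features.

import asyncio
from rstream import SuperStreamProducer

class ReconnectingProducer(SuperStreamProducer):
    async def reconnect(self, delay: float = 1.0) -> None:
        # Best-effort teardown; the old connection may already be unusable.
        try:
            await self.close()
        except Exception:
            pass
        await asyncio.sleep(delay)
        await self.start()  # open a fresh session to the broker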

@garvenlee
Author

garvenlee commented Sep 19, 2023

Additionally, SuperStreamConsumer creates multiple consumers internally, which leads to more default_client connections. If one client connection is broken while the others are healthy, we only find out that the read task failed when on_closed is triggered, but it is not easy to know which client in which consumer holds the broken connection. Closing the whole SuperStreamConsumer and reconnecting seems unreasonable.
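A self-contained sketch (plain asyncio, not rstream internals) of the pattern this suggests: each internal connection registers the shared callback with its own identity bound in, so the application learns which client broke. All names here are illustrative.

import asyncio
from typing import Awaitable, Callable

OnClosed = Callable[[str, Exception], Awaitable[None]]

class PartitionConnection:
    def __init__(self, name: str, on_closed: OnClosed):
        self.name = name
        self._on_closed = on_closed

    async def reader_loop(self) -> None:
        try:
            await asyncio.sleep(0.1)  # stand-in for the real recv loop
            raise ConnectionResetError("broker went away")
        except Exception as exc:
            # the callback receives the partition identity, not just the error
            await self._on_closed(self.name, exc)

async def demo() -> None:
    async def on_closed(name: str, exc: Exception) -> None:
        print(f"connection {name!r} closed: {exc}")

    conns = [PartitionConnection(f"message_stream-{i}", on_closed) for i in range(3)]
    await asyncio.gather(*(c.reader_loop() for c in conns))

asyncio.run(demo())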

@garvenlee
Author

garvenlee commented Sep 20, 2023

Another issue:

In SuperStreamProducer and SuperStreamConsumer, connection_closed_handler is only attached to the default_client, so the send and receive coroutines cannot detect in time that the underlying connection is already broken; they just keep attempting to send or recv, especially when consuming messages from a replica.
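Until the handler covers every client, an application-level workaround sketch: cap each send with a deadline so a write into a silently broken partition connection errors out instead of hanging. send_with_deadline and the timeout value are illustrative; producer.send is the call used in the reproduction above.

import asyncio

async def send_with_deadline(producer, message, timeout: float = 5.0) -> None:
    try:
        await asyncio.wait_for(producer.send(message), timeout)
    except asyncio.TimeoutError:
        # treat a stalled write as a broken connection so the caller can react
        raise ConnectionError("send stalled; the partition connection may be dead")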

@DanielePalaia
Collaborator

Hi @garvenlee, thanks for noticing the issue. I see the bug and I'm starting to work on it in #131, along with some other improvements.
