From 337f3eeb12c066e56b5d8fa104dec1da2b21320d Mon Sep 17 00:00:00 2001 From: Daniele Date: Wed, 28 Aug 2024 10:23:47 +0200 Subject: [PATCH] add cleanup in unsubscribe (#199) * add cleanup in unsubscribe * cleanup subscriber_task in client during unsubscribing * adding unit test --------- Co-authored-by: Daniele --- pyproject.toml | 2 +- rstream/consumer.py | 6 ++++++ tests/test_consumer.py | 22 ++++++++++++++++++++++ 3 files changed, 29 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index d65f62e..8d98774 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "rstream" -version = "0.20.1" +version = "0.20.2" description = "A python client for RabbitMQ Streams" authors = ["George Fortunatov ", "Daniele Palaia "] readme = "README.md" diff --git a/rstream/consumer.py b/rstream/consumer.py index 3786601..32ade3f 100644 --- a/rstream/consumer.py +++ b/rstream/consumer.py @@ -319,6 +319,12 @@ async def unsubscribe(self, subscriber_name: str) -> None: except BaseException as exc: logger.warning("exception in unsubscribe of Consumer:" + str(exc)) + stream = subscriber.stream + + if stream in self._clients: + await self._clients[stream].remove_stream(stream) + await self._clients[stream].free_available_id(subscriber.subscription_id) + del self._subscribers[subscriber_name] async def query_offset(self, stream: str, subscriber_name: str) -> int: diff --git a/tests/test_consumer.py b/tests/test_consumer.py index d31fe3d..3d81b33 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -261,6 +261,28 @@ async def test_consume_with_resubscribe(stream: str, consumer: Consumer, produce assert captured == [b"one", b"two"] +async def test_consume_with_resubscribe_msg(stream: str, consumer: Consumer, producer: Producer) -> None: + captured: list[bytes] = [] + subscriber_name = await consumer.subscribe( + stream, callback=lambda message, message_context: captured.append(bytes(message)) + ) + for i in range(100): + await producer.send_wait(stream, b"one") + await wait_for(lambda: len(captured) >= 100) + + await consumer.unsubscribe(subscriber_name) + await consumer.subscribe( + stream, + subscriber_name=subscriber_name, + callback=lambda message, message_context: captured.append(bytes(message)), + offset_specification=ConsumerOffsetSpecification(OffsetType.NEXT, None), + ) + + for i in range(100): + await producer.send_wait(stream, b"two") + await wait_for(lambda: len(captured) >= 200) + + async def test_consume_superstream_with_resubscribe( super_stream: str, super_stream_consumer: SuperStreamConsumer, super_stream_producer: SuperStreamProducer ) -> None: