Skip to content

Commit

Permalink
add cleanup in unsubscribe (#199)
Browse files Browse the repository at this point in the history
* add cleanup in unsubscribe

* cleanup subscriber_task in client during unsubscribing

* adding unit test

---------

Co-authored-by: Daniele <[email protected]>
  • Loading branch information
DanielePalaia and Daniele authored Aug 28, 2024
1 parent 619006f commit 337f3ee
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "rstream"
version = "0.20.1"
version = "0.20.2"
description = "A python client for RabbitMQ Streams"
authors = ["George Fortunatov <[email protected]>", "Daniele Palaia <[email protected]>"]
readme = "README.md"
Expand Down
6 changes: 6 additions & 0 deletions rstream/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,12 @@ async def unsubscribe(self, subscriber_name: str) -> None:
except BaseException as exc:
logger.warning("exception in unsubscribe of Consumer:" + str(exc))

stream = subscriber.stream

if stream in self._clients:
await self._clients[stream].remove_stream(stream)
await self._clients[stream].free_available_id(subscriber.subscription_id)

del self._subscribers[subscriber_name]

async def query_offset(self, stream: str, subscriber_name: str) -> int:
Expand Down
22 changes: 22 additions & 0 deletions tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,28 @@ async def test_consume_with_resubscribe(stream: str, consumer: Consumer, produce
assert captured == [b"one", b"two"]


async def test_consume_with_resubscribe_msg(stream: str, consumer: Consumer, producer: Producer) -> None:
captured: list[bytes] = []
subscriber_name = await consumer.subscribe(
stream, callback=lambda message, message_context: captured.append(bytes(message))
)
for i in range(100):
await producer.send_wait(stream, b"one")
await wait_for(lambda: len(captured) >= 100)

await consumer.unsubscribe(subscriber_name)
await consumer.subscribe(
stream,
subscriber_name=subscriber_name,
callback=lambda message, message_context: captured.append(bytes(message)),
offset_specification=ConsumerOffsetSpecification(OffsetType.NEXT, None),
)

for i in range(100):
await producer.send_wait(stream, b"two")
await wait_for(lambda: len(captured) >= 200)


async def test_consume_superstream_with_resubscribe(
super_stream: str, super_stream_consumer: SuperStreamConsumer, super_stream_producer: SuperStreamProducer
) -> None:
Expand Down

0 comments on commit 337f3ee

Please sign in to comment.