Skip to content

Commit

Permalink
Fixed a bug when, after unsubscribing, the next subscriber with the s…
Browse files Browse the repository at this point in the history
…ame subscriber name used a first subscriber callback and ignore second subscriber callback. (#204)

* Actualized build and test section at README.md like ci.

* Fixed a bug when, after unsubscribing, the next subscriber with the same subscriber name used a first subscriber callback and ignore second subscriber callback.
  • Loading branch information
nesb1 authored Sep 18, 2024
1 parent 0999ee1 commit 0407272
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 22 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -330,12 +330,12 @@ Run the server with the following command:
```bash
docker run -it --rm --name rabbitmq -p 5552:5552 -p 5672:5672 -p 15672:15672 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.12-management
rabbitmq:3.13.1-management
```

enable the plugin:
```bash
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management rabbitmq_amqp1_0
```

and run the tests:
Expand Down
10 changes: 7 additions & 3 deletions rstream/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,16 @@ async def start(self) -> None:
self.add_handler(schema.Close, self._on_close)

async def run_queue_listener_task(self, subscriber_name: str, handler: HT[FT]) -> None:
if subscriber_name not in self._frames:
task_name = f"run_delivery_handlers_{subscriber_name}"
if task_name not in self._tasks:
self.start_task(
f"run_delivery_handlers_{subscriber_name}",
task_name,
self._run_delivery_handlers(subscriber_name, handler),
)

async def stop_queue_listener_task(self, subscriber_name: str) -> None:
await self.stop_task(name=f"run_delivery_handlers_{subscriber_name}")

async def _run_delivery_handlers(self, subscriber_name: str, handler: HT[FT]):
while self.is_connection_alive():
frame_entry = await self._frames[subscriber_name].get()
Expand Down Expand Up @@ -361,7 +365,7 @@ async def close(self) -> None:
await self.stop_task("listener")

for subscriber_name in self._frames:
await self.stop_task(f"run_delivery_handlers_{subscriber_name}")
await self.stop_queue_listener_task(subscriber_name=subscriber_name)

if self._conn is not None and connection_is_broken is False:
await self._conn.close()
Expand Down
2 changes: 2 additions & 0 deletions rstream/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ async def subscribe(
async def unsubscribe(self, subscriber_name: str) -> None:
logger.debug("unsubscribe(): UnSubscribing and removing handlers")
subscriber = self._subscribers[subscriber_name]

await subscriber.client.stop_queue_listener_task(subscriber_name=subscriber_name)
subscriber.client.remove_handler(
schema.Deliver,
name=subscriber.reference,
Expand Down
42 changes: 25 additions & 17 deletions tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,66 +242,74 @@ async def test_offset_type_next(stream: str, consumer: Consumer, producer: Produ


async def test_consume_with_resubscribe(stream: str, consumer: Consumer, producer: Producer) -> None:
captured: list[bytes] = []
captured_by_first_consumer: list[bytes] = []
subscriber_name = await consumer.subscribe(
stream, callback=lambda message, message_context: captured.append(bytes(message))
stream, callback=lambda message, message_context: captured_by_first_consumer.append(bytes(message))
)
await producer.send_wait(stream, b"one")
await wait_for(lambda: len(captured) >= 1)
await wait_for(lambda: len(captured_by_first_consumer) >= 1)
assert captured_by_first_consumer == [b"one"]

await consumer.unsubscribe(subscriber_name)

captured_by_second_consumer: list[bytes] = []
await consumer.subscribe(
stream,
callback=lambda message, message_context: captured.append(bytes(message)),
callback=lambda message, message_context: captured_by_second_consumer.append(bytes(message)),
offset_specification=ConsumerOffsetSpecification(OffsetType.NEXT, None),
)

await producer.send_wait(stream, b"two")
await wait_for(lambda: len(captured) >= 2)
assert captured == [b"one", b"two"]
await asyncio.sleep(1)
await wait_for(lambda: len(captured_by_second_consumer) >= 1)
assert captured_by_second_consumer == [b"two"]


async def test_consume_with_resubscribe_msg(stream: str, consumer: Consumer, producer: Producer) -> None:
captured: list[bytes] = []
captured_by_first_consumer: list[bytes] = []
subscriber_name = await consumer.subscribe(
stream, callback=lambda message, message_context: captured.append(bytes(message))
stream, callback=lambda message, message_context: captured_by_first_consumer.append(bytes(message))
)
for i in range(100):
await producer.send_wait(stream, b"one")
await wait_for(lambda: len(captured) >= 100)
await wait_for(lambda: len(captured_by_first_consumer) >= 100)

await consumer.unsubscribe(subscriber_name)

captured_by_second_consumer: list[bytes] = []
await consumer.subscribe(
stream,
subscriber_name=subscriber_name,
callback=lambda message, message_context: captured.append(bytes(message)),
callback=lambda message, message_context: captured_by_second_consumer.append(bytes(message)),
offset_specification=ConsumerOffsetSpecification(OffsetType.NEXT, None),
)

for i in range(100):
await producer.send_wait(stream, b"two")
await wait_for(lambda: len(captured) >= 200)
await wait_for(lambda: len(captured_by_second_consumer) >= 100)


async def test_consume_superstream_with_resubscribe(
super_stream: str, super_stream_consumer: SuperStreamConsumer, super_stream_producer: SuperStreamProducer
) -> None:
captured: list[bytes] = []
captured_by_first_consumer: list[bytes] = []
await super_stream_consumer.subscribe(
callback=lambda message, message_context: captured.append(bytes(message))
callback=lambda message, message_context: captured_by_first_consumer.append(bytes(message))
)
await super_stream_producer.send(b"one")
await wait_for(lambda: len(captured) >= 1)
await wait_for(lambda: len(captured_by_first_consumer) >= 1)

await super_stream_consumer.unsubscribe()

captured_by_second_consumer: list[bytes] = []
await super_stream_consumer.subscribe(
callback=lambda message, message_context: captured.append(bytes(message)),
callback=lambda message, message_context: captured_by_second_consumer.append(bytes(message)),
offset_specification=ConsumerOffsetSpecification(OffsetType.NEXT, None),
)

await super_stream_producer.send(b"two")
await wait_for(lambda: len(captured) >= 2)
assert captured == [b"one", b"two"]
await wait_for(lambda: len(captured_by_second_consumer) >= 1)
assert captured_by_second_consumer == [b"two"]


async def test_consume_with_restart(stream: str, consumer: Consumer, producer: Producer) -> None:
Expand Down

0 comments on commit 0407272

Please sign in to comment.