From c9da9077820e5f0b0fb0080f0363a85364883a69 Mon Sep 17 00:00:00 2001
From: Bulygin Evgeny <41860443+nesb1@users.noreply.github.com>
Date: Thu, 10 Oct 2024 12:03:46 +0500
Subject: [PATCH] Fixed a bug where the `consumer_update_listener` callback
 from another subscription was used in single active consumer mode. (#210)

---
 rstream/consumer.py    |  3 ++
 tests/test_consumer.py | 65 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 68 insertions(+)

diff --git a/rstream/consumer.py b/rstream/consumer.py
index 8523c1d..5f46aba 100644
--- a/rstream/consumer.py
+++ b/rstream/consumer.py
@@ -414,6 +414,9 @@ async def _on_consumer_update_query_response(
             Callable[[bool, EventContext], Awaitable[OffsetSpecification]]
         ] = None,
     ) -> None:
+        if frame.subscription_id != subscriber.subscription_id:
+            return
+
         # event the consumer is not active, we need to send a ConsumerUpdateResponse
         # by protocol definition. the offsetType can't be null so we use OffsetTypeNext as default
         if consumer_update_listener is None:
diff --git a/tests/test_consumer.py b/tests/test_consumer.py
index cd19eeb..ca2c44b 100644
--- a/tests/test_consumer.py
+++ b/tests/test_consumer.py
@@ -444,6 +444,71 @@ async def consumer_update_listener_with_custom_offset(
     await producer.close()
 
 
+async def test_consume_with_multiple_sac_custom_consumer_update_listener_cb(
+    consumer: Consumer, producer: Producer
+) -> None:
+    stream_name_1 = "stream1"
+    stream_name_2 = "stream2"
+    await producer.create_stream(stream=stream_name_1)
+    await producer.create_stream(stream=stream_name_2)
+    try:
+        # It is necessary to use send_wait here because RabbitMQ stores every message in a separate batch.
+        # If send_batch were used, rstream would filter the messages on the client side, bypassing the problem.
+        for i in range(10):
+            await producer.send_wait(stream_name_1, AMQPMessage(body=f"{i}".encode()))
+            await producer.send_wait(stream_name_2, AMQPMessage(body=f"{i}".encode()))
+
+        received_offsets_1 = []
+        received_offsets_2 = []
+
+        async def consumer_cb1(message: bytes, message_context: MessageContext) -> None:
+            received_offsets_1.append(message_context.offset)
+
+        async def consumer_cb2(message: bytes, message_context: MessageContext) -> None:
+            received_offsets_2.append(message_context.offset)
+
+        async def consumer_update_listener_with_custom_offset_1(
+            is_active: bool, event_context: EventContext
+        ) -> OffsetSpecification:
+            if is_active:
+                return OffsetSpecification(offset_type=OffsetType.OFFSET, offset=5)
+            return OffsetSpecification(offset_type=OffsetType.FIRST, offset=0)
+
+        async def consumer_update_listener_with_custom_offset_2(
+            is_active: bool, event_context: EventContext
+        ) -> OffsetSpecification:
+            if is_active:
+                return OffsetSpecification(offset_type=OffsetType.OFFSET, offset=7)
+            return OffsetSpecification(offset_type=OffsetType.FIRST, offset=0)
+
+        async with consumer:
+            await consumer.subscribe(
+                stream=stream_name_1,
+                callback=consumer_cb1,
+                properties={"single-active-consumer": "true", "name": "sac_name1"},
+                offset_specification=ConsumerOffsetSpecification(OffsetType.FIRST),
+                consumer_update_listener=consumer_update_listener_with_custom_offset_1,
+            )
+            await consumer.subscribe(
+                stream=stream_name_2,
+                callback=consumer_cb2,
+                properties={"single-active-consumer": "true", "name": "sac_name2"},
+                offset_specification=ConsumerOffsetSpecification(OffsetType.FIRST),
+                consumer_update_listener=consumer_update_listener_with_custom_offset_2,
+            )
+
+            await wait_for(lambda: len(received_offsets_1) >= 1)
+            await wait_for(lambda: len(received_offsets_2) >= 1)
+
+            assert received_offsets_1[0] == 5
+            assert received_offsets_2[0] == 7
+
+    finally:
+        await producer.delete_stream(stream=stream_name_1)
+        await producer.delete_stream(stream=stream_name_2)
+        await producer.close()
+
+
 async def test_consume_superstream_with_sac_all_active(
     super_stream: str,
     super_stream_consumer_for_sac1: SuperStreamConsumer,