diff --git a/rstream/client.py b/rstream/client.py index 35c3134..be78a6c 100644 --- a/rstream/client.py +++ b/rstream/client.py @@ -249,6 +249,8 @@ async def run_queue_listener_task(self, subscriber_name: str, handler: HT[FT]) - async def stop_queue_listener_task(self, subscriber_name: str) -> None: await self.stop_task(name=f"run_delivery_handlers_{subscriber_name}") + while not self._frames[subscriber_name].empty(): + self._frames[subscriber_name].get_nowait() async def _run_delivery_handlers(self, subscriber_name: str, handler: HT[FT]): while self.is_connection_alive(): diff --git a/tests/test_consumer.py b/tests/test_consumer.py index df25263..cd19eeb 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -205,6 +205,50 @@ async def test_consumer_callback(stream: str, consumer: Consumer, producer: Prod assert offset >= 0 and offset < 100 +async def test_consumer_resubscribe_when_not_consumed_events_in_queue( + consumer: Consumer, producer: Producer +) -> None: + stream_name = "stream" + await producer.create_stream(stream=stream_name) + + processed_offsets_1: asyncio.Queue[int] = asyncio.Queue(1) + processed_offsets_2 = [] + + async def long_running_cb(message: AMQPMessage, message_context: MessageContext) -> None: + await processed_offsets_1.put(message_context.offset) + + async def write_processed_messages_cb(message: AMQPMessage, message_context: MessageContext) -> None: + processed_offsets_2.append(message_context.offset) + + for _ in range(10): + await producer.send_wait( + stream=stream_name, + message=b"msg", + ) + + try: + async with consumer: + subscriber_name = await consumer.subscribe( + stream=stream_name, callback=long_running_cb, initial_credit=10 + ) + await wait_for(lambda: processed_offsets_1.full()) + + await consumer.unsubscribe(subscriber_name) + + await consumer.subscribe( + stream=stream_name, + callback=write_processed_messages_cb, + initial_credit=10, + offset_specification=ConsumerOffsetSpecification(offset_type=OffsetType.OFFSET, offset=6), + ) + + await wait_for(lambda: len(processed_offsets_2) > 1) + assert processed_offsets_2[0] == 6 + finally: + await producer.delete_stream(stream_name) + await producer.close() + + async def test_offset_type_timestamp(stream: str, consumer: Consumer, producer: Producer) -> None: messages = [str(i).encode() for i in range(1, 5_000)] await producer.send_batch(stream, messages)