diff --git a/rstream/client.py b/rstream/client.py index 807a3d4..976507a 100644 --- a/rstream/client.py +++ b/rstream/client.py @@ -246,10 +246,6 @@ async def run_queue_listener_task(self, subscriber_name: str, handler: HT[FT]) - self._run_delivery_handlers(subscriber_name, handler), ) - async def destroy_queue_listener_task(self, subscriber_name: str) -> None: - if subscriber_name in self._frames: - self.stop_task(f"run_delivery_handlers_{subscriber_name}") - async def _run_delivery_handlers(self, subscriber_name: str, handler: HT[FT]): while self.is_connection_alive(): frame_entry = await self._frames[subscriber_name].get() diff --git a/rstream/consumer.py b/rstream/consumer.py index 591718f..32ade3f 100644 --- a/rstream/consumer.py +++ b/rstream/consumer.py @@ -325,8 +325,6 @@ async def unsubscribe(self, subscriber_name: str) -> None: await self._clients[stream].remove_stream(stream) await self._clients[stream].free_available_id(subscriber.subscription_id) - subscriber.client.destroy_queue_listener_task(subscriber_name) - del self._subscribers[subscriber_name] async def query_offset(self, stream: str, subscriber_name: str) -> int: diff --git a/tests/test_consumer.py b/tests/test_consumer.py index d31fe3d..3d81b33 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -261,6 +261,28 @@ async def test_consume_with_resubscribe(stream: str, consumer: Consumer, produce assert captured == [b"one", b"two"] +async def test_consume_with_resubscribe_msg(stream: str, consumer: Consumer, producer: Producer) -> None: + captured: list[bytes] = [] + subscriber_name = await consumer.subscribe( + stream, callback=lambda message, message_context: captured.append(bytes(message)) + ) + for i in range(100): + await producer.send_wait(stream, b"one") + await wait_for(lambda: len(captured) >= 100) + + await consumer.unsubscribe(subscriber_name) + await consumer.subscribe( + stream, + subscriber_name=subscriber_name, + callback=lambda message, message_context: captured.append(bytes(message)), + offset_specification=ConsumerOffsetSpecification(OffsetType.NEXT, None), + ) + + for i in range(100): + await producer.send_wait(stream, b"two") + await wait_for(lambda: len(captured) >= 200) + + async def test_consume_superstream_with_resubscribe( super_stream: str, super_stream_consumer: SuperStreamConsumer, super_stream_producer: SuperStreamProducer ) -> None: