Skip to content

Commit

Permalink
adding unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielePalaia committed Aug 27, 2024
1 parent 3d7a4f8 commit 77fdbbc
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 6 deletions.
4 changes: 0 additions & 4 deletions rstream/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,6 @@ async def run_queue_listener_task(self, subscriber_name: str, handler: HT[FT]) -
self._run_delivery_handlers(subscriber_name, handler),
)

async def destroy_queue_listener_task(self, subscriber_name: str) -> None:
if subscriber_name in self._frames:
self.stop_task(f"run_delivery_handlers_{subscriber_name}")

async def _run_delivery_handlers(self, subscriber_name: str, handler: HT[FT]):
while self.is_connection_alive():
frame_entry = await self._frames[subscriber_name].get()
Expand Down
2 changes: 0 additions & 2 deletions rstream/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,6 @@ async def unsubscribe(self, subscriber_name: str) -> None:
await self._clients[stream].remove_stream(stream)
await self._clients[stream].free_available_id(subscriber.subscription_id)

subscriber.client.destroy_queue_listener_task(subscriber_name)

del self._subscribers[subscriber_name]

async def query_offset(self, stream: str, subscriber_name: str) -> int:
Expand Down
22 changes: 22 additions & 0 deletions tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,28 @@ async def test_consume_with_resubscribe(stream: str, consumer: Consumer, produce
assert captured == [b"one", b"two"]


async def test_consume_with_resubscribe_msg(stream: str, consumer: Consumer, producer: Producer) -> None:
captured: list[bytes] = []
subscriber_name = await consumer.subscribe(
stream, callback=lambda message, message_context: captured.append(bytes(message))
)
for i in range(100):
await producer.send_wait(stream, b"one")
await wait_for(lambda: len(captured) >= 100)

await consumer.unsubscribe(subscriber_name)
await consumer.subscribe(
stream,
subscriber_name=subscriber_name,
callback=lambda message, message_context: captured.append(bytes(message)),
offset_specification=ConsumerOffsetSpecification(OffsetType.NEXT, None),
)

for i in range(100):
await producer.send_wait(stream, b"two")
await wait_for(lambda: len(captured) >= 200)


async def test_consume_superstream_with_resubscribe(
super_stream: str, super_stream_consumer: SuperStreamConsumer, super_stream_producer: SuperStreamProducer
) -> None:
Expand Down

0 comments on commit 77fdbbc

Please sign in to comment.