Skip to content

Commit

Permalink
Fixed a bug when subscriber frames not clears after unsubscribe (#211)
Browse files Browse the repository at this point in the history
  • Loading branch information
nesb1 authored Oct 10, 2024
1 parent 85c5b96 commit a338868
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 0 deletions.
2 changes: 2 additions & 0 deletions rstream/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ async def run_queue_listener_task(self, subscriber_name: str, handler: HT[FT]) -

async def stop_queue_listener_task(self, subscriber_name: str) -> None:
await self.stop_task(name=f"run_delivery_handlers_{subscriber_name}")
while not self._frames[subscriber_name].empty():
self._frames[subscriber_name].get_nowait()

async def _run_delivery_handlers(self, subscriber_name: str, handler: HT[FT]):
while self.is_connection_alive():
Expand Down
44 changes: 44 additions & 0 deletions tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,50 @@ async def test_consumer_callback(stream: str, consumer: Consumer, producer: Prod
assert offset >= 0 and offset < 100


async def test_consumer_resubscribe_when_not_consumed_events_in_queue(
consumer: Consumer, producer: Producer
) -> None:
stream_name = "stream"
await producer.create_stream(stream=stream_name)

processed_offsets_1: asyncio.Queue[int] = asyncio.Queue(1)
processed_offsets_2 = []

async def long_running_cb(message: AMQPMessage, message_context: MessageContext) -> None:
await processed_offsets_1.put(message_context.offset)

async def write_processed_messages_cb(message: AMQPMessage, message_context: MessageContext) -> None:
processed_offsets_2.append(message_context.offset)

for _ in range(10):
await producer.send_wait(
stream=stream_name,
message=b"msg",
)

try:
async with consumer:
subscriber_name = await consumer.subscribe(
stream=stream_name, callback=long_running_cb, initial_credit=10
)
await wait_for(lambda: processed_offsets_1.full())

await consumer.unsubscribe(subscriber_name)

await consumer.subscribe(
stream=stream_name,
callback=write_processed_messages_cb,
initial_credit=10,
offset_specification=ConsumerOffsetSpecification(offset_type=OffsetType.OFFSET, offset=6),
)

await wait_for(lambda: len(processed_offsets_2) > 1)
assert processed_offsets_2[0] == 6
finally:
await producer.delete_stream(stream_name)
await producer.close()


async def test_offset_type_timestamp(stream: str, consumer: Consumer, producer: Producer) -> None:
messages = [str(i).encode() for i in range(1, 5_000)]
await producer.send_batch(stream, messages)
Expand Down

0 comments on commit a338868

Please sign in to comment.