Skip to content

Commit

Permalink
cleanup subscriber_task in client during unsubscribing
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniele authored and Daniele committed Aug 27, 2024
1 parent da75e53 commit 3d7a4f8
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 0 deletions.
4 changes: 4 additions & 0 deletions rstream/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,10 @@ async def run_queue_listener_task(self, subscriber_name: str, handler: HT[FT]) -
self._run_delivery_handlers(subscriber_name, handler),
)

async def destroy_queue_listener_task(self, subscriber_name: str) -> None:
if subscriber_name in self._frames:
self.stop_task(f"run_delivery_handlers_{subscriber_name}")

async def _run_delivery_handlers(self, subscriber_name: str, handler: HT[FT]):
while self.is_connection_alive():
frame_entry = await self._frames[subscriber_name].get()
Expand Down
2 changes: 2 additions & 0 deletions rstream/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ async def unsubscribe(self, subscriber_name: str) -> None:
await self._clients[stream].remove_stream(stream)
await self._clients[stream].free_available_id(subscriber.subscription_id)

subscriber.client.destroy_queue_listener_task(subscriber_name)

del self._subscribers[subscriber_name]

async def query_offset(self, stream: str, subscriber_name: str) -> int:
Expand Down

0 comments on commit 3d7a4f8

Please sign in to comment.