diff --git a/rstream/consumer.py b/rstream/consumer.py index 3786601..32ade3f 100644 --- a/rstream/consumer.py +++ b/rstream/consumer.py @@ -319,6 +319,12 @@ async def unsubscribe(self, subscriber_name: str) -> None: except BaseException as exc: logger.warning("exception in unsubscribe of Consumer:" + str(exc)) + stream = subscriber.stream + + if stream in self._clients: + await self._clients[stream].remove_stream(stream) + await self._clients[stream].free_available_id(subscriber.subscription_id) + del self._subscribers[subscriber_name] async def query_offset(self, stream: str, subscriber_name: str) -> int: