diff --git a/solaredge2mqtt/core/events/__init__.py b/solaredge2mqtt/core/events/__init__.py index e6e2cb6..cd28305 100644 --- a/solaredge2mqtt/core/events/__init__.py +++ b/solaredge2mqtt/core/events/__init__.py @@ -3,7 +3,7 @@ import asyncio from typing import Callable -from aiomqtt import MqttCodeError +from aiomqtt import MqttCodeError, MqttError from solaredge2mqtt.core.exceptions import InvalidDataException from solaredge2mqtt.core.logging import logger @@ -47,6 +47,12 @@ def unsubscribe(self, event: type[BaseEvent], listener: Callable) -> None: self._listeners.pop(event_key, None) self._subscribed_events.pop(event_key, None) + def unsubscribe_all(self, event: type[BaseEvent]) -> None: + event_key = event.event_key() + if event_key in self._listeners: + self._listeners.pop(event_key, None) + self._subscribed_events.pop(event_key, None) + async def emit(self, event: BaseEvent) -> None: try: event_key = event.event_key() @@ -61,6 +67,8 @@ async def emit(self, event: BaseEvent) -> None: ) self._tasks.add(task) task.add_done_callback(self._tasks.remove) + except MqttCodeError as error: + raise error except asyncio.CancelledError: pass diff --git a/solaredge2mqtt/core/mqtt/__init__.py b/solaredge2mqtt/core/mqtt/__init__.py index 9571463..d0187f8 100644 --- a/solaredge2mqtt/core/mqtt/__init__.py +++ b/solaredge2mqtt/core/mqtt/__init__.py @@ -41,6 +41,9 @@ def __init__(self, settings: MQTTSettings, event_bus: EventBus): ) def _subscribe_events(self) -> None: + self.event_bus.unsubscribe_all(MQTTPublishEvent) + self.event_bus.unsubscribe_all(MQTTSubscribeEvent) + self.event_bus.subscribe(MQTTPublishEvent, self.event_listener) self.event_bus.subscribe(MQTTSubscribeEvent, self._subscribe_topic) diff --git a/solaredge2mqtt/service.py b/solaredge2mqtt/service.py index def4861..6b91960 100644 --- a/solaredge2mqtt/service.py +++ b/solaredge2mqtt/service.py @@ -56,7 +56,7 @@ def __init__(self): self.event_bus = EventBus() self.timer = Timer(self.event_bus, self.settings.interval) - self.mqtt = MQTTClient(self.settings.mqtt, self.event_bus) + self.mqtt: MQTTClient | None = None self.cancel_request = aio.Event() self.loops: set[aio.Task] = set() @@ -124,6 +124,8 @@ async def main_loop(self): while not self.cancel_request.is_set(): try: + self.mqtt = MQTTClient(self.settings.mqtt, self.event_bus) + async with self.mqtt: await self.mqtt.publish_status_online() @@ -136,13 +138,14 @@ async def main_loop(self): await aio.gather(*self.loops) except MqttError: logger.error("MQTT error, reconnecting in 5 seconds...") - await aio.sleep(5) except aio.exceptions.CancelledError: logger.debug("Loops cancelled") - return finally: await self.finalize() + if not self.cancel_request.is_set(): + await aio.sleep(5) + async def finalize(self): try: await self.mqtt.publish_status_offline()