diff --git a/requirements.txt b/requirements.txt index 7297129..90e1343 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ aiocsv==1.3.2 aiohttp==3.9.5 -aiomqtt==2.1.0 +aiomqtt==2.2.0 astral==3.2 ephem==4.1.5 influxdb-client==1.44.0 @@ -8,10 +8,10 @@ jsonref==1.1.0 loguru==0.7.2 numpy==2.0.0 pandas==2.2.2 -pydantic==2.7.4 +pydantic==2.8.2 pyjwt==2.8.0 requests==2.32.3 scikit-learn==1.4.2 -scipy==1.13.1 +scipy==1.14.0 solaredge_modbus==0.8.0 tzlocal==5.2 diff --git a/solaredge2mqtt/core/events/__init__.py b/solaredge2mqtt/core/events/__init__.py index e6e2cb6..cd28305 100644 --- a/solaredge2mqtt/core/events/__init__.py +++ b/solaredge2mqtt/core/events/__init__.py @@ -3,7 +3,7 @@ import asyncio from typing import Callable -from aiomqtt import MqttCodeError +from aiomqtt import MqttCodeError, MqttError from solaredge2mqtt.core.exceptions import InvalidDataException from solaredge2mqtt.core.logging import logger @@ -47,6 +47,12 @@ def unsubscribe(self, event: type[BaseEvent], listener: Callable) -> None: self._listeners.pop(event_key, None) self._subscribed_events.pop(event_key, None) + def unsubscribe_all(self, event: type[BaseEvent]) -> None: + event_key = event.event_key() + if event_key in self._listeners: + self._listeners.pop(event_key, None) + self._subscribed_events.pop(event_key, None) + async def emit(self, event: BaseEvent) -> None: try: event_key = event.event_key() @@ -61,6 +67,8 @@ async def emit(self, event: BaseEvent) -> None: ) self._tasks.add(task) task.add_done_callback(self._tasks.remove) + except MqttCodeError as error: + raise error except asyncio.CancelledError: pass diff --git a/solaredge2mqtt/core/mqtt/__init__.py b/solaredge2mqtt/core/mqtt/__init__.py index 9571463..d0187f8 100644 --- a/solaredge2mqtt/core/mqtt/__init__.py +++ b/solaredge2mqtt/core/mqtt/__init__.py @@ -41,6 +41,9 @@ def __init__(self, settings: MQTTSettings, event_bus: EventBus): ) def _subscribe_events(self) -> None: + self.event_bus.unsubscribe_all(MQTTPublishEvent) + self.event_bus.unsubscribe_all(MQTTSubscribeEvent) + self.event_bus.subscribe(MQTTPublishEvent, self.event_listener) self.event_bus.subscribe(MQTTSubscribeEvent, self._subscribe_topic) diff --git a/solaredge2mqtt/service.py b/solaredge2mqtt/service.py index def4861..6b91960 100644 --- a/solaredge2mqtt/service.py +++ b/solaredge2mqtt/service.py @@ -56,7 +56,7 @@ def __init__(self): self.event_bus = EventBus() self.timer = Timer(self.event_bus, self.settings.interval) - self.mqtt = MQTTClient(self.settings.mqtt, self.event_bus) + self.mqtt: MQTTClient | None = None self.cancel_request = aio.Event() self.loops: set[aio.Task] = set() @@ -124,6 +124,8 @@ async def main_loop(self): while not self.cancel_request.is_set(): try: + self.mqtt = MQTTClient(self.settings.mqtt, self.event_bus) + async with self.mqtt: await self.mqtt.publish_status_online() @@ -136,13 +138,14 @@ async def main_loop(self): await aio.gather(*self.loops) except MqttError: logger.error("MQTT error, reconnecting in 5 seconds...") - await aio.sleep(5) except aio.exceptions.CancelledError: logger.debug("Loops cancelled") - return finally: await self.finalize() + if not self.cancel_request.is_set(): + await aio.sleep(5) + async def finalize(self): try: await self.mqtt.publish_status_offline()