Skip to content

Commit

Permalink
Merge pull request #92 from DerOetzi/mqtt-reconnect
Browse files Browse the repository at this point in the history
Mqtt reconnect
  • Loading branch information
DerOetzi authored Jul 7, 2024
2 parents 3f51338 + 0b4eac9 commit 82da894
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 7 deletions.
6 changes: 3 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
aiocsv==1.3.2
aiohttp==3.9.5
aiomqtt==2.1.0
aiomqtt==2.2.0
astral==3.2
ephem==4.1.5
influxdb-client==1.44.0
jsonref==1.1.0
loguru==0.7.2
numpy==2.0.0
pandas==2.2.2
pydantic==2.7.4
pydantic==2.8.2
pyjwt==2.8.0
requests==2.32.3
scikit-learn==1.4.2
scipy==1.13.1
scipy==1.14.0
solaredge_modbus==0.8.0
tzlocal==5.2
10 changes: 9 additions & 1 deletion solaredge2mqtt/core/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import asyncio
from typing import Callable

from aiomqtt import MqttCodeError
from aiomqtt import MqttCodeError, MqttError

from solaredge2mqtt.core.exceptions import InvalidDataException
from solaredge2mqtt.core.logging import logger
Expand Down Expand Up @@ -47,6 +47,12 @@ def unsubscribe(self, event: type[BaseEvent], listener: Callable) -> None:
self._listeners.pop(event_key, None)
self._subscribed_events.pop(event_key, None)

def unsubscribe_all(self, event: type[BaseEvent]) -> None:
event_key = event.event_key()
if event_key in self._listeners:
self._listeners.pop(event_key, None)
self._subscribed_events.pop(event_key, None)

async def emit(self, event: BaseEvent) -> None:
try:
event_key = event.event_key()
Expand All @@ -61,6 +67,8 @@ async def emit(self, event: BaseEvent) -> None:
)
self._tasks.add(task)
task.add_done_callback(self._tasks.remove)
except MqttCodeError as error:
raise error
except asyncio.CancelledError:
pass

Expand Down
3 changes: 3 additions & 0 deletions solaredge2mqtt/core/mqtt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ def __init__(self, settings: MQTTSettings, event_bus: EventBus):
)

def _subscribe_events(self) -> None:
self.event_bus.unsubscribe_all(MQTTPublishEvent)
self.event_bus.unsubscribe_all(MQTTSubscribeEvent)

self.event_bus.subscribe(MQTTPublishEvent, self.event_listener)
self.event_bus.subscribe(MQTTSubscribeEvent, self._subscribe_topic)

Expand Down
9 changes: 6 additions & 3 deletions solaredge2mqtt/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def __init__(self):
self.event_bus = EventBus()
self.timer = Timer(self.event_bus, self.settings.interval)

self.mqtt = MQTTClient(self.settings.mqtt, self.event_bus)
self.mqtt: MQTTClient | None = None

self.cancel_request = aio.Event()
self.loops: set[aio.Task] = set()
Expand Down Expand Up @@ -124,6 +124,8 @@ async def main_loop(self):

while not self.cancel_request.is_set():
try:
self.mqtt = MQTTClient(self.settings.mqtt, self.event_bus)

async with self.mqtt:
await self.mqtt.publish_status_online()

Expand All @@ -136,13 +138,14 @@ async def main_loop(self):
await aio.gather(*self.loops)
except MqttError:
logger.error("MQTT error, reconnecting in 5 seconds...")
await aio.sleep(5)
except aio.exceptions.CancelledError:
logger.debug("Loops cancelled")
return
finally:
await self.finalize()

if not self.cancel_request.is_set():
await aio.sleep(5)

async def finalize(self):
try:
await self.mqtt.publish_status_offline()
Expand Down

0 comments on commit 82da894

Please sign in to comment.