Skip to content

Commit

Permalink
Restructure MQTT Client usage to not reuse it.
Browse files Browse the repository at this point in the history
Signed-off-by: Johannes Ott <[email protected]>
  • Loading branch information
DerOetzi committed Jul 7, 2024
1 parent 3f51338 commit 120d637
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 4 deletions.
10 changes: 9 additions & 1 deletion solaredge2mqtt/core/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import asyncio
from typing import Callable

from aiomqtt import MqttCodeError
from aiomqtt import MqttCodeError, MqttError

from solaredge2mqtt.core.exceptions import InvalidDataException
from solaredge2mqtt.core.logging import logger
Expand Down Expand Up @@ -47,6 +47,12 @@ def unsubscribe(self, event: type[BaseEvent], listener: Callable) -> None:
self._listeners.pop(event_key, None)
self._subscribed_events.pop(event_key, None)

def unsubscribe_all(self, event: type[BaseEvent]) -> None:
event_key = event.event_key()
if event_key in self._listeners:
self._listeners.pop(event_key, None)
self._subscribed_events.pop(event_key, None)

async def emit(self, event: BaseEvent) -> None:
try:
event_key = event.event_key()
Expand All @@ -61,6 +67,8 @@ async def emit(self, event: BaseEvent) -> None:
)
self._tasks.add(task)
task.add_done_callback(self._tasks.remove)
except MqttCodeError as error:
raise error
except asyncio.CancelledError:
pass

Expand Down
3 changes: 3 additions & 0 deletions solaredge2mqtt/core/mqtt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ def __init__(self, settings: MQTTSettings, event_bus: EventBus):
)

def _subscribe_events(self) -> None:
self.event_bus.unsubscribe_all(MQTTPublishEvent)
self.event_bus.unsubscribe_all(MQTTSubscribeEvent)

self.event_bus.subscribe(MQTTPublishEvent, self.event_listener)
self.event_bus.subscribe(MQTTSubscribeEvent, self._subscribe_topic)

Expand Down
9 changes: 6 additions & 3 deletions solaredge2mqtt/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def __init__(self):
self.event_bus = EventBus()
self.timer = Timer(self.event_bus, self.settings.interval)

self.mqtt = MQTTClient(self.settings.mqtt, self.event_bus)
self.mqtt: MQTTClient | None = None

self.cancel_request = aio.Event()
self.loops: set[aio.Task] = set()
Expand Down Expand Up @@ -124,6 +124,8 @@ async def main_loop(self):

while not self.cancel_request.is_set():
try:
self.mqtt = MQTTClient(self.settings.mqtt, self.event_bus)

async with self.mqtt:
await self.mqtt.publish_status_online()

Expand All @@ -136,13 +138,14 @@ async def main_loop(self):
await aio.gather(*self.loops)
except MqttError:
logger.error("MQTT error, reconnecting in 5 seconds...")
await aio.sleep(5)
except aio.exceptions.CancelledError:
logger.debug("Loops cancelled")
return
finally:
await self.finalize()

if not self.cancel_request.is_set():
await aio.sleep(5)

async def finalize(self):
try:
await self.mqtt.publish_status_offline()
Expand Down

0 comments on commit 120d637

Please sign in to comment.