Skip to content

Commit

Permalink
feat: syncronize values with timestamps
Browse files Browse the repository at this point in the history
It seems that the Nexa bridge REST API lags behind in quite a few cases
which can makes values jump back and forth a bit when combined with
websocket messages.

This adds support for comparing the dates of data to ignore potentially
old data.
  • Loading branch information
andersevenrud committed Jan 3, 2023
1 parent 241cb10 commit 758470e
Showing 1 changed file with 53 additions and 7 deletions.
60 changes: 53 additions & 7 deletions custom_components/nexa_bridge_x/nexa.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
POLL_TIMEOUT,
RECONNECT_SLEEP
)
import dateutil.parser
import asyncio
import aiohttp
import json
Expand All @@ -44,11 +45,17 @@
NexaWebsocketData = Any


def is_capable_of(node: NexaNode, items: list(str)):
def is_capable_of(node: NexaNode, items: list(str)) -> bool:
"""Check if given capability is available"""
return any(cap for cap in items if cap in node.capabilities)


def is_newer_date(current: str, new: str) -> bool:
current_time = dateutil.parser.isoparse(current)
new_time = dateutil.parser.isoparse(new)
return new_time >= current_time


class NexaApiError(Exception):
"""Base error"""

Expand Down Expand Up @@ -137,7 +144,6 @@ async def on_message(self, msg: NexaWebsocketMessage) -> None:
msg = msg.split(':', 1)[1]

try:
_LOGGER.debug("Websocket message: %s", msg)
data = json.loads(msg)
except Exception as err:
_LOGGER.warning("Invalid websocket message (%s): %s", msg, err)
Expand Down Expand Up @@ -415,11 +421,34 @@ def get_sensor_capabilities(self) -> list[str]:
self.capabilities
))

def set_value(self, name: str, new_value: NexaNodeValueType) -> None:
def set_values_from_node(self, node: NexaNode) -> None:
for new_value in node.values:
new_time = new_value.time
for current_value in self.values:
current_time = current_value.time
if current_value.name == new_value.name:
if is_newer_date(current_time, new_time):
current_value.value = new_value.value
_LOGGER.debug("Patching node %s from node: %s = %s", self.id, current_value.name, new_value.value)
else:
_LOGGER.debug("Not Patching node %s from node - outdated", self.id)
break

def set_value(
self,
name: str,
new_value: NexaNodeValueType,
new_time: str
) -> None:
"""Set current state value"""
for value in self.values:
if value.name == name:
value.value = new_value
if is_newer_date(value.time, new_time):
value.value = new_value
value.time = new_time
_LOGGER.debug("Patching node %s with value: %s = %s", self.id, name, new_value)
else:
_LOGGER.debug("Not Patching node %s from value - outdated", self.id)
break

def get_value(self, name: str) -> NexaNodeValueType | None:
Expand Down Expand Up @@ -484,6 +513,15 @@ def get_node_by_id(self, node_id: str) -> NexaNode | None:
return node
return None

def update_nodes_from_data(self, data: NexaData):
self.data.info = data.info
self.data.energy = data.energy

for node in data.nodes:
current_node = self.get_node_by_id(node.id)
if current_node:
current_node.set_values_from_node(node)

async def update_node_from_message(self, data: NexaWebsocketData) -> None:
"""Try to update a node based on message"""
if not self.data:
Expand All @@ -497,12 +535,14 @@ async def update_node_from_message(self, data: NexaWebsocketData) -> None:
node_id: str = data["sourceNode"]
if node_id and str(node_id) != "-1":
value: NexaNodeValueType = data["value"]
time: NexaNodeValueType = data["time"]
cap: str = data["capability"]

_LOGGER.debug("Coordinator update message: %s", data)

node = self.get_node_by_id(node_id)
if node:
_LOGGER.debug("Updating node %s from: %s", node_id, value)
node.set_value(cap, value)
node.set_value(cap, value, time)
self.async_set_updated_data(self.data)

async def _async_update_data(self) -> None:
Expand All @@ -518,11 +558,17 @@ async def _async_update_data(self) -> None:

(info, nodes, energy, energy_nodes) = results

return NexaData(
data = NexaData(
NexaInfo(info),
list(map(lambda n: NexaNode(n), nodes)),
NexaEnergy(energy, energy_nodes)
)

if self.data:
self.update_nodes_from_data(data)
return self.data

return data
except NexaApiAuthorizationError as err:
raise ConfigEntryAuthFailed from err
except NexaApiError as err:
Expand Down

0 comments on commit 758470e

Please sign in to comment.