Skip to content

Commit

Permalink
Merge pull request #8 from hostcc/bugfix/online-sensor
Browse files Browse the repository at this point in the history
Fixed processing for online sensor
  • Loading branch information
hostcc authored Aug 28, 2022
2 parents f048791 + 101f039 commit 47c9bad
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 16 deletions.
12 changes: 11 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ Configuration file is in YAML format and supports following elements:
# exit
oneshot:
# (number) default ``30``: delay in seconds between processing
# cycles
# cycles. Is also used as MQTT keepalive interval upon which the
# broker will consider the client disconnected if no response
intercycle_delay:
# (string) default ``error``: logging level, one of ``critical``,
# ``error``, ``warning``, ``info`` or ``debug``
Expand All @@ -89,6 +90,8 @@ Configuration file is in YAML format and supports following elements:
# (string) default ``homeassistant``: Preffix to MQTT topic names,
# should correspond to one set in HomeAssistant for auto-discovery
hass_discovery_prefix:
# (bool) default ``true``: Whether to enable TLS with MQTT broker
tls:
# (list of mappings) - optional: Energy meter parameters to process and
# properties of corresponding HASS sensor
parameters:
Expand Down Expand Up @@ -140,6 +143,13 @@ There are Docker images available if you would like to run it as Docker containe

As of writing, the images are built to ARM v6/v7 and ARM64 platforms.

.. note::

For ARMv6 you might need to specify image variant explicitly, in case the
container engine detects it incorrectly and resulting image doesn't run as
expected. To do that just add ``--variant v6`` to ``pull`` command


To run the program as container you will need to create a directory on the host
and put ``config.yaml`` relevant to your setup there.

Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ pythonpath = [
log_cli = 1
log_cli_level = "error"

markers = [
"mqtt_broker_users",
]

[tool.pylint.main]
load-plugins = "pylint.extensions.no_self_use"

Expand Down
28 changes: 24 additions & 4 deletions src/energomera_hass_mqtt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,14 @@ def __init__(

self._mqtt_client = MqttClient(
hostname=config.of.mqtt.host, username=config.of.mqtt.user,
password=config.of.mqtt.password, tls_context=mqtt_tls_context
password=config.of.mqtt.password, tls_context=mqtt_tls_context,
# Set MQTT keepalive to interval between meter interaction cycles,
# so that MQTT broker will consider the client disconnected upon
# that internal if no MQTT traffic is seen from the client.
# Please note the interval could be shorter due to the use of
# `asyncio_mqtt`, since its asynchrounous task runs in the loop and
# can respond to MQTT pings anytime in between meter cycles.
keepalive=config.of.general.intercycle_delay,
)
self._hass_discovery_prefix = config.of.mqtt.hass_discovery_prefix
self._model = None
Expand Down Expand Up @@ -499,6 +506,9 @@ async def iec_read_admin(self):
# This call will set last will only, which has to be done prior to
# connecting to the broker
await self.set_online_sensor(False, setup_only=True)
# The connection to MQTT broker is instantiated only once, if not
# connected previously. See `MqttClient.connect()` for more
# details
await self._mqtt_client.connect()
# Process parameters requested
for param in self._config.of.parameters:
Expand Down Expand Up @@ -526,14 +536,24 @@ async def iec_read_admin(self):
else:
await self.set_online_sensor(True)
finally:
# Disconnect both serial and MQTT clients ignoring possible
# exceptions - those might have not been connected yet
# Disconnect serial client ignoring possible
# exceptions - it might have not been connected yet
try:
self._client.disconnect()
await self._mqtt_client.disconnect()
except Exception: # pylint: disable=broad-except
pass

async def finalize(self):
"""
Performs finalization steps, that is - disconnecting MQTT client
currently.
"""
try:
await self.set_online_sensor(False)
await self._mqtt_client.disconnect()
except Exception: # pylint: disable=broad-except
pass

async def set_online_sensor(self, state, setup_only=False):
"""
Adds a pseudo-sensor to HASS reflecting the communication state of
Expand Down
1 change: 1 addition & 0 deletions src/energomera_hass_mqtt/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ async def async_main():
if config.of.general.oneshot:
break
await asyncio.sleep(config.of.general.intercycle_delay)
await client.finalize()


def main():
Expand Down
14 changes: 14 additions & 0 deletions src/energomera_hass_mqtt/mqtt_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,20 @@ def __init__(self, *args, **kwargs):
kwargs['keepalive'] = self._keepalive
super().__init__(logger=_LOGGER, *args, **kwargs)

async def connect(self, *args, **kwargs):
"""
Connects to MQTT broker.
Multiple calls will result only in single call to `connect()` method of
parent class, to allow the method to be called within a process loop
with no risk of constantly reinitializing MQTT broker connection.
:param args: Pass-through positional arguments for parent class
:param kwargs: Pass-through keyword arguments for parent class
"""
if not self._connected.done():
await super().connect(*args, *kwargs)

def will_set(self, *args, **kwargs):
"""
Allows setting last will to the underlying MQTT client.
Expand Down
105 changes: 94 additions & 11 deletions tests/test_online_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,46 +90,86 @@ async def _generator():
return _put_in_queue, _generator()


@pytest.fixture(scope='session')
def mqtt_broker():
@pytest.fixture
def mqtt_broker(request): # pylint: disable=too-many-locals
'''
Fixture provisioning MQTT broker in form of Docker container
Fixture provisioning MQTT broker in form of Docker container.
Users/password could be provided with test mark:
```
@pytest.mark.mqtt_broker_users(
'mqtt_user1:mqtt_password1',
'mqtt_user2:mqtt_password2',
...
)
```
'''
client = docker.DockerClient()

port = 1883
container_path = '/mosquitto/config'
password_file = 'mosquitto.users'

# Attempt to get MQTT users from test mark
users = getattr(
request.node.get_closest_marker("mqtt_broker_users"),
'args', []
)

# Broker configuration file
config = [
f'listener {port}',
'allow_anonymous true',
f'allow_anonymous {"false" if users else "true"}',
'log_type debug',
'log_type error',
'log_type warning',
'log_type notice',
'log_type information',
'log_dest stdout',
'connection_messages true',
''
]
config_str = "\n".join(config)

tmpdir = tempfile.mkdtemp(dir=os.getenv('RUNNER_TEMP'))
print(f'Using {tmpdir} as temporary directory')
print(f'Using {tmpdir} as temporary directory for MQTT broker configs')

if users:
# Store provided users and password to plaintext file
config.append(f'password_file {container_path}/{password_file}')
mqtt_password_file = os.path.join(tmpdir, password_file)
with open(mqtt_password_file, 'w', encoding='ascii') as mqtt_password:
# The content should have last line ending with LF
mqtt_password.write("\n".join(users) + "\n")

mqtt_config_file = os.path.join(tmpdir, 'mosquitto.conf')
with open(mqtt_config_file, 'w', encoding='ascii') as mqtt_config:
mqtt_config.write(config_str)
# The content should have last line ending with LF
mqtt_config.write("\n".join(config) + "\n")

container = client.containers.run(
'eclipse-mosquitto', detach=True,
remove=True,
ports={f'{port}/tcp': (None, port)},
volumes={tmpdir: {'bind': '/mosquitto/config'}},
volumes={tmpdir: {'bind': container_path}},
)

if users:
# Issue command to convert the password file from cleartext to hashed
# version compatible with MQTT broker, and then signal it to re-read
# the configuration
cmd = (
f'sh -c "mosquitto_passwd -U {container_path}/{password_file}'
' && killall -SIGHUP mosquitto"'
)
exit_code, output = container.exec_run(cmd)
if exit_code != 0:
raise Exception(output)

# Allow the broker to startup and ready for the clients
time.sleep(1)

# Pass the execution to the test using the fixture
yield
yield dict(port=port)
# Clean the container up once the test completes
container.stop()
shutil.rmtree(tmpdir, ignore_errors=True)
Expand Down Expand Up @@ -174,7 +214,7 @@ async def mqtt_client_destroy(self):
mqtt_client = MqttClientTimedSubscribe(
# Timeout value should be sufficiently longer that keep-alive interval
# above, so that last will feature will be activated
hostname='127.0.0.1', subscribe_timeout=5
hostname='127.0.0.1', subscribe_timeout=5,
)

# Attempt to receive online sensor state upon unclean shutdown of the MQTT
Expand All @@ -192,6 +232,49 @@ async def mqtt_client_destroy(self):
assert online_sensor_state.pop() == json.dumps({'value': 'OFF'})


@pytest.mark.asyncio
async def test_online_sensor_normal_run(
# `pylint` mistekenly treats fixture as re-definition
# pylint: disable=redefined-outer-name, unused-argument
mqtt_broker
):
'''
Tests online sensor for properly reflecting online sensors state during
normal run
'''

# Use MQTT client that allows for receiving MQTT messages over configured
# time interval, to avoid blocking execution indefinitely (as parent MQTT
# client does)
mqtt_client = MqttClientTimedSubscribe(
# Timeout value should be sufficiently longer that keep-alive interval
# above, so that last will feature will be activated
hostname='127.0.0.1', subscribe_timeout=5,
)

# Attempt to receive online sensor state upon normal program run
async with mqtt_client as client:
async with client.unfiltered_messages() as messages:
await client.subscribe('homeassistant/binary_sensor/+/+/state', 0)

with mock_config():
with mock_serial():
await async_main()

online_sensor_state = [
x.payload.decode() async for x in messages
if 'IS_ONLINE' in x.topic
]
# There should be two messages for online sensor - first with 'ON'
# value during the program run, and another with 'OFF' value
# generated at program exit
assert len(online_sensor_state) == 2
assert online_sensor_state == [
json.dumps({'value': 'ON'}),
json.dumps({'value': 'OFF'})
]


def test_online_sensor():
'''
Tests for handling pseudo online sensor under timeout condition.
Expand Down

0 comments on commit 47c9bad

Please sign in to comment.