Skip to content

Commit

Permalink
Merge pull request #10 from hostcc/bugfix/mqtt-reconnect
Browse files Browse the repository at this point in the history
Fixed MQTT client reconnect upon a network error
  • Loading branch information
hostcc authored Dec 29, 2022
2 parents d01dd4c + 470f050 commit d1a0912
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 140 deletions.
18 changes: 9 additions & 9 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ jobs:
fail-fast: false
matrix:
include:
- os: ubuntu-latest
python: 3.6
toxenv: py
- os: ubuntu-latest
python: 3.7
toxenv: py
Expand All @@ -29,18 +26,21 @@ jobs:
- os: ubuntu-latest
python: 3.9
toxenv: py
- os: ubuntu-latest
python: '3.10'
toxenv: py
runs-on: ${{ matrix.os }}
outputs:
version: ${{ steps.package-version.outputs.VALUE }}
steps:
- name: Checkout the code
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
# Disable shallow clone for Sonar scanner, as it needs access to the
# history
fetch-depth: 0
- name: Set Python up
uses: actions/setup-python@v2
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python }}
- name: Install testing tools
Expand All @@ -54,7 +54,7 @@ jobs:
id: package-version
run: |
package_version=`cat version.txt`
echo "::set-output name=VALUE::$package_version"
echo "VALUE=$package_version" >> $GITHUB_OUTPUT
- name: SonarCloud scanning
uses: sonarsource/sonarcloud-github-action@master
env:
Expand All @@ -77,11 +77,11 @@ jobs:
needs: [tests]
steps:
- name: Checkout the code
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
fetch-depth: 0 # `setuptools_scm` needs tags
- name: Set Python up
uses: actions/setup-python@v2
uses: actions/setup-python@v4
with:
python-version: 3.9
- name: Install the PEP517 package builder
Expand Down Expand Up @@ -124,7 +124,7 @@ jobs:
|| github.event.release.target_commitish == 'master')
steps:
- name: Checkout the code
uses: actions/checkout@v2
uses: actions/checkout@v3

- name: Set up QEMU for more platforms supported by Buildx
uses: docker/setup-qemu-action@v2
Expand Down
12 changes: 6 additions & 6 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,23 @@ classifiers =
Topic :: System :: Hardware
License :: OSI Approved :: MIT License
Programming Language :: Python :: 3
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
Programming Language :: Python :: 3.10
Programming Language :: Python :: 3 :: Only

[options]
package_dir =
= src
packages = find:
python_requires = >=3.6
python_requires = >=3.7
install_requires =
iec62056-21 @ git+https://github.com/hostcc/iec62056-21.git@feature/transport-improvements
asyncio-mqtt
pyyaml
schema
addict
addict==2.4.0
asyncio-mqtt==0.16.1
pyyaml==6.0
schema==0.7.5

[options.packages.find]
where = src
Expand Down
6 changes: 1 addition & 5 deletions src/energomera_hass_mqtt/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,7 @@ def main():
"""
Main entry point for the CLI.
"""
try:
asyncio.run(async_main())
except AttributeError:
# Python 3.6 has no `asyncio.run()`, emulate it
asyncio.get_event_loop().run_until_complete(async_main())
asyncio.run(async_main())


if __name__ == '__main__':
Expand Down
13 changes: 11 additions & 2 deletions src/energomera_hass_mqtt/mqtt_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,17 @@ async def connect(self, *args, **kwargs):
:param kwargs: Pass-through keyword arguments for parent class
"""
if self._connected.done() and not self._disconnected.done():
_LOGGER.info(
# Using combination of `self._connected` and `self._disconnected` (both
# inherited from `asyncio_mqtt.client`) to detect of MQTT client needs
# a reconnection upon a network error isn't reliable - the former isn't
# finished even after a disconnect, while the latter stays with
# exception after a successfull reconnect. Neither
# `self._client.is_connected` (from Paho client) is - is returns True
# if socket is disconnected due to network error. Only testing for
# `self._client.socket()` (from Paho client as well) fits the purpose -
# None indicates the client needs `connect`
if self._client.socket():
_LOGGER.debug(
'MQTT client is already connected, skipping subsequent attempt'
)
return
Expand Down
167 changes: 59 additions & 108 deletions tests/test_online_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,61 +35,6 @@
from energomera_hass_mqtt.main import async_main, main


# pylint: disable=too-few-public-methods
class MqttClientTimedSubscribe(MqttClient):
'''
MQTT client with timed subscribe functionality - that is, allowing to
received messages from subscription within configured time frame. The
existing MQTT client, in contrast, awaits for the messages indefinitely
:param int subscribe_timeout: Time interval to wait for messages coming
from MQTT subscription
:param list *args: Pass through positional arguments
:param dict *kwargs: Pass trhough keyword arguments
'''
def __init__(self, subscribe_timeout, *args, **kwargs):
self._subscribe_timeout = subscribe_timeout
super().__init__(*args, **kwargs)

def _cb_and_generator(self, *_args, **_kwargs):
'''
Overrides the method providing callback for messages received from
subscription and sending them to the caller from async generator.
'''
messages = asyncio.Queue()

def _put_in_queue(_client, _userdata, msg):
'''
MQTT callback that simply puts received messages to the queue
'''
messages.put_nowait(msg)

async def _generator():
'''
Asynchronous generator that sends messages from the queue received
within configured time interval
'''
try:
asyncio_create_task = asyncio.create_task
except AttributeError:
# Python 3.6 has no `asyncio.create_task`
asyncio_create_task = asyncio.ensure_future

while True:
task = asyncio_create_task(messages.get())
done, _ = await asyncio.wait(
[task], timeout=self._subscribe_timeout
)
# Stop processing messages if configured internal has elapsed
if task not in done:
task.cancel()
break
# Provide received message to the caller
yield task.result()

return _put_in_queue, _generator()


@pytest.fixture
def mqtt_broker(request): # pylint: disable=too-many-locals
'''
Expand All @@ -105,7 +50,7 @@ def mqtt_broker(request): # pylint: disable=too-many-locals
)
```
'''
client = docker.DockerClient()
client = docker.DockerClient.from_env()

port = 1883
container_path = '/mosquitto/config'
Expand All @@ -130,7 +75,9 @@ def mqtt_broker(request): # pylint: disable=too-many-locals
'connection_messages true',
]

tmpdir = tempfile.mkdtemp(dir=os.getenv('RUNNER_TEMP'))
tmpdir = tempfile.mkdtemp(
dir=os.getenv('RUNNER_TEMP') or os.getenv('TOX_WORK_DIR')
)
print(f'Using {tmpdir} as temporary directory for MQTT broker configs')

if users:
Expand Down Expand Up @@ -175,6 +122,44 @@ def mqtt_broker(request): # pylint: disable=too-many-locals
shutil.rmtree(tmpdir, ignore_errors=True)


async def read_online_sensor_states(code=None, timeout=5):
'''
Retrieves online sensor states from MQTT broker during alotted time frame -
traditional MQTT client subscribe() awaits for the messages indefinitely
'''
online_sensor_states = []

async def listen(sensor_states):
'''
Attempts to receive online sensor states
'''
async with MqttClient(hostname='127.0.0.1') as client:
async with client.messages() as messages:
await client.subscribe(
'homeassistant/binary_sensor/+/+/state', 0
)
async for msg in messages:
if 'IS_ONLINE' in msg.topic.value:
sensor_states.append(msg.payload.decode())

task = asyncio.create_task(listen(online_sensor_states))
# Execute optional code (e.g. normal program run) before awaiting for
# sensor states
if code:
await code()

# Wait for configured time to allow sensor states to be retrieved
try:
await asyncio.wait_for(
task, timeout=timeout
)
# pylint:disable=broad-except
except Exception:
pass

return online_sensor_states


@pytest.mark.asyncio
async def test_online_sensor_last_will(
# `pylint` mistekenly treats fixture as re-definition
Expand Down Expand Up @@ -208,28 +193,9 @@ async def mqtt_client_destroy(self):
with patch.object(MqttClient, '_keepalive', 1):
await async_main()

# Use MQTT client that allows for receiving MQTT messages over configured
# time interval, to avoid blocking execution indefinitely (as parent MQTT
# client does)
mqtt_client = MqttClientTimedSubscribe(
# Timeout value should be sufficiently longer that keep-alive interval
# above, so that last will feature will be activated
hostname='127.0.0.1', subscribe_timeout=5,
)

# Attempt to receive online sensor state upon unclean shutdown of the MQTT
# client
async with mqtt_client as client:
async with client.unfiltered_messages() as messages:
await client.subscribe('homeassistant/binary_sensor/+/+/state', 0)
online_sensor_state = [
x.payload.decode() async for x in messages
if 'IS_ONLINE' in x.topic
]
# Verify only single message received
assert len(online_sensor_state) == 1
# Verify the sensor state should be OFF during unclean shutdown
assert online_sensor_state.pop() == json.dumps({'value': 'OFF'})
online_sensor_states = await read_online_sensor_states()
# Verify the last sensor state should be OFF during unclean shutdown
assert online_sensor_states.pop() == json.dumps({'value': 'OFF'})


@pytest.mark.asyncio
Expand All @@ -243,36 +209,21 @@ async def test_online_sensor_normal_run(
normal run
'''

# Use MQTT client that allows for receiving MQTT messages over configured
# time interval, to avoid blocking execution indefinitely (as parent MQTT
# client does)
mqtt_client = MqttClientTimedSubscribe(
# Timeout value should be sufficiently longer that keep-alive interval
# above, so that last will feature will be activated
hostname='127.0.0.1', subscribe_timeout=5,
)

# Attempt to receive online sensor state upon normal program run
async with mqtt_client as client:
async with client.unfiltered_messages() as messages:
await client.subscribe('homeassistant/binary_sensor/+/+/state', 0)

with mock_config():
with mock_serial():
await async_main()

online_sensor_state = [
x.payload.decode() async for x in messages
if 'IS_ONLINE' in x.topic
]
# There should be two messages for online sensor - first with 'ON'
# value during the program run, and another with 'OFF' value
# generated at program exit
assert len(online_sensor_state) == 2
assert online_sensor_state == [
json.dumps({'value': 'ON'}),
json.dumps({'value': 'OFF'})
]
async def normal_run():
with mock_config():
with mock_serial():
await async_main()

online_sensor_states = await read_online_sensor_states(normal_run)
# There should be two messages for online sensor - first with 'ON'
# value during the program run, and another with 'OFF' value
# generated at program exit
assert len(online_sensor_states) == 2
assert online_sensor_states == [
json.dumps({'value': 'ON'}),
json.dumps({'value': 'OFF'})
]


def test_online_sensor():
Expand Down
22 changes: 12 additions & 10 deletions tox.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[tox]
envlist = py{36,37,38,39}
envlist = py{37,38,39,310}

# Define the minimal tox version required to run;
# if the host tox is less than this the tool with create an environment and
Expand All @@ -14,15 +14,16 @@ isolated_build = true

[testenv]
deps =
check-manifest >= 0.42
flake8
pylint
pytest
pytest-cov
pytest-asyncio
mock;python_version<"3.8"
freezegun
docker
check-manifest==0.49
flake8==5.0.4;python_version=="3.7"
flake8==6.0.0;python_version>"3.7"
pylint==2.15.9
pytest==7.2.0
pytest-asyncio==0.20.3
pytest-cov==4.0.0
mock==4.0.3;python_version<"3.8"
freezegun==1.2.2
docker==6.0.1
setenv =
# Ensure the module under test will be found under `src/` directory, in
# case of any test command below will attempt importing it. In particular,
Expand All @@ -34,6 +35,7 @@ setenv =
passenv =
RUNNER_*
GITHUB_*
DOCKER_*
allowlist_externals =
cat
commands =
Expand Down

0 comments on commit d1a0912

Please sign in to comment.