Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tests: harden TestNetwork and TestNmt even more #508

Merged
52 changes: 38 additions & 14 deletions test/test_network.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
import unittest
from threading import Event
import threading

import canopen
import can
Expand Down Expand Up @@ -231,34 +231,58 @@ def test_network_send_periodic(self):
DATA2 = bytes([4, 5, 6])
COB_ID = 0x123
PERIOD = 0.1
TIMEOUT = PERIOD * 10
self.network.connect(interface="virtual", receive_own_messages=True)
self.addCleanup(self.network.disconnect)

acc = []
event = Event()
condition = threading.Condition()

def hook(_, data, ts):
acc.append((data, ts))
event.set()
with condition:
item = data, ts
acc.append(item)
condition.notify_all()

self.network.subscribe(COB_ID, hook)
self.addCleanup(self.network.unsubscribe, COB_ID)

task = self.network.send_periodic(COB_ID, DATA1, PERIOD)
self.addCleanup(task.stop)

event.wait(PERIOD*2)

# Update task data.
def periodicity():
# Check if periodicity is established; flakiness has been observed
# on macOS.
if len(acc) >= 2:
delta = acc[-1][1] - acc[-2][1]
return round(delta, ndigits=1) == PERIOD
return False

# Wait for frames to arrive; then check the result.
with condition:
condition.wait_for(periodicity, TIMEOUT)
self.assertTrue(all(v[0] == DATA1 for v in acc))

# Update task data, which may implicitly restart the timer.
# Wait for frames to arrive; then check the result.
task.update(DATA2)
event.clear()
event.wait(PERIOD*2)
task.stop()

with condition:
acc.clear()
condition.wait_for(periodicity, TIMEOUT)
# Find the first message with new data, and verify that all subsequent
# messages also carry the new payload.
data = [v[0] for v in acc]
self.assertEqual(data, [DATA1, DATA2])
ts = [v[1] for v in acc]
self.assertAlmostEqual(ts[1]-ts[0], PERIOD, places=1)
idx = data.index(DATA2)
self.assertTrue(all(v[0] == DATA2 for v in acc[idx:]))

# Stop the task.
task.stop()
# A message may have been in flight when we stopped the timer,
# so allow a single failure.
bus = self.network.bus
msg = bus.recv(TIMEOUT)
if msg is not None:
self.assertIsNone(bus.recv(TIMEOUT))


class TestScanner(unittest.TestCase):
Expand Down
6 changes: 5 additions & 1 deletion test/test_nmt.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ def test_nmt_master_node_guarding(self):
self.assertEqual(msg.dlc, 0)

self.node.nmt.stop_node_guarding()
self.assertIsNone(self.bus.recv(self.TIMEOUT))
# A message may have been in flight when we stopped the timer,
# so allow a single failure.
msg = self.bus.recv(self.TIMEOUT)
if msg is not None:
self.assertIsNone(self.bus.recv(self.TIMEOUT))


class TestNmtSlave(unittest.TestCase):
Expand Down
Loading