Skip to content

Commit

Permalink
Tests: improve coverage of Network (#515)
Browse files Browse the repository at this point in the history
  • Loading branch information
erlend-aasland authored Aug 6, 2024
1 parent a196e1e commit 7d7e2f1
Showing 1 changed file with 190 additions and 20 deletions.
210 changes: 190 additions & 20 deletions test/test_network.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import unittest
from threading import Event

Expand All @@ -9,32 +10,94 @@
class TestNetwork(unittest.TestCase):

def setUp(self):
network = canopen.Network()
with self.assertLogs():
network.add_node(2, SAMPLE_EDS)
network.add_node(3, network[2].object_dictionary)
self.network = network
self.network = canopen.Network()

def test_add_node(self):
node = self.network[2]
self.assertIsInstance(node, canopen.Node)
self.assertEqual(node.id, 2)
def test_network_add_node(self):
# Add using str.
with self.assertLogs():
node = self.network.add_node(2, SAMPLE_EDS)
self.assertEqual(self.network[2], node)
self.assertEqual(len(self.network), 2)
self.assertEqual(node.id, 2)
self.assertIsInstance(node, canopen.RemoteNode)

# Add using OD.
node = self.network.add_node(3, self.network[2].object_dictionary)
self.assertEqual(self.network[3], node)
self.assertEqual(node.id, 3)
self.assertIsInstance(node, canopen.RemoteNode)

# Add using RemoteNode.
with self.assertLogs():
node = canopen.RemoteNode(4, SAMPLE_EDS)
self.network.add_node(node)
self.assertEqual(self.network[4], node)
self.assertEqual(node.id, 4)
self.assertIsInstance(node, canopen.RemoteNode)

# Add using LocalNode.
with self.assertLogs():
node = canopen.LocalNode(5, SAMPLE_EDS)
self.network.add_node(node)
self.assertEqual(self.network[5], node)
self.assertEqual(node.id, 5)
self.assertIsInstance(node, canopen.LocalNode)

# Verify that we've got the correct number of nodes.
self.assertEqual(len(self.network), 4)

def test_notify(self):
def test_network_add_node_upload_eds(self):
# Will err because we're not connected to a real network.
with self.assertLogs(level=logging.ERROR):
self.network.add_node(2, SAMPLE_EDS, upload_eds=True)

def test_network_create_node(self):
with self.assertLogs():
self.network.create_node(2, SAMPLE_EDS)
self.network.create_node(3, SAMPLE_EDS)
node = canopen.RemoteNode(4, SAMPLE_EDS)
self.network.create_node(node)
self.assertIsInstance(self.network[2], canopen.LocalNode)
self.assertIsInstance(self.network[3], canopen.LocalNode)
self.assertIsInstance(self.network[4], canopen.RemoteNode)

def test_network_check(self):
self.network.connect(interface="virtual")

def cleanup():
# We must clear the fake exception installed below, since
# .disconnect() implicitly calls .check() during test tear down.
self.network.notifier.exception = None
self.network.disconnect()

self.addCleanup(cleanup)
self.assertIsNone(self.network.check())

class Custom(Exception):
pass

self.network.notifier.exception = Custom("fake")
with self.assertRaisesRegex(Custom, "fake"):
with self.assertLogs(level=logging.ERROR):
self.network.check()
with self.assertRaisesRegex(Custom, "fake"):
with self.assertLogs(level=logging.ERROR):
self.network.disconnect()

def test_network_notify(self):
with self.assertLogs():
self.network.add_node(2, SAMPLE_EDS)
node = self.network[2]
self.network.notify(0x82, b'\x01\x20\x02\x00\x01\x02\x03\x04', 1473418396.0)
self.assertEqual(len(node.emcy.active), 1)
self.network.notify(0x702, b'\x05', 1473418396.0)
self.assertEqual(node.nmt.state, 'OPERATIONAL')
self.assertListEqual(self.network.scanner.nodes, [2])

def test_send(self):
bus = can.interface.Bus(interface="virtual", channel=1)
def test_network_send_message(self):
bus = can.interface.Bus(interface="virtual")
self.addCleanup(bus.shutdown)

self.network.connect(interface="virtual", channel=1)
self.network.connect(interface="virtual")
self.addCleanup(self.network.disconnect)

# Send standard ID
Expand All @@ -52,16 +115,123 @@ def test_send(self):
self.assertEqual(msg.arbitration_id, 0x12345)
self.assertTrue(msg.is_extended_id)

def test_send_periodic(self):
def test_network_subscribe_unsubscribe(self):
N_HOOKS = 3
accumulators = [] * N_HOOKS

self.network.connect(interface="virtual", receive_own_messages=True)
self.addCleanup(self.network.disconnect)

for i in range(N_HOOKS):
accumulators.append([])
def hook(*args, i=i):
accumulators[i].append(args)
self.network.subscribe(i, hook)

self.network.notify(0, bytes([1, 2, 3]), 1000)
self.network.notify(1, bytes([2, 3, 4]), 1001)
self.network.notify(1, bytes([3, 4, 5]), 1002)
self.network.notify(2, bytes([4, 5, 6]), 1003)

self.assertEqual(accumulators[0], [(0, bytes([1, 2, 3]), 1000)])
self.assertEqual(accumulators[1], [
(1, bytes([2, 3, 4]), 1001),
(1, bytes([3, 4, 5]), 1002),
])
self.assertEqual(accumulators[2], [(2, bytes([4, 5, 6]), 1003)])

self.network.unsubscribe(0)
self.network.notify(0, bytes([7, 7, 7]), 1004)
# Verify that no new data was added to the accumulator.
self.assertEqual(accumulators[0], [(0, bytes([1, 2, 3]), 1000)])

def test_network_subscribe_multiple(self):
N_HOOKS = 3
self.network.connect(interface="virtual", receive_own_messages=True)
self.addCleanup(self.network.disconnect)

accumulators = []
hooks = []
for i in range(N_HOOKS):
accumulators.append([])
def hook(*args, i=i):
accumulators[i].append(args)
hooks.append(hook)
self.network.subscribe(0x20, hook)

self.network.notify(0xaa, bytes([1, 1, 1]), 2000)
self.network.notify(0x20, bytes([2, 3, 4]), 2001)
self.network.notify(0xbb, bytes([2, 2, 2]), 2002)
self.network.notify(0x20, bytes([3, 4, 5]), 2003)
self.network.notify(0xcc, bytes([3, 3, 3]), 2004)

BATCH1 = [
(0x20, bytes([2, 3, 4]), 2001),
(0x20, bytes([3, 4, 5]), 2003),
]
for n, acc in enumerate(accumulators):
with self.subTest(hook=n):
self.assertEqual(acc, BATCH1)

# Unsubscribe the second hook; dispatch a new message.
self.network.unsubscribe(0x20, hooks[1])

BATCH2 = 0x20, bytes([4, 5, 6]), 2005
self.network.notify(*BATCH2)
self.assertEqual(accumulators[0], BATCH1 + [BATCH2])
self.assertEqual(accumulators[1], BATCH1)
self.assertEqual(accumulators[2], BATCH1 + [BATCH2])

# Unsubscribe the first hook; dispatch yet another message.
self.network.unsubscribe(0x20, hooks[0])

BATCH3 = 0x20, bytes([5, 6, 7]), 2006
self.network.notify(*BATCH3)
self.assertEqual(accumulators[0], BATCH1 + [BATCH2])
self.assertEqual(accumulators[1], BATCH1)
self.assertEqual(accumulators[2], BATCH1 + [BATCH2] + [BATCH3])

# Unsubscribe the rest (only one remaining); dispatch a new message.
self.network.unsubscribe(0x20)
self.network.notify(0x20, bytes([7, 7, 7]), 2007)
self.assertEqual(accumulators[0], BATCH1 + [BATCH2])
self.assertEqual(accumulators[1], BATCH1)
self.assertEqual(accumulators[2], BATCH1 + [BATCH2] + [BATCH3])

def test_network_context_manager(self):
with self.network.connect(interface="virtual"):
pass
with self.assertRaisesRegex(RuntimeError, "Not connected"):
self.network.send_message(0, [])

def test_network_item_access(self):
with self.assertLogs():
self.network.add_node(2, SAMPLE_EDS)
self.network.add_node(3, SAMPLE_EDS)
self.assertEqual([2, 3], [node for node in self.network])

# Check __delitem__.
del self.network[2]
self.assertEqual([3], [node for node in self.network])
with self.assertRaises(KeyError):
del self.network[2]

# Check __setitem__.
old = self.network[3]
with self.assertLogs():
new = canopen.Node(3, SAMPLE_EDS)
self.network[3] = new

# Check __getitem__.
self.assertNotEqual(self.network[3], old)
self.assertEqual([3], [node for node in self.network])

def test_network_send_periodic(self):
DATA1 = bytes([1, 2, 3])
DATA2 = bytes([4, 5, 6])
COB_ID = 0x123
PERIOD = 0.1
self.network.connect(
interface="virtual",
channel=1,
receive_own_messages=True
)
self.network.connect(interface="virtual", receive_own_messages=True)
self.addCleanup(self.network.disconnect)

acc = []
Expand Down

0 comments on commit 7d7e2f1

Please sign in to comment.