Skip to content

Commit

Permalink
Avoid None dummy value in node.RemoteNode.network.
Browse files Browse the repository at this point in the history
Use a _DummyNetwork object instead.  Fix up check for "network is
None" by providing a convenience has_network() function in the base
class.
  • Loading branch information
acolomb committed Aug 11, 2024
1 parent a318e0b commit 2e01485
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 9 deletions.
4 changes: 4 additions & 0 deletions canopen/node/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,7 @@ def __init__(
self.object_dictionary = object_dictionary

self.id = node_id or self.object_dictionary.node_id

def has_network(self) -> bool:
"""Check whether the node has been associated to a network."""
return not isinstance(self.network, canopen.network._DummyNetwork)
21 changes: 12 additions & 9 deletions canopen/node/remote.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from __future__ import annotations

import logging
from typing import Union, TextIO

import canopen.network
from canopen.sdo import SdoClient, SdoCommunicationError, SdoAbortedError
from canopen.nmt import NmtMaster
from canopen.emcy import EmcyConsumer
Expand Down Expand Up @@ -46,7 +49,7 @@ def __init__(
if load_od:
self.load_configuration()

def associate_network(self, network):
def associate_network(self, network: canopen.network.Network):
self.network = network
self.sdo.network = network
self.pdo.network = network
Expand All @@ -59,18 +62,18 @@ def associate_network(self, network):
network.subscribe(0x80 + self.id, self.emcy.on_emcy)
network.subscribe(0, self.nmt.on_command)

def remove_network(self):
def remove_network(self) -> None:
for sdo in self.sdo_channels:
self.network.unsubscribe(sdo.tx_cobid, sdo.on_response)
self.network.unsubscribe(0x700 + self.id, self.nmt.on_heartbeat)
self.network.unsubscribe(0x80 + self.id, self.emcy.on_emcy)
self.network.unsubscribe(0, self.nmt.on_command)
self.network = None
self.sdo.network = None
self.pdo.network = None
self.tpdo.network = None
self.rpdo.network = None
self.nmt.network = None
self.network = canopen.network._DummyNetwork()
self.sdo.network = self.network
self.pdo.network = self.network
self.tpdo.network = self.network
self.rpdo.network = self.network
self.nmt.network = self.network

def add_sdo(self, rx_cobid, tx_cobid):
"""Add an additional SDO channel.
Expand All @@ -87,7 +90,7 @@ def add_sdo(self, rx_cobid, tx_cobid):
"""
client = SdoClient(rx_cobid, tx_cobid, self.object_dictionary)
self.sdo_channels.append(client)
if self.network is not None:
if self.has_network():
self.network.subscribe(client.tx_cobid, client.on_response)
return client

Expand Down

0 comments on commit 2e01485

Please sign in to comment.