Skip to content

Commit

Permalink
Merge pull request #244 from andlaus/add_comparam_convenience
Browse files Browse the repository at this point in the history
Add some convenience for communication parameters
  • Loading branch information
andlaus authored Dec 7, 2023
2 parents 262f6c8 + e645995 commit 480a5f6
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 27 deletions.
4 changes: 2 additions & 2 deletions odxtools/cli/snoop.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,10 @@ def run(args: argparse.Namespace) -> None:

# if no can IDs have been explicitly specified, take them from the DL
if args.rx is None and odx_diag_layer:
args.rx = str(odx_diag_layer.get_can_receive_id(protocol_name=protocol_name))
args.rx = str(odx_diag_layer.get_can_receive_id(protocol=protocol_name))

if args.tx is None and odx_diag_layer:
args.tx = str(odx_diag_layer.get_can_send_id(protocol_name=protocol_name))
args.tx = str(odx_diag_layer.get_can_send_id(protocol=protocol_name))

if args.rx is None:
print(f"Could not determine a CAN receive ID.")
Expand Down
162 changes: 137 additions & 25 deletions odxtools/diaglayer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# SPDX-License-Identifier: MIT
import re
import warnings
from dataclasses import dataclass
from functools import cached_property
Expand Down Expand Up @@ -623,12 +624,18 @@ def get_comparam(
self,
cp_short_name: str,
*,
protocol_name: Optional[str] = None,
protocol: Optional[Union[str, "DiagLayer"]] = None,
) -> Optional[ComparamInstance]:
"""Find a specific communication parameter according to some criteria.
Setting a given parameter to `None` means "don't care"."""

protocol_name: Optional[str]
if isinstance(protocol, DiagLayer):
protocol_name = protocol.short_name
else:
protocol_name = protocol

# determine the set of applicable communication parameters
cps = [cp for cp in self.comparams if cp.short_name == cp_short_name]
if protocol_name is not None:
Expand All @@ -646,9 +653,104 @@ def get_comparam(

return cps[0]

def get_can_receive_id(self, protocol_name: Optional[str] = None) -> Optional[int]:
def get_max_can_payload_size(self,
protocol: Optional[Union[str,
"DiagLayer"]] = None) -> Optional[int]:
"""Return the maximum size of a CAN frame payload that can be
transmitted in bytes.
For classic CAN busses, this is basically always 8. CAN-FD can
send up to 64 bytes per frame.
"""
com_param = self.get_comparam("CP_CANFDTxMaxDataLength", protocol=protocol)
if com_param is None:
rx_id = self.get_can_receive_id(protocol=protocol)
if rx_id is not None:
# bus is CAN. We also use 8 bytes for CAN-FD if this
# parameter was not specified.
return 8

# bus is not CAN
return None

val = com_param.value
if not isinstance(val, str):
return None

m = re.search("TX_DL *= *([0-9]*)", val)
if m:
return int(m.group(1))

# unexpected format of parameter value
return 8

def uses_can(self, protocol: Optional[Union[str, "DiagLayer"]] = None) -> bool:
"""
Check if CAN ought to be used as the link layer protocol.
"""
return self.get_can_receive_id(protocol=protocol) is not None

def uses_can_fd(self, protocol: Optional[Union[str, "DiagLayer"]] = None) -> bool:
"""Check if CAN-FD ought to be used.
If the ECU is not using CAN-FD for the specified protocol, `False`
is returned.
If this method returns `True`, `.uses_can() == True` is implied
for the protocol.
"""

if not self.uses_can(protocol):
return False

com_param = self.get_comparam("CP_CANFDTxMaxDataLength", protocol=protocol)
if com_param is None:
return False

return "CANFD" in com_param.value

def get_can_baudrate(self, protocol: Optional[Union[str, "DiagLayer"]] = None) -> Optional[int]:
"""Baudrate of the CAN bus which is used by the ECU [bits/s]
If the ECU is not using CAN for the specified protocol, None
is returned.
"""
com_param = self.get_comparam("CP_Baudrate", protocol=protocol)
if com_param is None:
return None

val = com_param.value
if not isinstance(val, str):
return None

return int(val)

def get_can_fd_baudrate(self,
protocol: Optional[Union[str, "DiagLayer"]] = None) -> Optional[int]:
"""Data baudrate of the CAN bus which is used by the ECU [bits/s]
If the ECU is not using CAN-FD for the specified protocol,
None is returned.
"""
if not self.uses_can_fd(protocol=protocol):
return None

com_param = self.get_comparam("CP_CANFDBaudrate", protocol=protocol)
if com_param is None:
return None

val = com_param.value
if not isinstance(val, str):
return None

return int(val)

def get_can_receive_id(self,
protocol: Optional[Union[str, "DiagLayer"]] = None) -> Optional[int]:
"""CAN ID to which the ECU listens for diagnostic messages"""
com_param = self.get_comparam("CP_UniqueRespIdTable", protocol_name=protocol_name)
com_param = self.get_comparam("CP_UniqueRespIdTable", protocol=protocol)
if com_param is None:
return None

Expand All @@ -669,15 +771,15 @@ def get_can_receive_id(self, protocol_name: Optional[str] = None) -> Optional[in
def get_receive_id(self) -> Optional[int]:
return self.get_can_receive_id()

def get_can_send_id(self, protocol_name: Optional[str] = None) -> Optional[int]:
def get_can_send_id(self, protocol: Optional[Union[str, "DiagLayer"]] = None) -> Optional[int]:
"""CAN ID to which the ECU sends replies to diagnostic messages"""

# this hopefully resolves to the 'CP_UniqueRespIdTable'
# parameter from the ISO_15765_2 comparam subset. (There is a
# parameter with the same name in the ISO_13400_2_DIS_2015
# subset for DoIP. If the wrong one is retrieved, try
# specifying the protocol_name.)
com_param = self.get_comparam("CP_UniqueRespIdTable", protocol_name=protocol_name)
# specifying the protocol.)
com_param = self.get_comparam("CP_UniqueRespIdTable", protocol=protocol)
if com_param is None:
return None

Expand All @@ -698,9 +800,10 @@ def get_can_send_id(self, protocol_name: Optional[str] = None) -> Optional[int]:
def get_send_id(self) -> Optional[int]:
return self.get_can_send_id()

def get_can_func_req_id(self, protocol_name: Optional[str] = None) -> Optional[int]:
def get_can_func_req_id(self,
protocol: Optional[Union[str, "DiagLayer"]] = None) -> Optional[int]:
"""CAN Functional Request Id."""
com_param = self.get_comparam("CP_CanFuncReqId", protocol_name=protocol_name)
com_param = self.get_comparam("CP_CanFuncReqId", protocol=protocol)
if com_param is None:
return None

Expand All @@ -711,10 +814,12 @@ def get_can_func_req_id(self, protocol_name: Optional[str] = None) -> Optional[i

return int(result)

def get_doip_logical_ecu_address(self, protocol_name: Optional[str] = None) -> Optional[int]:
def get_doip_logical_ecu_address(self,
protocol: Optional[Union[str, "DiagLayer"]] = None
) -> Optional[int]:
"""Return the address of the ECU when using functional addressing.
The parameter protocol_name is used to distinguish between
The parameter protocol is used to distinguish between
different interfaces, e.g., offboard and onboard DoIP
Ethernet.
"""
Expand All @@ -723,8 +828,8 @@ def get_doip_logical_ecu_address(self, protocol_name: Optional[str] = None) -> O
# parameter from the ISO_13400_2_DIS_2015 comparam
# subset. (There is a parameter with the same name in the
# ISO_15765_2 subset for CAN. If the wrong one is retrieved,
# try specifying the protocol_name.)
com_param = self.get_comparam("CP_UniqueRespIdTable", protocol_name=protocol_name)
# try specifying the protocol.)
com_param = self.get_comparam("CP_UniqueRespIdTable", protocol=protocol)

if com_param is None:
return None
Expand All @@ -742,12 +847,13 @@ def get_doip_logical_ecu_address(self, protocol_name: Optional[str] = None) -> O
return int(ecu_addr)

def get_doip_logical_gateway_address(self,
protocol_name: Optional[str] = None) -> Optional[int]:
protocol: Optional[Union[str, "DiagLayer"]] = None
) -> Optional[int]:
"""The logical gateway address for the diagnosis over IP transport protocol"""

# retrieve CP_DoIPLogicalGatewayAddress from the
# ISO_13400_2_DIS_2015 subset. hopefully.
com_param = self.get_comparam("CP_DoIPLogicalGatewayAddress", protocol_name=protocol_name)
com_param = self.get_comparam("CP_DoIPLogicalGatewayAddress", protocol=protocol)
if com_param is None:
return None

Expand All @@ -758,12 +864,14 @@ def get_doip_logical_gateway_address(self,

return int(result)

def get_doip_logical_tester_address(self, protocol_name: Optional[str] = None) -> Optional[int]:
def get_doip_logical_tester_address(self,
protocol: Optional[Union[str, "DiagLayer"]] = None
) -> Optional[int]:
"""DoIp logical gateway address"""

# retrieve CP_DoIPLogicalTesterAddress from the
# ISO_13400_2_DIS_2015 subset. hopefully.
com_param = self.get_comparam("CP_DoIPLogicalTesterAddress", protocol_name=protocol_name)
com_param = self.get_comparam("CP_DoIPLogicalTesterAddress", protocol=protocol)
if com_param is None:
return None

Expand All @@ -775,14 +883,15 @@ def get_doip_logical_tester_address(self, protocol_name: Optional[str] = None) -
return int(result)

def get_doip_logical_functional_address(self,
protocol_name: Optional[str] = None) -> Optional[int]:
protocol: Optional[Union[str, "DiagLayer"]] = None
) -> Optional[int]:
"""The logical functional DoIP address of the ECU."""

# retrieve CP_DoIPLogicalFunctionalAddress from the
# ISO_13400_2_DIS_2015 subset. hopefully.
com_param = self.get_comparam(
"CP_DoIPLogicalFunctionalAddress",
protocol_name=protocol_name,
protocol=protocol,
)
if com_param is None:
return None
Expand All @@ -795,13 +904,13 @@ def get_doip_logical_functional_address(self,
return int(result)

def get_doip_routing_activation_timeout(self,
protocol_name: Optional[str] = None) -> Optional[float]:
protocol: Optional[Union[str, "DiagLayer"]] = None
) -> Optional[float]:
"""The timout for the DoIP routing activation request in seconds"""

# retrieve CP_DoIPRoutingActivationTimeout from the
# ISO_13400_2_DIS_2015 subset. hopefully.
com_param = self.get_comparam(
"CP_DoIPRoutingActivationTimeout", protocol_name=protocol_name)
com_param = self.get_comparam("CP_DoIPRoutingActivationTimeout", protocol=protocol)
if com_param is None:
return None

Expand All @@ -813,7 +922,8 @@ def get_doip_routing_activation_timeout(self,
return float(result) / 1e6

def get_doip_routing_activation_type(self,
protocol_name: Optional[str] = None) -> Optional[int]:
protocol: Optional[Union[str, "DiagLayer"]] = None
) -> Optional[int]:
"""The DoIP routing activation type
The number returned has the following meaning:
Expand All @@ -826,7 +936,7 @@ def get_doip_routing_activation_type(self,

# retrieve CP_DoIPRoutingActivationType from the
# ISO_13400_2_DIS_2015 subset. hopefully.
com_param = self.get_comparam("CP_DoIPRoutingActivationType", protocol_name=protocol_name)
com_param = self.get_comparam("CP_DoIPRoutingActivationType", protocol=protocol)
if com_param is None:
return None

Expand All @@ -837,7 +947,9 @@ def get_doip_routing_activation_type(self,

return int(result)

def get_tester_present_time(self, protocol_name: Optional[str] = None) -> Optional[float]:
def get_tester_present_time(self,
protocol: Optional[Union[str,
"DiagLayer"]] = None) -> Optional[float]:
"""Timeout on inactivity in seconds.
This is defined by the communication parameter "CP_TesterPresentTime".
Expand All @@ -849,7 +961,7 @@ def get_tester_present_time(self, protocol_name: Optional[str] = None) -> Option

# retrieve CP_TesterPresentTime from either the
# ISO_13400_2_DIS_2015 or the ISO_15765_2 subset.
com_param = self.get_comparam("CP_TesterPresentTime", protocol_name=protocol_name)
com_param = self.get_comparam("CP_TesterPresentTime", protocol=protocol)
if com_param is None:
return None

Expand Down

0 comments on commit 480a5f6

Please sign in to comment.