From 1a00ff4e71bb14735aa6741120411ac8e3f4e017 Mon Sep 17 00:00:00 2001 From: Marek Sebera Date: Sun, 1 Dec 2024 02:28:36 +0100 Subject: [PATCH] crc7, reverse channel (RC PDU), parsing from hytera IPSC and necessities, #8 --- okdmr/dmrlib/etsi/crc/crc7.py | 42 ++++++++++ okdmr/dmrlib/etsi/layer2/burst.py | 11 +++ .../dmrlib/etsi/layer2/pdu/reverse_channel.py | 79 +++++++++++++++++++ .../elements/reverse_channel_command.py | 35 ++++++++ okdmr/dmrlib/tools/hytera_tool.py | 34 ++++++++ .../etsi/layer2/pdu/test_reverse_channel.py | 19 +++++ pyproject.toml | 1 + 7 files changed, 221 insertions(+) create mode 100644 okdmr/dmrlib/etsi/crc/crc7.py create mode 100644 okdmr/dmrlib/etsi/layer2/pdu/reverse_channel.py create mode 100644 okdmr/dmrlib/etsi/layer3/elements/reverse_channel_command.py create mode 100644 okdmr/tests/dmrlib/etsi/layer2/pdu/test_reverse_channel.py diff --git a/okdmr/dmrlib/etsi/crc/crc7.py b/okdmr/dmrlib/etsi/crc/crc7.py new file mode 100644 index 0000000..e0923fc --- /dev/null +++ b/okdmr/dmrlib/etsi/crc/crc7.py @@ -0,0 +1,42 @@ +from bitarray import bitarray +from bitarray.util import ba2int + +from okdmr.dmrlib.etsi.crc.crc import BitCrcCalculator, Crc7 +from okdmr.dmrlib.etsi.layer2.elements.crc_masks import CrcMasks + + +class CRC7: + """ + B.3.13 7-bit CRC calculation - ETSI TS 102 361-1 V2.5.1 (2017-10) + """ + + CALC: BitCrcCalculator = BitCrcCalculator( + table_based=True, configuration=Crc7.ETSI_DMR + ) + + @staticmethod + def check( + data: bitarray, crc7: int, mask: CrcMasks = CrcMasks.ReverseChannel + ) -> bool: + """ + Will check that provided crc7 param matches the internal calculation + :param data: bitarray data + :param mask: usually only CrcMasks.ReverseChannel + :param crc7 expected result + :return: verification result + """ + assert ( + 0x00 <= crc7 <= 0x7F + ), f"crc7 is expected in range (exclusive) 0-{0x7F}, got {crc7}" + + return CRC7.calculate(data, mask) == crc7 + + @staticmethod + def calculate(data: bitarray, mask: CrcMasks = CrcMasks.ReverseChannel) -> int: + """ + Will perform bytes-swap of payload and returns crc7 as int + :param data: bytes object of data to be checked + :param mask: usually only CrcMasks.ReverseChannel + :return: int crc7 + """ + return ba2int(CRC7.CALC.calculate_checksum(data)) ^ mask.value diff --git a/okdmr/dmrlib/etsi/layer2/burst.py b/okdmr/dmrlib/etsi/layer2/burst.py index 35aa68e..735bb5b 100644 --- a/okdmr/dmrlib/etsi/layer2/burst.py +++ b/okdmr/dmrlib/etsi/layer2/burst.py @@ -1,6 +1,9 @@ from typing import Optional, Literal from bitarray import bitarray +from okdmr.dmrlib.etsi.layer2.elements.preemption_power_indicator import ( + PreemptionPowerIndicator, +) from okdmr.kaitai.homebrew.mmdvm2020 import Mmdvm2020 from okdmr.kaitai.hytera.ip_site_connect_protocol import IpSiteConnectProtocol @@ -187,6 +190,14 @@ def colour_code(self) -> int: def __repr__(self) -> str: status: str = f"[{self.sync_or_embedded_signalling.name}] " + if ( + self.has_emb + and self.sync_or_embedded_signalling == SyncPatterns.EmbeddedSignalling + and self.emb.preemption_and_power_control_indicator + == PreemptionPowerIndicator.CarriesReverseChannelInformation + ): + status += f"[RC Info {self.embedded_signalling_bits}] " + if self.is_vocoder: status += f" [{self.voice_burst}]" diff --git a/okdmr/dmrlib/etsi/layer2/pdu/reverse_channel.py b/okdmr/dmrlib/etsi/layer2/pdu/reverse_channel.py new file mode 100644 index 0000000..29069eb --- /dev/null +++ b/okdmr/dmrlib/etsi/layer2/pdu/reverse_channel.py @@ -0,0 +1,79 @@ +from typing import Union, Literal, Optional + +from bitarray import bitarray +from bitarray.util import ba2int, int2ba +from okdmr.dmrlib.etsi.crc.crc7 import CRC7 +from okdmr.dmrlib.etsi.fec.vbptc_32_11 import VBPTC3211 +from okdmr.dmrlib.etsi.layer2.elements.crc_masks import CrcMasks + +from okdmr.dmrlib.utils.bits_bytes import bits_to_bytes, bytes_to_bits +from okdmr.dmrlib.utils.bits_interface import BitsInterface +from okdmr.dmrlib.utils.bytes_interface import BytesInterface +from okdmr.dmrlib.etsi.layer3.elements.reverse_channel_command import ( + ReverseChannelCommand, +) + + +class ReverseChannel(BitsInterface, BytesInterface): + def __init__( + self, + rc_command: ReverseChannelCommand = ReverseChannelCommand.SetPowerToHighest, + crc7: Union[int, bytes] = 0, + ): + self.rc_command: ReverseChannelCommand = rc_command + self.crc: int = self.calculate_crc() + self.crc_ok: bool = self.crc == ( + crc7 if isinstance(crc7, int) else int.from_bytes(crc7, byteorder="big") + ) + + def calculate_crc(self) -> int: + return CRC7.calculate( + data=self.rc_command.as_bits(), mask=CrcMasks.ReverseChannel + ) + + def __repr__(self) -> str: + return f"[ReverseChannel {self.rc_command}]" + ( + "" if self.crc_ok else " [CRC7 INVALID]" + ) + + @staticmethod + def deinterleave(bits: bitarray) -> bitarray: + return VBPTC3211.deinterleave_data_bits(bits) + + @staticmethod + def from_bits(bits: bitarray) -> "ReverseChannel": + # from_bits => from on-air bits, means from interleaved form + assert len(bits) in ( + 32, + 11, + 4, + ), f"not 32 (full on-air pdu), 11 (rc command + crc7) or 4 (rc command), got {len(bits)}" + deinterleaved: bitarray = ( + ReverseChannel.deinterleave(bits) + if len(bits) == 32 + else (bits if len(bits) == 11 else (bits + bitarray("0000000"))) + ) + return ReverseChannel( + rc_command=ReverseChannelCommand.from_bits(deinterleaved[0:4]), + crc7=ba2int(deinterleaved[4:11]), + ) + + def encode(self) -> bitarray: + """ + Returns: 32 bits of interleaved data+crc7 + """ + return VBPTC3211.encode( + (self.rc_command.as_bits() + int2ba(self.calculate_crc(), 7)), False + ) + + def as_bits(self) -> bitarray: + return self.encode() + + @staticmethod + def from_bytes( + data: bytes, endian: Literal["big", "little"] = "big" + ) -> Optional["BytesInterface"]: + return ReverseChannel.from_bits(bytes_to_bits(data)) + + def as_bytes(self, endian: Literal["big", "little"] = "big") -> bytes: + return bits_to_bytes(self.encode()) diff --git a/okdmr/dmrlib/etsi/layer3/elements/reverse_channel_command.py b/okdmr/dmrlib/etsi/layer3/elements/reverse_channel_command.py new file mode 100644 index 0000000..05b72cc --- /dev/null +++ b/okdmr/dmrlib/etsi/layer3/elements/reverse_channel_command.py @@ -0,0 +1,35 @@ +import enum + +from bitarray import bitarray +from bitarray.util import ba2int, int2ba + +from okdmr.dmrlib.utils.bits_interface import BitsInterface + + +@enum.unique +class ReverseChannelCommand(BitsInterface, enum.Enum): + """ + ETSI TS 102 361-4 V1.10.1 (2019-08) - 6.4.14.1 Reverse Channel + Table 6.31: MS Reverse Channel information elements for Power Control and Transmitter Control + """ + + IncreasePowerOneStep = 0 + DecreasePowerOneStep = 1 + SetPowerToHighest = 2 + SetPowerToLowest = 3 + CeaseTransmissionCommand = 4 + CeaseTransmissionRequest = 5 + # range 0110 - 1111 is reserved + Reserved = 0b1111 + + @staticmethod + def from_bits(bits: bitarray) -> "ReverseChannelCommand": + return ReverseChannelCommand(ba2int(bits[0:4])) + + def as_bits(self) -> bitarray: + return int2ba(self.value, 4) + + @classmethod + def _missing_(cls, value: int) -> "ReverseChannelCommand": + print(f"Cannot find RC Command value {value}") + return ReverseChannelCommand.Reserved diff --git a/okdmr/dmrlib/tools/hytera_tool.py b/okdmr/dmrlib/tools/hytera_tool.py index 4ac2023..7cf369f 100644 --- a/okdmr/dmrlib/tools/hytera_tool.py +++ b/okdmr/dmrlib/tools/hytera_tool.py @@ -7,6 +7,13 @@ from okdmr.dmrlib.hytera.pdu.text_message_protocol import TextMessageProtocol from okdmr.dmrlib.utils.protocol_tool import ProtocolTool +import sys +from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter +from typing import Type, Optional, List + +from okdmr.dmrlib.etsi.layer2.burst import Burst +from okdmr.kaitai.hytera.ip_site_connect_protocol import IpSiteConnectProtocol + class HyteraTool(ProtocolTool): @staticmethod @@ -44,3 +51,30 @@ def tmp() -> None: HyteraTool._impl( protocol="TMP - Text Message Protocol", impl=TextMessageProtocol ) + + @staticmethod + def ipsc(arguments: Optional[List[str]] = None) -> None: + parser: ArgumentParser = ArgumentParser( + description="IPSC - IP Site Connect", + formatter_class=ArgumentDefaultsHelpFormatter, + ) + parser.add_argument( + "hex", + type=str, + nargs="+", + help=f"Hex encoded messages of Hytera IPSC protocol", + ) + args = ( + parser.parse_args(sys.argv[1:]) + if (not arguments or not len(arguments)) + else arguments + ) + for hex_msg in args.hex: + print(hex_msg) + try: + pdu = Burst.from_hytera_ipsc( + IpSiteConnectProtocol.from_bytes(bytes.fromhex(hex_msg)) + ) + print(repr(pdu)) + except Exception as e: + print(e, file=sys.stderr) diff --git a/okdmr/tests/dmrlib/etsi/layer2/pdu/test_reverse_channel.py b/okdmr/tests/dmrlib/etsi/layer2/pdu/test_reverse_channel.py new file mode 100644 index 0000000..6b08450 --- /dev/null +++ b/okdmr/tests/dmrlib/etsi/layer2/pdu/test_reverse_channel.py @@ -0,0 +1,19 @@ +from okdmr.dmrlib.etsi.layer2.pdu.reverse_channel import ReverseChannel +from bitarray import bitarray + +from okdmr.dmrlib.utils.bits_bytes import bits_to_bytes, byteswap_bytes, bytes_to_bits + + +def test_reverse_channel(): + rcs: List[Tuple[str,]] = [ + ("01011011011000001010111000010101",), + ("11111000000001100000011001000101",), + ("10110101101110001110111110000100",), + ("11101111101110001010111010010100",), + # ("",), + ] + for (rc,) in rcs: + rc_bits: bitarray = bitarray(rc) + rc_pdu: ReverseChannel = ReverseChannel.from_bits(rc_bits) + print(repr(rc_pdu)) + assert rc_pdu.as_bits() == rc_bits diff --git a/pyproject.toml b/pyproject.toml index 1d8daf0..e2babfb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -64,6 +64,7 @@ dmrlib-hrnp-connect = "okdmr.dmrlib.tools.hrnp_client:HRNPClient.run" dmrlib-hytera-hdap = "okdmr.dmrlib.tools.hytera_tool:HyteraTool.hdap" dmrlib-hytera-hrnp = "okdmr.dmrlib.tools.hytera_tool:HyteraTool.hrnp" dmrlib-hytera-hstrp = "okdmr.dmrlib.tools.hytera_tool:HyteraTool.hstrp" +dmrlib-hytera-ispc = "okdmr.dmrlib.tools.hytera_tool:HyteraTool.ipsc" dmrlib-hytera-lp = "okdmr.dmrlib.tools.hytera_tool:HyteraTool.lp" dmrlib-hytera-rcp = "okdmr.dmrlib.tools.hytera_tool:HyteraTool.rcp" dmrlib-hytera-rrs = "okdmr.dmrlib.tools.hytera_tool:HyteraTool.rrs"