Skip to content

Commit

Permalink
crc7, reverse channel (RC PDU), parsing from hytera IPSC and necessit…
Browse files Browse the repository at this point in the history
…ies, #8
  • Loading branch information
smarek committed Dec 1, 2024
1 parent 8e3c84d commit 1a00ff4
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 0 deletions.
42 changes: 42 additions & 0 deletions okdmr/dmrlib/etsi/crc/crc7.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from bitarray import bitarray
from bitarray.util import ba2int

from okdmr.dmrlib.etsi.crc.crc import BitCrcCalculator, Crc7
from okdmr.dmrlib.etsi.layer2.elements.crc_masks import CrcMasks


class CRC7:
"""
B.3.13 7-bit CRC calculation - ETSI TS 102 361-1 V2.5.1 (2017-10)
"""

CALC: BitCrcCalculator = BitCrcCalculator(
table_based=True, configuration=Crc7.ETSI_DMR
)

@staticmethod
def check(
data: bitarray, crc7: int, mask: CrcMasks = CrcMasks.ReverseChannel
) -> bool:
"""
Will check that provided crc7 param matches the internal calculation
:param data: bitarray data
:param mask: usually only CrcMasks.ReverseChannel
:param crc7 expected result
:return: verification result
"""
assert (
0x00 <= crc7 <= 0x7F
), f"crc7 is expected in range (exclusive) 0-{0x7F}, got {crc7}"

return CRC7.calculate(data, mask) == crc7

@staticmethod
def calculate(data: bitarray, mask: CrcMasks = CrcMasks.ReverseChannel) -> int:
"""
Will perform bytes-swap of payload and returns crc7 as int
:param data: bytes object of data to be checked
:param mask: usually only CrcMasks.ReverseChannel
:return: int crc7
"""
return ba2int(CRC7.CALC.calculate_checksum(data)) ^ mask.value
11 changes: 11 additions & 0 deletions okdmr/dmrlib/etsi/layer2/burst.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from typing import Optional, Literal

from bitarray import bitarray
from okdmr.dmrlib.etsi.layer2.elements.preemption_power_indicator import (
PreemptionPowerIndicator,
)
from okdmr.kaitai.homebrew.mmdvm2020 import Mmdvm2020
from okdmr.kaitai.hytera.ip_site_connect_protocol import IpSiteConnectProtocol

Expand Down Expand Up @@ -187,6 +190,14 @@ def colour_code(self) -> int:
def __repr__(self) -> str:
status: str = f"[{self.sync_or_embedded_signalling.name}] "

if (
self.has_emb
and self.sync_or_embedded_signalling == SyncPatterns.EmbeddedSignalling
and self.emb.preemption_and_power_control_indicator
== PreemptionPowerIndicator.CarriesReverseChannelInformation
):
status += f"[RC Info {self.embedded_signalling_bits}] "

if self.is_vocoder:
status += f" [{self.voice_burst}]"

Expand Down
79 changes: 79 additions & 0 deletions okdmr/dmrlib/etsi/layer2/pdu/reverse_channel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from typing import Union, Literal, Optional

from bitarray import bitarray
from bitarray.util import ba2int, int2ba
from okdmr.dmrlib.etsi.crc.crc7 import CRC7
from okdmr.dmrlib.etsi.fec.vbptc_32_11 import VBPTC3211
from okdmr.dmrlib.etsi.layer2.elements.crc_masks import CrcMasks

from okdmr.dmrlib.utils.bits_bytes import bits_to_bytes, bytes_to_bits
from okdmr.dmrlib.utils.bits_interface import BitsInterface
from okdmr.dmrlib.utils.bytes_interface import BytesInterface
from okdmr.dmrlib.etsi.layer3.elements.reverse_channel_command import (
ReverseChannelCommand,
)


class ReverseChannel(BitsInterface, BytesInterface):
def __init__(
self,
rc_command: ReverseChannelCommand = ReverseChannelCommand.SetPowerToHighest,
crc7: Union[int, bytes] = 0,
):
self.rc_command: ReverseChannelCommand = rc_command
self.crc: int = self.calculate_crc()
self.crc_ok: bool = self.crc == (
crc7 if isinstance(crc7, int) else int.from_bytes(crc7, byteorder="big")
)

def calculate_crc(self) -> int:
return CRC7.calculate(
data=self.rc_command.as_bits(), mask=CrcMasks.ReverseChannel
)

def __repr__(self) -> str:
return f"[ReverseChannel {self.rc_command}]" + (
"" if self.crc_ok else " [CRC7 INVALID]"
)

@staticmethod
def deinterleave(bits: bitarray) -> bitarray:
return VBPTC3211.deinterleave_data_bits(bits)

@staticmethod
def from_bits(bits: bitarray) -> "ReverseChannel":
# from_bits => from on-air bits, means from interleaved form
assert len(bits) in (
32,
11,
4,
), f"not 32 (full on-air pdu), 11 (rc command + crc7) or 4 (rc command), got {len(bits)}"
deinterleaved: bitarray = (
ReverseChannel.deinterleave(bits)
if len(bits) == 32
else (bits if len(bits) == 11 else (bits + bitarray("0000000")))
)
return ReverseChannel(
rc_command=ReverseChannelCommand.from_bits(deinterleaved[0:4]),
crc7=ba2int(deinterleaved[4:11]),
)

def encode(self) -> bitarray:
"""
Returns: 32 bits of interleaved data+crc7
"""
return VBPTC3211.encode(
(self.rc_command.as_bits() + int2ba(self.calculate_crc(), 7)), False
)

def as_bits(self) -> bitarray:
return self.encode()

@staticmethod
def from_bytes(
data: bytes, endian: Literal["big", "little"] = "big"
) -> Optional["BytesInterface"]:
return ReverseChannel.from_bits(bytes_to_bits(data))

def as_bytes(self, endian: Literal["big", "little"] = "big") -> bytes:
return bits_to_bytes(self.encode())
35 changes: 35 additions & 0 deletions okdmr/dmrlib/etsi/layer3/elements/reverse_channel_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import enum

from bitarray import bitarray
from bitarray.util import ba2int, int2ba

from okdmr.dmrlib.utils.bits_interface import BitsInterface


@enum.unique
class ReverseChannelCommand(BitsInterface, enum.Enum):
"""
ETSI TS 102 361-4 V1.10.1 (2019-08) - 6.4.14.1 Reverse Channel
Table 6.31: MS Reverse Channel information elements for Power Control and Transmitter Control
"""

IncreasePowerOneStep = 0
DecreasePowerOneStep = 1
SetPowerToHighest = 2
SetPowerToLowest = 3
CeaseTransmissionCommand = 4
CeaseTransmissionRequest = 5
# range 0110 - 1111 is reserved
Reserved = 0b1111

@staticmethod
def from_bits(bits: bitarray) -> "ReverseChannelCommand":
return ReverseChannelCommand(ba2int(bits[0:4]))

def as_bits(self) -> bitarray:
return int2ba(self.value, 4)

@classmethod
def _missing_(cls, value: int) -> "ReverseChannelCommand":
print(f"Cannot find RC Command value {value}")
return ReverseChannelCommand.Reserved
34 changes: 34 additions & 0 deletions okdmr/dmrlib/tools/hytera_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@
from okdmr.dmrlib.hytera.pdu.text_message_protocol import TextMessageProtocol
from okdmr.dmrlib.utils.protocol_tool import ProtocolTool

import sys
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from typing import Type, Optional, List

from okdmr.dmrlib.etsi.layer2.burst import Burst
from okdmr.kaitai.hytera.ip_site_connect_protocol import IpSiteConnectProtocol


class HyteraTool(ProtocolTool):
@staticmethod
Expand Down Expand Up @@ -44,3 +51,30 @@ def tmp() -> None:
HyteraTool._impl(
protocol="TMP - Text Message Protocol", impl=TextMessageProtocol
)

@staticmethod
def ipsc(arguments: Optional[List[str]] = None) -> None:
parser: ArgumentParser = ArgumentParser(
description="IPSC - IP Site Connect",
formatter_class=ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"hex",
type=str,
nargs="+",
help=f"Hex encoded messages of Hytera IPSC protocol",
)
args = (
parser.parse_args(sys.argv[1:])
if (not arguments or not len(arguments))
else arguments
)
for hex_msg in args.hex:
print(hex_msg)
try:
pdu = Burst.from_hytera_ipsc(
IpSiteConnectProtocol.from_bytes(bytes.fromhex(hex_msg))
)
print(repr(pdu))
except Exception as e:
print(e, file=sys.stderr)
19 changes: 19 additions & 0 deletions okdmr/tests/dmrlib/etsi/layer2/pdu/test_reverse_channel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from okdmr.dmrlib.etsi.layer2.pdu.reverse_channel import ReverseChannel
from bitarray import bitarray

from okdmr.dmrlib.utils.bits_bytes import bits_to_bytes, byteswap_bytes, bytes_to_bits


def test_reverse_channel():
rcs: List[Tuple[str,]] = [
("01011011011000001010111000010101",),
("11111000000001100000011001000101",),
("10110101101110001110111110000100",),
("11101111101110001010111010010100",),
# ("",),
]
for (rc,) in rcs:
rc_bits: bitarray = bitarray(rc)
rc_pdu: ReverseChannel = ReverseChannel.from_bits(rc_bits)
print(repr(rc_pdu))
assert rc_pdu.as_bits() == rc_bits
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ dmrlib-hrnp-connect = "okdmr.dmrlib.tools.hrnp_client:HRNPClient.run"
dmrlib-hytera-hdap = "okdmr.dmrlib.tools.hytera_tool:HyteraTool.hdap"
dmrlib-hytera-hrnp = "okdmr.dmrlib.tools.hytera_tool:HyteraTool.hrnp"
dmrlib-hytera-hstrp = "okdmr.dmrlib.tools.hytera_tool:HyteraTool.hstrp"
dmrlib-hytera-ispc = "okdmr.dmrlib.tools.hytera_tool:HyteraTool.ipsc"
dmrlib-hytera-lp = "okdmr.dmrlib.tools.hytera_tool:HyteraTool.lp"
dmrlib-hytera-rcp = "okdmr.dmrlib.tools.hytera_tool:HyteraTool.rcp"
dmrlib-hytera-rrs = "okdmr.dmrlib.tools.hytera_tool:HyteraTool.rrs"
Expand Down

0 comments on commit 1a00ff4

Please sign in to comment.