Skip to content

Commit

Permalink
code cleanup, tests, code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
smarek committed Aug 28, 2022
1 parent 1e5996f commit a440e13
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 12 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ This package provides way to parse and assemble various DMR ETSI protocols and f
| Rate 1 Data || Rate 1 data (confirmed and unconfirmed) and last block data (confirmed and unconfirmed) |
| Rate 1/2 Data || Rate 1/2 data (confirmed and unconfirmed) and last block data (confirmed and unconfirmed) |
| Rate 3/4 Data || Rate 3/4 data (confirmed and unconfirmed) and last block data (confirmed and unconfirmed) |
| Full/Short Link Control || FLC/SLC PDUs |
| UDP/IPv4 || UDP/IPv4 compressed header/packet |

### ETSI Information Elements

Expand All @@ -68,7 +70,7 @@ Synchronization pattern), Activity ID, Additional Information Field, Answer/Resp
Dynamic Identifier), Position Error, Reason Code, Service Options, Talker Alias Data Format, Defined Data Format (DD),
Selective Automatic Repeat reQuest (SARQ),
Re-Synchronize Flag (S), Send sequence number (N(S)), SAP identifier (SAP), Supplementary Flag (SF), Unified Data
Transport Format (UDT Format)
Transport Format (UDT Format), UDP Port Identifier (SPID/DPID), IP Address Identifier (SAID/DAID)

### Hytera

Expand Down Expand Up @@ -100,6 +102,7 @@ Transport Format (UDT Format)
### Additional notes

- Almost every class/enum supports BitsInterface (de-serialization from on-air bits, serialization to transmission bits)
, or for byte-aligned protocols (Hytera, Motorola) BytesInterface (with explicit endianness support)
- Every FEC/CRC implemented supports both calculation, verification and (if possible) also self-correction
- Working with Vocoder and Data/Control Bursts is supported, along with handling rates 1, 1/2 and 3/4
- CRCs interface classes may require appropriate CRC Mask to be provided when generating or verifying
Expand Down
5 changes: 0 additions & 5 deletions okdmr/dmrlib/etsi/layer2/burst.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,6 @@ def from_mmdvm(mmdvm: Mmdvm2020.TypeDmrData) -> "Burst":
b.set_sequence_no(mmdvm.sequence_no)
b.source_radio_id = mmdvm.source_id
b.target_radio_id = mmdvm.target_id
if b.target_radio_id == 0:
print(f"mmdvm radio id 0")
b.timeslot = 1 if mmdvm.slot_no == Mmdvm2020.Timeslots.timeslot_1 else 2
return b

Expand Down Expand Up @@ -283,9 +281,6 @@ def from_hytera_ipsc(ipsc: IpSiteConnectProtocol) -> "Burst":
b.set_sequence_no(ipsc.sequence_number)
b.source_radio_id = ipsc.source_radio_id
b.target_radio_id = ipsc.destination_radio_id
if b.target_radio_id == 0:
print("ipsc radio id 0")
prettyprint(ipsc)
b.timeslot = (
1 if ipsc.timeslot_raw == IpSiteConnectProtocol.Timeslots.timeslot_1 else 2
)
Expand Down
73 changes: 73 additions & 0 deletions okdmr/dmrlib/tools/pcap_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,35 @@ def process_packet(self, data: bytes, packet: IP) -> Optional[FullLinkControl]:
return full_lc


class IPSCAnalyze:
def __init__(self):
self.map: Dict[
Tuple[IpSiteConnectProtocol.SlotTypes, IpSiteConnectProtocol.FrameTypes],
Dict[str, int],
] = dict()

def process_packet(self, data: bytes, packet: IP) -> None:
kaitai_pkt: Optional[KaitaiStruct] = try_parse_packet(udpdata=data)
burst: Optional[Burst] = PcapTool.debug_packet(
data=data, packet=packet, hide_unknown=True, silent=True
)
if isinstance(kaitai_pkt, IpSiteConnectProtocol) and burst:
rowkey = (kaitai_pkt.slot_type, kaitai_pkt.frame_type)
data = burst.extract_data()
if not data:
return
prettyprint(kaitai_pkt)
row = self.map.get(rowkey, dict())
row[data.__class__.__name__] = row.get(data.__class__.__name__, 0) + 1
self.map[rowkey] = row

def print_stats(self):
for ((slot, frame), dt_stats) in self.map.items():
print(f"SLOT: {slot} FRAME: {frame}")
for (dt, dt_count) in dt_stats.items():
print(f"COUNT: {dt_count}\tDT: {dt}")


# noinspection PyDefaultArgument
class PcapTool:
"""
Expand Down Expand Up @@ -117,11 +146,22 @@ def debug_packet(
) -> Optional[Burst]:
pkt = try_parse_packet(udpdata=data)
burst: Optional[Burst] = None
ip_str: str = f"{packet.src}:{packet.getlayer(UDP).sport}\t-> {packet.dst}:{packet.getlayer(UDP).dport}\t"
if isinstance(pkt, IpSiteConnectProtocol):
burst: Burst = Burst.from_hytera_ipsc(pkt)
if not silent:
print(
f"{ip_str} IPSC TS:{1 if pkt.timeslot_raw == IpSiteConnectProtocol.Timeslots.timeslot_1 else 2} "
f"SEQ: {pkt.sequence_number} {repr(burst)}"
)
elif isinstance(pkt, Mmdvm2020):
if isinstance(pkt.command_data, Mmdvm2020.TypeDmrData):
burst: Burst = Burst.from_mmdvm(pkt.command_data)
if not silent:
print(
f"{ip_str} MMDVM TS:{1 if pkt.command_data.slot_no == Mmdvm2020.Timeslots.timeslot_1 else 2} "
f"SEQ: {pkt.command_data.sequence_no} {repr(burst)}"
)
elif isinstance(pkt, IpSiteConnectHeartbeat):
pass
elif not hide_unknown and not silent:
Expand Down Expand Up @@ -176,6 +216,7 @@ def print_pcap(
files: List[str],
ports_whitelist: List[int] = [],
ports_blacklist: List[int] = [],
ip_whitelist: List[str] = [],
print_statistics: bool = True,
print_raw: bool = False,
callback: Optional[Callable] = None,
Expand All @@ -186,6 +227,7 @@ def print_pcap(
:param files:
:param ports_whitelist:
:param ports_blacklist:
:param ip_whitelist:
:param print_raw:
:param print_statistics: whether statistics should be printed directly to stdout
:return: port statistics (dict [key=sport/dport number] [value=number of packets encountered])
Expand All @@ -194,6 +236,7 @@ def print_pcap(
files=files,
ports_whitelist=ports_whitelist,
ports_blacklist=ports_blacklist,
ip_whitelist=ip_whitelist,
print_raw=print_raw,
callback=PcapTool.debug_packet if callback is None else callback,
)
Expand All @@ -211,6 +254,7 @@ def iter_pcap(
callback: Optional[Callable] = None,
ports_whitelist: List[int] = [],
ports_blacklist: List[int] = [],
ip_whitelist: List[str] = [],
print_raw: bool = False,
) -> Dict[int, int]:
"""
Expand All @@ -221,6 +265,7 @@ def iter_pcap(
:param ports_blacklist:
:param callback:
:param ports_whitelist:
:param ip_whitelist:
:return: port statistics (dict [key=sport/dport number] [value=number of packets encountered])
"""
assert isinstance(ports_blacklist, list)
Expand All @@ -247,6 +292,11 @@ def iter_pcap(
statistics.get(udp_layer.dport, 0) + 1
)

if len(ip_whitelist):
# if no whitelisted ips, do not filter
if ip_layer.src not in ip_whitelist:
continue

if len(ports_whitelist):
# if no ports whitelisted, do not filter
if (
Expand Down Expand Up @@ -382,6 +432,21 @@ def _arguments() -> ArgumentParser:
dest="verbose",
help="Verbose logging",
)
parser.add_argument(
"--ipsc",
action="store_true",
default=False,
dest="analyze_ipsc",
help="Analyze IPSC traffic (map between slot-type, frame-type and data-type)",
)
parser.add_argument(
"--filter-ip",
dest="filter_ip",
type=str,
nargs="+",
default=[],
help="Filter traffic by origin IP address(es)",
)
return parser

@staticmethod
Expand All @@ -404,6 +469,7 @@ def main(
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)

callback = PcapTool.debug_packet
ipsc_analyze = IPSCAnalyze()
if args.extract_embedded_lc:
callback = EmbeddedExtractor().process_packet
elif args.observe_transmissions:
Expand All @@ -412,15 +478,22 @@ def main(
.set_debug_voice_bytes(do_debug=args.debug_vocoder_bytes)
.process_packet
)
elif args.analyze_ipsc:
callback = ipsc_analyze.process_packet

stats = PcapTool.print_pcap(
files=args.files,
ports_whitelist=args.whitelist_ports,
ports_blacklist=args.blacklist_ports,
ip_whitelist=args.filter_ip,
print_statistics=not args.no_statistics,
print_raw=args.print_raw,
callback=callback,
)

if args.analyze_ipsc:
ipsc_analyze.print_stats()

if return_stats:
return stats

Expand Down
2 changes: 2 additions & 0 deletions okdmr/dmrlib/utils/parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def parse_hytera_data(bytedata: bytes) -> KaitaiStruct:
def try_parse_packet(udpdata: bytes) -> Optional[KaitaiStruct]:

try:
# known unsupported, that incidentally gets decoded as Hytera IPSC packet
if udpdata[:4] == b"USRP":
return None
finally:
Expand All @@ -58,6 +59,7 @@ def try_parse_packet(udpdata: bytes) -> Optional[KaitaiStruct]:
try:
mmdvm = Mmdvm2020.from_bytes(udpdata)
if hasattr(mmdvm, "command_data"):
# packets with unknown/invalid/unsupported prefixes, won't have command_data attribute
return mmdvm
except BaseException as e:
if (
Expand Down
24 changes: 21 additions & 3 deletions okdmr/tests/dmrlib/etsi/layer2/test_burst.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import sys
from typing import List, Tuple

from okdmr.kaitai.homebrew.mmdvm2020 import Mmdvm2020
from okdmr.kaitai.hytera.ip_site_connect_protocol import IpSiteConnectProtocol

from okdmr.dmrlib.etsi.layer2.burst import Burst
from okdmr.dmrlib.etsi.layer2.elements.burst_types import BurstTypes
from okdmr.dmrlib.etsi.layer2.elements.data_types import DataTypes
Expand All @@ -13,6 +10,8 @@
from okdmr.dmrlib.hytera.hytera_ipsc_wakeup import HyteraIPSCWakeup
from okdmr.dmrlib.transmission.transmission import Transmission
from okdmr.dmrlib.transmission.transmission_types import TransmissionTypes
from okdmr.kaitai.homebrew.mmdvm2020 import Mmdvm2020
from okdmr.kaitai.hytera.ip_site_connect_protocol import IpSiteConnectProtocol


def test_burst_info(capsys):
Expand Down Expand Up @@ -104,6 +103,25 @@ def test_burst_info_hytera():
assert len(repr(burst))


def test_guess_target_radio_id():
bursts: List[Tuple[str, int]] = [
(
"5a5a5a5a570300004100050102000000222233335555000040f5c545f705e8bd0c26080850b4fd9457ff5dd7dcf5e6ae3877796501781fbb1a330046f7050000fc372300fe372300",
2308092,
),
(
"5a5a5a5a50030000410005010200000022224444555500004091613a89349c25697b03a66368bd5557ff5dd7d5f5785db87af534662b1d4a3794000989340000fc372300fe372300",
2308092,
),
]
for ipsc_burst, expected_id in bursts:
ipsc: IpSiteConnectProtocol = IpSiteConnectProtocol.from_bytes(
bytes.fromhex(ipsc_burst)
)
burst: Burst = Burst.from_hytera_ipsc(ipsc)
assert burst.guess_target_radio_id() == expected_id


def test_burst_as_bits():
bursts: List[Tuple[str, BurstTypes]] = [
(
Expand Down
40 changes: 40 additions & 0 deletions okdmr/tests/dmrlib/hytera/pdu/test_rrs.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import pytest
from okdmr.dmrlib.hytera.pdu.radio_ip import RadioIP

from okdmr.dmrlib.hytera.pdu.radio_registration_service import (
RRSTypes,
RRSResult,
RRSRadioState,
RadioRegistrationService,
)


Expand All @@ -19,3 +21,41 @@ def test_rrs_enums():
with pytest.raises(ValueError):
RRSRadioState(0x02)
assert RRSRadioState.Offline.as_bytes() == b"\x01"


def test_rrs_answer():
rrs_bytes = b"\x91\x00\x80\x00\t\n\x00\x00P\x00\x00\x00\x0e\x101\x03"
rrs = RadioRegistrationService(
opcode=RRSTypes.RadioRegistrationAnswer,
is_reliable=True,
radio_ip=RadioIP.from_ip("10.0.0.80"),
result=RRSResult.Success,
renew_time_seconds=3600,
)
assert RadioRegistrationService.from_bytes(rrs_bytes).as_bytes() == rrs_bytes
assert rrs.as_bytes() == rrs_bytes
assert len(repr(rrs))


def test_rrs_status_check_request():
rrs_bytes = b"\x91\x00\x02\x00\x04\n\x00\x00\x14\x0e\x03"
rrs = RadioRegistrationService(
opcode=RRSTypes.RegistrationStatusCheckRequest,
is_reliable=True,
radio_ip=RadioIP.from_ip("10.0.0.20"),
)
assert RadioRegistrationService.from_bytes(rrs_bytes).as_bytes() == rrs_bytes
assert rrs.as_bytes() == rrs_bytes
assert len(repr(rrs))


def test_rrs_status_check_answer():
rrs_bytes = b"\x11\x00\x82\x00\x05\n\x00\x00!\x00\x80\x03"
rrs = RadioRegistrationService(
opcode=RRSTypes.RegistrationStatusCheckAnswer,
radio_ip=RadioIP.from_ip("10.0.0.33"),
radio_state=RRSRadioState.Online,
)
assert RadioRegistrationService.from_bytes(rrs_bytes).as_bytes() == rrs_bytes
assert rrs.as_bytes() == rrs_bytes
assert len(repr(rrs))
29 changes: 26 additions & 3 deletions okdmr/tests/dmrlib/tools/test_pcap_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
from typing import Tuple, List, Optional

from _pytest.capture import CaptureFixture
from scapy.layers.inet import IP

from okdmr.dmrlib.etsi.layer2.elements.flcos import FLCOs
from okdmr.dmrlib.etsi.layer2.pdu.full_link_control import FullLinkControl
from okdmr.dmrlib.tools.pcap_tool import PcapTool, EmbeddedExtractor
from okdmr.dmrlib.tools.pcap_tool import PcapTool, EmbeddedExtractor, IPSCAnalyze
from scapy.layers.inet import IP, UDP
from scapy.packet import Raw


class PcapCounterHelper:
Expand Down Expand Up @@ -162,3 +162,26 @@ def test_embedded_extractor(capsys: CaptureFixture):
assert full_lc.group_address == 9
assert full_lc.source_address == 2623266
assert full_lc.full_link_control_opcode == FLCOs.GroupVoiceChannelUser


def test_ipsc_analyze(capsys):
ia: IPSCAnalyze = IPSCAnalyze()
pkts: List[str] = [
"5a5a5a5a660000004100050101000000111111111111000040b951018849a00b381b4016806c6dc457ff5dd7def5993218016020a005412310390033884901000900000022072800",
"5a5a5a5a670000004100050101000000111111111111000040b951018849a00b381b4016806c6dc457ff5dd7def5993218016020a005412310390033884901000900000022072800",
"5a5a5a5a690000004100050101000000111100001111000040905b1219a4cc30a1d92317220a0d8457ff5dd7ddf53f9dc071c040a5085f0b1d1c001919a401000900000022072800",
"5a5a5a5a0000000042000501010000001111eeee11111111400000001000400000000000090028000700220000000000000000000000000030305032503801000900000022072800",
]
for pkt in pkts:
pkt_bytes: bytes = bytes.fromhex(pkt)
ia.process_packet(
data=pkt_bytes,
packet=IP(src="192.168.0.1")
/ UDP(sport="123", dport="123")
/ Raw(_pkt=pkt_bytes),
)

ia.print_stats()
captured = capsys.readouterr()
assert len(captured.out)
assert not len(captured.err)

0 comments on commit a440e13

Please sign in to comment.