Skip to content

Commit

Permalink
CCSDS spacepackets improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
robamu committed Jun 14, 2022
1 parent 6f1380c commit 6e20d4d
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 44 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [unreleased]

## [v0.9.0]

- API improvements, bugfix and general improvements for CCSDS spacepacket
header implementation

## [v0.8.1]

- Named value for fetching global APID
Expand Down
2 changes: 1 addition & 1 deletion examples/example_spacepacket.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
def main():
print("-- Space Packet examples --")
spacepacket_header = SpacePacketHeader(
packet_type=PacketTypes.TC, apid=0x01, source_sequence_count=0, data_length=0
packet_type=PacketTypes.TC, apid=0x01, ssc=0, data_length=0
)
header_as_bytes = spacepacket_header.pack()
print_string = get_printable_data_string(
Expand Down
2 changes: 1 addition & 1 deletion examples/example_uslp.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def main():
sequence_flags=SequenceFlags.UNSEGMENTED,
apid=SPACECRAFT_ID,
data_length=len(data) - 1,
source_sequence_count=0,
ssc=0,
)
tfdz = space_packet_wrapper.pack() + data
tfdf = TransferFrameDataField(
Expand Down
2 changes: 1 addition & 1 deletion spacepackets/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.8.1"
__version__ = "0.9.0"
80 changes: 49 additions & 31 deletions spacepackets/ccsds/spacepacket.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations
import enum
import struct

from typing import Tuple, Deque, List, Final
from spacepackets.log import get_console_logger
Expand All @@ -23,27 +24,29 @@ class SpacePacketHeader:
"""This class encapsulates the space packet header.
Packet reference: Blue Book CCSDS 133.0-B-2"""

SEQ_FLAG_MASK = 0xC000

def __init__(
self,
packet_type: PacketTypes,
apid: int,
source_sequence_count: int,
ssc: int,
data_length: int,
packet_version: int = 0b000,
secondary_header_flag: bool = True,
sec_header_flag: bool = True,
sequence_flags: SequenceFlags = SequenceFlags.UNSEGMENTED,
):
"""Create a space packet header with the given field parameters
:param packet_type: 0 for Telemetery, 1 for Telecommands
:param apid: Application Process ID, should not be larger
than 11 bits, deciaml 2074 or hex 0x7ff
:param source_sequence_count: Sequence counter, should not be larger than 0x3fff or
:param ssc: Source sequence counter, should not be larger than 0x3fff or
decimal 16383
:param data_length: Contains a length count C that equals one fewer than the length of the
packet data field. Should not be larger than 65535 bytes
:param packet_version:
:param secondary_header_flag:
:param sec_header_flag: Secondary header flag, 1 or True by default
:param sequence_flags:
:raises ValueError: On invalid parameters
"""
Expand All @@ -54,7 +57,7 @@ def __init__(
f"Invalid APID, exceeds maximum value {pow(2, 11) - 1} or negative"
)
raise ValueError
if source_sequence_count > pow(2, 14) - 1 or source_sequence_count < 0:
if ssc > pow(2, 14) - 1 or ssc < 0:
logger = get_console_logger()
logger.warning(
f"Invalid source sequence count, exceeds maximum value {pow(2, 14)- 1} or negative"
Expand All @@ -67,8 +70,8 @@ def __init__(
)
raise ValueError
self.apid = apid
self.ssc = source_sequence_count
self.secondary_header_flag = secondary_header_flag
self.ssc = ssc
self.sec_header_flag = sec_header_flag
self.sequence_flags = sequence_flags
self.psc = get_space_packet_sequence_control(
sequence_flags=self.sequence_flags, source_sequence_count=self.ssc
Expand All @@ -77,19 +80,36 @@ def __init__(
self.data_length = data_length
self.packet_id = get_space_packet_id_num(
packet_type=self.packet_type,
secondary_header_flag=self.secondary_header_flag,
secondary_header_flag=self.sec_header_flag,
apid=self.apid,
)

@classmethod
def from_composite_fields(
cls,
packet_id: int,
psc: int,
data_length: int,
packet_version: int = 0b000,
) -> SpacePacketHeader:
return SpacePacketHeader(
packet_type=PacketTypes(packet_id >> 12 & 0x01),
packet_version=packet_version,
sec_header_flag=bool((packet_id >> 11) & 0x01),
data_length=data_length,
sequence_flags=SequenceFlags((psc & cls.SEQ_FLAG_MASK) >> 14),
ssc=psc & (~cls.SEQ_FLAG_MASK),
apid=packet_id & 0x7FF,
)

def pack(self) -> bytearray:
"""Serialize raw space packet header into a bytearray"""
"""Serialize raw space packet header into a bytearray, using big endian for each
2 octet field of the space packet header"""
header = bytearray()
header.append((self.packet_id & 0xFF00) >> 8)
header.append(self.packet_id & 0xFF)
header.append((self.psc & 0xFF00) >> 8)
header.append(self.psc & 0xFF)
header.append((self.data_length & 0xFF00) >> 8)
header.append(self.data_length & 0xFF)
packet_id_with_version = self.version << 13 | self.packet_id
header.extend(struct.pack("!H", packet_id_with_version))
header.extend(struct.pack("!H", self.psc))
header.extend(struct.pack("!H", self.data_length))
return header

@property
Expand All @@ -114,25 +134,21 @@ def unpack(cls, space_packet_raw: bytes) -> SpacePacketHeader:
logger = get_console_logger()
logger.warning("Packet size smaller than PUS header size!")
raise ValueError
packet_type = space_packet_raw[0] & 0x10
if packet_type == 0:
packet_type = PacketTypes.TM
else:
packet_type = PacketTypes.TC
packet_version = space_packet_raw[0] >> 5
secondary_header_flag = (space_packet_raw[0] & 0x8) >> 3
apid = ((space_packet_raw[0] & 0x7) << 8) | space_packet_raw[1]
sequence_flags = (space_packet_raw[2] & 0xC0) >> 6
ssc = ((space_packet_raw[2] << 8) | space_packet_raw[3]) & 0x3FFF
data_length = space_packet_raw[4] << 8 | space_packet_raw[5]
packet_version = (space_packet_raw[0] >> 5) & 0b111
packet_type = PacketTypes((space_packet_raw[0] >> 4) & 0b1)
secondary_header_flag = (space_packet_raw[0] >> 3) & 0b1
apid = ((space_packet_raw[0] & 0b111) << 8) | space_packet_raw[1]
psc = struct.unpack("!H", space_packet_raw[2:4])[0]
sequence_flags = (psc & SpacePacketHeader.SEQ_FLAG_MASK) >> 14
ssc = psc & (~SpacePacketHeader.SEQ_FLAG_MASK)
return SpacePacketHeader(
packet_type=packet_type,
apid=apid,
secondary_header_flag=bool(secondary_header_flag),
sec_header_flag=bool(secondary_header_flag),
packet_version=packet_version,
data_length=data_length,
data_length=struct.unpack("!H", space_packet_raw[4:6])[0],
sequence_flags=SequenceFlags(sequence_flags),
source_sequence_count=ssc,
ssc=ssc,
)

def __str__(self):
Expand All @@ -143,8 +159,10 @@ def __str__(self):

def __repr__(self):
return (
f"{self.__class__.__name__}(packet_type={self.packet_type!r}, "
f"packet_id={self.packet_id!r}, apid={self.apid!r}, ssc={self.ssc!r})"
f"{self.__class__.__name__}(packet_version={self.version!r}, "
f"packet_type={self.packet_type!r}, apid={self.apid!r}, ssc={self.ssc!r}),"
f"data_length={self.data_length!r}, sec_header_flag={self.sec_header_flag!r},"
f"sequence_flags={self.sequence_flags!r}"
)


Expand Down
4 changes: 2 additions & 2 deletions spacepackets/ecss/tc.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,10 @@ def __init__(
)
self.space_packet_header = SpacePacketHeader(
apid=apid,
secondary_header_flag=bool(secondary_header_flag),
sec_header_flag=bool(secondary_header_flag),
packet_type=PacketTypes.TC,
data_length=data_length,
source_sequence_count=ssc,
ssc=ssc,
)
self._app_data = app_data
self._valid = True
Expand Down
4 changes: 2 additions & 2 deletions spacepackets/ecss/tm.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ def __init__(
self.space_packet_header = SpacePacketHeader(
apid=apid,
packet_type=packet_type,
secondary_header_flag=secondary_header_flag,
sec_header_flag=secondary_header_flag,
packet_version=packet_version,
data_length=data_length,
source_sequence_count=ssc,
ssc=ssc,
)
self.secondary_packet_header = PusTmSecondaryHeader(
pus_version=pus_version,
Expand Down
36 changes: 30 additions & 6 deletions tests/test_ccsds.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ def test_spacepacket(self):
sp_header = SpacePacketHeader(
apid=0x02,
data_length=22,
source_sequence_count=52,
ssc=52,
packet_type=PacketTypes.TC,
sequence_flags=SequenceFlags.FIRST_SEGMENT,
)
self.assertEqual(sp_header.apid, 0x02)
self.assertEqual(sp_header.ssc, 52)
Expand All @@ -38,26 +39,31 @@ def test_spacepacket(self):
SpacePacketHeader,
apid=982292,
data_length=22,
source_sequence_count=52,
ssc=52,
packet_type=PacketTypes.TC,
)
self.assertRaises(
ValueError,
SpacePacketHeader,
apid=0x02,
data_length=679393,
source_sequence_count=52,
ssc=52,
packet_type=PacketTypes.TC,
)
self.assertRaises(
ValueError,
SpacePacketHeader,
apid=0x02,
data_length=22,
source_sequence_count=96030,
ssc=96030,
packet_type=PacketTypes.TC,
)
self.assertRaises(ValueError, SpacePacketHeader.unpack, bytearray())
self.assertEqual(sp_unpacked.packet_type, PacketTypes.TC)
self.assertEqual(sp_unpacked.apid, 0x02)
self.assertEqual(sp_unpacked.version, 0b000)
self.assertEqual(sp_unpacked.ssc, 52)
self.assertEqual(sp_unpacked.sequence_flags, SequenceFlags.FIRST_SEGMENT)
print(sp_header)
print(sp_header.__repr__())

Expand Down Expand Up @@ -95,15 +101,33 @@ def test_spacepacket(self):
raw_header = get_space_packet_header(
packet_id=packet_id, packet_sequence_control=psc, data_length=22
)
self.assertEqual(raw_header[0], (packet_id & 0xFF00) >> 8)
self.assertEqual(raw_header[0], ((packet_id & 0xFF00) >> 8) & 0x1FFF)
self.assertEqual(raw_header[1], packet_id & 0xFF)
self.assertEqual(raw_header[2], (psc & 0xFF00) >> 8)
self.assertEqual(raw_header[3], psc & 0xFF)
self.assertEqual(raw_header[4], (22 & 0xFF00) >> 8)
self.assertEqual(raw_header[5], 22 & 0xFF)

header_from_composite = SpacePacketHeader.from_composite_fields(
packet_id=packet_id, psc=psc, data_length=22
)
self.assertEqual(header_from_composite.pack(), raw_header)
header_tm = SpacePacketHeader(
packet_type=PacketTypes.TM,
sequence_flags=SequenceFlags.UNSEGMENTED,
apid=0x12,
data_length=7,
ssc=28,
)
raw = header_tm.pack()
header_tm_back = SpacePacketHeader.unpack(raw)
self.assertEqual(header_tm_back.packet_type, PacketTypes.TM)
self.assertEqual(header_tm_back.apid, 0x12)
self.assertEqual(header_tm_back.version, 0b000)
self.assertEqual(header_tm_back.ssc, 28)
self.assertEqual(header_tm_back.data_length, 7)

def test_sp_parser(self):
raw_buffer = bytearray()
tm_packet = PusTelemetry(service=17, subservice=2, pus_version=PusVersion.PUS_C)
packet_ids = (tm_packet.packet_id,)
tm_packet_raw = tm_packet.pack()
Expand Down

0 comments on commit 6e20d4d

Please sign in to comment.