diff --git a/CHANGELOG.md b/CHANGELOG.md index ba9168e..fc2d5a5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [unreleased] +## [v0.9.0] + +- API improvements, bugfix and general improvements for CCSDS spacepacket + header implementation + ## [v0.8.1] - Named value for fetching global APID diff --git a/examples/example_spacepacket.py b/examples/example_spacepacket.py index c00af2f..36554c1 100644 --- a/examples/example_spacepacket.py +++ b/examples/example_spacepacket.py @@ -5,7 +5,7 @@ def main(): print("-- Space Packet examples --") spacepacket_header = SpacePacketHeader( - packet_type=PacketTypes.TC, apid=0x01, source_sequence_count=0, data_length=0 + packet_type=PacketTypes.TC, apid=0x01, ssc=0, data_length=0 ) header_as_bytes = spacepacket_header.pack() print_string = get_printable_data_string( diff --git a/examples/example_uslp.py b/examples/example_uslp.py index 22eaa46..13107ac 100644 --- a/examples/example_uslp.py +++ b/examples/example_uslp.py @@ -35,7 +35,7 @@ def main(): sequence_flags=SequenceFlags.UNSEGMENTED, apid=SPACECRAFT_ID, data_length=len(data) - 1, - source_sequence_count=0, + ssc=0, ) tfdz = space_packet_wrapper.pack() + data tfdf = TransferFrameDataField( diff --git a/spacepackets/__init__.py b/spacepackets/__init__.py index 8088f75..3e2f46a 100644 --- a/spacepackets/__init__.py +++ b/spacepackets/__init__.py @@ -1 +1 @@ -__version__ = "0.8.1" +__version__ = "0.9.0" diff --git a/spacepackets/ccsds/spacepacket.py b/spacepackets/ccsds/spacepacket.py index 0088a02..e167217 100644 --- a/spacepackets/ccsds/spacepacket.py +++ b/spacepackets/ccsds/spacepacket.py @@ -1,5 +1,6 @@ from __future__ import annotations import enum +import struct from typing import Tuple, Deque, List, Final from spacepackets.log import get_console_logger @@ -23,14 +24,16 @@ class SpacePacketHeader: """This class encapsulates the space packet header. Packet reference: Blue Book CCSDS 133.0-B-2""" + SEQ_FLAG_MASK = 0xC000 + def __init__( self, packet_type: PacketTypes, apid: int, - source_sequence_count: int, + ssc: int, data_length: int, packet_version: int = 0b000, - secondary_header_flag: bool = True, + sec_header_flag: bool = True, sequence_flags: SequenceFlags = SequenceFlags.UNSEGMENTED, ): """Create a space packet header with the given field parameters @@ -38,12 +41,12 @@ def __init__( :param packet_type: 0 for Telemetery, 1 for Telecommands :param apid: Application Process ID, should not be larger than 11 bits, deciaml 2074 or hex 0x7ff - :param source_sequence_count: Sequence counter, should not be larger than 0x3fff or + :param ssc: Source sequence counter, should not be larger than 0x3fff or decimal 16383 :param data_length: Contains a length count C that equals one fewer than the length of the packet data field. Should not be larger than 65535 bytes :param packet_version: - :param secondary_header_flag: + :param sec_header_flag: Secondary header flag, 1 or True by default :param sequence_flags: :raises ValueError: On invalid parameters """ @@ -54,7 +57,7 @@ def __init__( f"Invalid APID, exceeds maximum value {pow(2, 11) - 1} or negative" ) raise ValueError - if source_sequence_count > pow(2, 14) - 1 or source_sequence_count < 0: + if ssc > pow(2, 14) - 1 or ssc < 0: logger = get_console_logger() logger.warning( f"Invalid source sequence count, exceeds maximum value {pow(2, 14)- 1} or negative" @@ -67,8 +70,8 @@ def __init__( ) raise ValueError self.apid = apid - self.ssc = source_sequence_count - self.secondary_header_flag = secondary_header_flag + self.ssc = ssc + self.sec_header_flag = sec_header_flag self.sequence_flags = sequence_flags self.psc = get_space_packet_sequence_control( sequence_flags=self.sequence_flags, source_sequence_count=self.ssc @@ -77,19 +80,36 @@ def __init__( self.data_length = data_length self.packet_id = get_space_packet_id_num( packet_type=self.packet_type, - secondary_header_flag=self.secondary_header_flag, + secondary_header_flag=self.sec_header_flag, apid=self.apid, ) + @classmethod + def from_composite_fields( + cls, + packet_id: int, + psc: int, + data_length: int, + packet_version: int = 0b000, + ) -> SpacePacketHeader: + return SpacePacketHeader( + packet_type=PacketTypes(packet_id >> 12 & 0x01), + packet_version=packet_version, + sec_header_flag=bool((packet_id >> 11) & 0x01), + data_length=data_length, + sequence_flags=SequenceFlags((psc & cls.SEQ_FLAG_MASK) >> 14), + ssc=psc & (~cls.SEQ_FLAG_MASK), + apid=packet_id & 0x7FF, + ) + def pack(self) -> bytearray: - """Serialize raw space packet header into a bytearray""" + """Serialize raw space packet header into a bytearray, using big endian for each + 2 octet field of the space packet header""" header = bytearray() - header.append((self.packet_id & 0xFF00) >> 8) - header.append(self.packet_id & 0xFF) - header.append((self.psc & 0xFF00) >> 8) - header.append(self.psc & 0xFF) - header.append((self.data_length & 0xFF00) >> 8) - header.append(self.data_length & 0xFF) + packet_id_with_version = self.version << 13 | self.packet_id + header.extend(struct.pack("!H", packet_id_with_version)) + header.extend(struct.pack("!H", self.psc)) + header.extend(struct.pack("!H", self.data_length)) return header @property @@ -114,25 +134,21 @@ def unpack(cls, space_packet_raw: bytes) -> SpacePacketHeader: logger = get_console_logger() logger.warning("Packet size smaller than PUS header size!") raise ValueError - packet_type = space_packet_raw[0] & 0x10 - if packet_type == 0: - packet_type = PacketTypes.TM - else: - packet_type = PacketTypes.TC - packet_version = space_packet_raw[0] >> 5 - secondary_header_flag = (space_packet_raw[0] & 0x8) >> 3 - apid = ((space_packet_raw[0] & 0x7) << 8) | space_packet_raw[1] - sequence_flags = (space_packet_raw[2] & 0xC0) >> 6 - ssc = ((space_packet_raw[2] << 8) | space_packet_raw[3]) & 0x3FFF - data_length = space_packet_raw[4] << 8 | space_packet_raw[5] + packet_version = (space_packet_raw[0] >> 5) & 0b111 + packet_type = PacketTypes((space_packet_raw[0] >> 4) & 0b1) + secondary_header_flag = (space_packet_raw[0] >> 3) & 0b1 + apid = ((space_packet_raw[0] & 0b111) << 8) | space_packet_raw[1] + psc = struct.unpack("!H", space_packet_raw[2:4])[0] + sequence_flags = (psc & SpacePacketHeader.SEQ_FLAG_MASK) >> 14 + ssc = psc & (~SpacePacketHeader.SEQ_FLAG_MASK) return SpacePacketHeader( packet_type=packet_type, apid=apid, - secondary_header_flag=bool(secondary_header_flag), + sec_header_flag=bool(secondary_header_flag), packet_version=packet_version, - data_length=data_length, + data_length=struct.unpack("!H", space_packet_raw[4:6])[0], sequence_flags=SequenceFlags(sequence_flags), - source_sequence_count=ssc, + ssc=ssc, ) def __str__(self): @@ -143,8 +159,10 @@ def __str__(self): def __repr__(self): return ( - f"{self.__class__.__name__}(packet_type={self.packet_type!r}, " - f"packet_id={self.packet_id!r}, apid={self.apid!r}, ssc={self.ssc!r})" + f"{self.__class__.__name__}(packet_version={self.version!r}, " + f"packet_type={self.packet_type!r}, apid={self.apid!r}, ssc={self.ssc!r})," + f"data_length={self.data_length!r}, sec_header_flag={self.sec_header_flag!r}," + f"sequence_flags={self.sequence_flags!r}" ) diff --git a/spacepackets/ecss/tc.py b/spacepackets/ecss/tc.py index 3464db5..53c7c3e 100644 --- a/spacepackets/ecss/tc.py +++ b/spacepackets/ecss/tc.py @@ -220,10 +220,10 @@ def __init__( ) self.space_packet_header = SpacePacketHeader( apid=apid, - secondary_header_flag=bool(secondary_header_flag), + sec_header_flag=bool(secondary_header_flag), packet_type=PacketTypes.TC, data_length=data_length, - source_sequence_count=ssc, + ssc=ssc, ) self._app_data = app_data self._valid = True diff --git a/spacepackets/ecss/tm.py b/spacepackets/ecss/tm.py index 64777f1..0b21e75 100644 --- a/spacepackets/ecss/tm.py +++ b/spacepackets/ecss/tm.py @@ -72,10 +72,10 @@ def __init__( self.space_packet_header = SpacePacketHeader( apid=apid, packet_type=packet_type, - secondary_header_flag=secondary_header_flag, + sec_header_flag=secondary_header_flag, packet_version=packet_version, data_length=data_length, - source_sequence_count=ssc, + ssc=ssc, ) self.secondary_packet_header = PusTmSecondaryHeader( pus_version=pus_version, diff --git a/tests/test_ccsds.py b/tests/test_ccsds.py index e6b0fea..e3f81b3 100644 --- a/tests/test_ccsds.py +++ b/tests/test_ccsds.py @@ -20,8 +20,9 @@ def test_spacepacket(self): sp_header = SpacePacketHeader( apid=0x02, data_length=22, - source_sequence_count=52, + ssc=52, packet_type=PacketTypes.TC, + sequence_flags=SequenceFlags.FIRST_SEGMENT, ) self.assertEqual(sp_header.apid, 0x02) self.assertEqual(sp_header.ssc, 52) @@ -38,7 +39,7 @@ def test_spacepacket(self): SpacePacketHeader, apid=982292, data_length=22, - source_sequence_count=52, + ssc=52, packet_type=PacketTypes.TC, ) self.assertRaises( @@ -46,7 +47,7 @@ def test_spacepacket(self): SpacePacketHeader, apid=0x02, data_length=679393, - source_sequence_count=52, + ssc=52, packet_type=PacketTypes.TC, ) self.assertRaises( @@ -54,10 +55,15 @@ def test_spacepacket(self): SpacePacketHeader, apid=0x02, data_length=22, - source_sequence_count=96030, + ssc=96030, packet_type=PacketTypes.TC, ) self.assertRaises(ValueError, SpacePacketHeader.unpack, bytearray()) + self.assertEqual(sp_unpacked.packet_type, PacketTypes.TC) + self.assertEqual(sp_unpacked.apid, 0x02) + self.assertEqual(sp_unpacked.version, 0b000) + self.assertEqual(sp_unpacked.ssc, 52) + self.assertEqual(sp_unpacked.sequence_flags, SequenceFlags.FIRST_SEGMENT) print(sp_header) print(sp_header.__repr__()) @@ -95,15 +101,33 @@ def test_spacepacket(self): raw_header = get_space_packet_header( packet_id=packet_id, packet_sequence_control=psc, data_length=22 ) - self.assertEqual(raw_header[0], (packet_id & 0xFF00) >> 8) + self.assertEqual(raw_header[0], ((packet_id & 0xFF00) >> 8) & 0x1FFF) self.assertEqual(raw_header[1], packet_id & 0xFF) self.assertEqual(raw_header[2], (psc & 0xFF00) >> 8) self.assertEqual(raw_header[3], psc & 0xFF) self.assertEqual(raw_header[4], (22 & 0xFF00) >> 8) self.assertEqual(raw_header[5], 22 & 0xFF) + header_from_composite = SpacePacketHeader.from_composite_fields( + packet_id=packet_id, psc=psc, data_length=22 + ) + self.assertEqual(header_from_composite.pack(), raw_header) + header_tm = SpacePacketHeader( + packet_type=PacketTypes.TM, + sequence_flags=SequenceFlags.UNSEGMENTED, + apid=0x12, + data_length=7, + ssc=28, + ) + raw = header_tm.pack() + header_tm_back = SpacePacketHeader.unpack(raw) + self.assertEqual(header_tm_back.packet_type, PacketTypes.TM) + self.assertEqual(header_tm_back.apid, 0x12) + self.assertEqual(header_tm_back.version, 0b000) + self.assertEqual(header_tm_back.ssc, 28) + self.assertEqual(header_tm_back.data_length, 7) + def test_sp_parser(self): - raw_buffer = bytearray() tm_packet = PusTelemetry(service=17, subservice=2, pus_version=PusVersion.PUS_C) packet_ids = (tm_packet.packet_id,) tm_packet_raw = tm_packet.pack()