Skip to content

Commit

Permalink
Merge pull request #84 from us-irs/update-spacepackets-api-and-docs
Browse files Browse the repository at this point in the history
update CCSDS Spacepackets API and docs
  • Loading branch information
robamu authored Oct 15, 2024
2 parents c5e0017 + 031881e commit 5220c06
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 45 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

Renamed `PusFileSeqCountProvider` to `CcsdsFileSeqCountProvider` but keep old alias.

## Added

- New `SpacePacketHeader.tc` and `SpacePacketHeader.tm` constructors which set the packet
type correctly

# [v0.24.2] 2024-10-15

## Fixed
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ coverage run -m pytest

# Documentation

The documentation is built with Sphinx
The documentation is built with Sphinx and new documentation should be written using the
[NumPy format](https://www.sphinx-doc.org/en/master/usage/extensions/example_numpy.html#example-numpy).

Install the required dependencies first:

Expand Down
68 changes: 40 additions & 28 deletions docs/examples.rst
Original file line number Diff line number Diff line change
@@ -1,52 +1,38 @@
Examples
=========

ECSS PUS packets
-----------------

The following example shows how to generate PUS packets using the PUS ping telecommand and a
PUS ping telemetry reply without a timestamp.

.. testcode:: pus

from spacepackets.ecss.tc import PusTc
from spacepackets.ecss.tm import PusTm

ping_cmd = PusTc(service=17, subservice=1, apid=0x01)
cmd_as_bytes = ping_cmd.pack()
print(f"Ping telecommand [17,1] (hex): [{cmd_as_bytes.hex(sep=',')}]")

ping_reply = PusTm(service=17, subservice=2, apid=0x01, timestamp=bytes())
tm_as_bytes = ping_reply.pack()
print(f"Ping reply [17,2] (hex): [{tm_as_bytes.hex(sep=',')}]")

Output:

.. testoutput:: pus

Ping telecommand [17,1] (hex): [18,01,c0,00,00,06,2f,11,01,00,00,16,1d]
Ping reply [17,2] (hex): [08,01,c0,00,00,08,20,11,02,00,00,00,00,86,d7]

CCSDS Space Packet
-------------------

The following example shows how to generate a space packet header:
The following example shows how to generate a space packet header or a custom telecommand
based on CCSDS space packets.

.. testcode:: ccsds

from spacepackets.ccsds.spacepacket import SpHeader, PacketType
from spacepackets.ccsds.spacepacket import SpHeader, PacketType, CCSDS_HEADER_LEN

spacepacket_header = SpHeader(
packet_type=PacketType.TC, apid=0x01, seq_count=0, data_len=0
)
header_as_bytes = spacepacket_header.pack()
print(f"Space packet header (hex): [{header_as_bytes.hex(sep=',')}]")

# Create CCSDS space packet telecommand with custom data.
custom_data = bytes([1, 2, 3, 4])
tc_header = SpHeader.tc(apid=2, seq_count=5, data_len=0)
tc_header.set_data_len_from_packet_len(CCSDS_HEADER_LEN + len(custom_data))
telecommand = tc_header.pack()
telecommand.extend(custom_data)
print(f"Space packet telecommand (hex): [{telecommand.hex(sep=',')}]")



Output:

.. testoutput:: ccsds

Space packet header (hex): [10,01,c0,00,00,00]
Space packet telecommand (hex): [10,02,c0,05,00,03,01,02,03,04]

CFDP Packets
-----------------
Expand Down Expand Up @@ -119,6 +105,32 @@ Output
--- PDU 2 RAW ---
0x[24,00,0a,00,01,00,02,04,00,1c,29,1c,a3,00,00,00,0c]

ECSS PUS packets
-----------------

The following example shows how to generate PUS packets using the PUS ping telecommand and a
PUS ping telemetry reply without a timestamp.

.. testcode:: pus

from spacepackets.ecss.tc import PusTc
from spacepackets.ecss.tm import PusTm

ping_cmd = PusTc(service=17, subservice=1, apid=0x01)
cmd_as_bytes = ping_cmd.pack()
print(f"Ping telecommand [17,1] (hex): [{cmd_as_bytes.hex(sep=',')}]")

ping_reply = PusTm(service=17, subservice=2, apid=0x01, timestamp=bytes())
tm_as_bytes = ping_reply.pack()
print(f"Ping reply [17,2] (hex): [{tm_as_bytes.hex(sep=',')}]")

Output:

.. testoutput:: pus

Ping telecommand [17,1] (hex): [18,01,c0,00,00,06,2f,11,01,00,00,16,1d]
Ping reply [17,2] (hex): [08,01,c0,00,00,08,20,11,02,00,00,00,00,86,d7]

USLP Frames
-------------------

Expand Down
116 changes: 100 additions & 16 deletions spacepackets/ccsds/spacepacket.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,7 @@ def pack(self) -> bytearray:


class SpacePacketHeader(AbstractSpacePacket):
"""This class encapsulates the space packet header.
Packet reference: Blue Book CCSDS 133.0-B-2"""
"""This class encapsulates the space packet header. Packet reference: Blue Book CCSDS 133.0-B-2"""

def __init__(
self,
Expand All @@ -187,6 +186,10 @@ def __init__(
):
"""Create a space packet header with the given field parameters.
The data length field can also be set from the total packet length by using the
:py:meth:`set_data_len_from_packet_len` method after construction of the space packet
header object.
>>> sph = SpacePacketHeader(packet_type=PacketType.TC, apid=0x42, seq_count=0, data_len=12)
>>> hex(sph.apid)
'0x42'
Expand All @@ -201,17 +204,28 @@ def __init__(
>>> sph.packet_seq_control
PacketSeqCtrl(seq_flags=<SequenceFlags.UNSEGMENTED: 3>, seq_count=0)
:param packet_type: 0 for Telemetery, 1 for Telecommands
:param apid: Application Process ID, should not be larger
than 11 bits, deciaml 2074 or hex 0x7ff
:param seq_count: Source sequence counter, should not be larger than 0x3fff or
decimal 16383
:param data_len: Contains a length count C that equals one fewer than the length of the
packet data field. Should not be larger than 65535 bytes
:param ccsds_version:
:param sec_header_flag: Secondary header flag, or False by default.
:param seq_flags:
:raises ValueError: On invalid parameters
Parameters
-----------
packet_type: PacketType
0 for Telemetery, 1 for Telecommands
apid: int
Application Process ID, should not be larger than 11 bits, deciaml 2074 or hex 0x7ff
seq_count: int
Source sequence counter, should not be larger than 0x3fff or decimal 16383
data_len: int
Contains a length count C that equals one fewer than the length of the packet data
field. Should not be larger than 65535 bytes
sec_header_flag: bool
Secondary header flag, or False by default.
seq_flags:
Sequence flags, defaults to unsegmented.
ccsds_version: int
Version of the CCSDS packet. Defaults to 0b000
Raises
--------
ValueError
On invalid parameters
"""
if data_len > pow(2, 16) - 1 or data_len < 0:
raise ValueError(
Expand All @@ -225,6 +239,54 @@ def __init__(
self._psc = PacketSeqCtrl(seq_flags=seq_flags, seq_count=seq_count)
self.data_len = data_len

@classmethod
def tc(
cls,
apid: int,
seq_count: int,
data_len: int,
sec_header_flag: bool = False,
seq_flags: SequenceFlags = SequenceFlags.UNSEGMENTED,
ccsds_version: int = 0b000,
) -> SpacePacketHeader:
"""Create a space packet header with the given field parameters for a telecommand packet.
Calls the default constructor :py:meth:`SpacePacketHeader` with the packet type
set to :py:class:`PacketType.TC`.
"""
return cls(
packet_type=PacketType.TC,
apid=apid,
seq_count=seq_count,
data_len=data_len,
sec_header_flag=sec_header_flag,
seq_flags=seq_flags,
ccsds_version=ccsds_version,
)

@classmethod
def tm(
cls,
apid: int,
seq_count: int,
data_len: int,
sec_header_flag: bool = False,
seq_flags: SequenceFlags = SequenceFlags.UNSEGMENTED,
ccsds_version: int = 0b000,
) -> SpacePacketHeader:
"""Create a space packet header with the given field parameters for a telemetry packet.
Calls the default constructor :py:meth:`SpacePacketHeader` with the packet type
set to :py:class:`PacketType.TM`.
"""
return cls(
packet_type=PacketType.TM,
apid=apid,
seq_count=seq_count,
data_len=data_len,
sec_header_flag=sec_header_flag,
seq_flags=seq_flags,
ccsds_version=ccsds_version,
)

@classmethod
def from_composite_fields(
cls,
Expand Down Expand Up @@ -289,6 +351,19 @@ def sec_header_flag(self, value):
def seq_count(self):
return self._psc.seq_count

def set_data_len_from_packet_len(self, packet_len: int):
"""Sets the data length field from the given total packet length. The total packet length
must be at least 7 bytes.
Raises
-------
ValueError
The passed packet length is smaller than the minimum expected 7 bytes.
"""
if packet_len < CCSDS_HEADER_LEN + 1:
raise ValueError("specified total packet length too short")
self.data_len = packet_len - CCSDS_HEADER_LEN - 1

@seq_count.setter
def seq_count(self, seq_cnt):
self._psc.seq_count = seq_cnt
Expand All @@ -313,7 +388,12 @@ def apid(self, apid):
def packet_len(self) -> int:
"""Retrieve the full space packet size when packed.
:return: Size of the TM packet based on the space packet header data length field.
The full packet size is the data length field plus the :py:const:`CCSDS_HEADER_LEN` of 6
bytes plus one.
Returns
--------
Size of the TM packet based on the space packet header data length field.
"""
return CCSDS_HEADER_LEN + self.data_len + 1

Expand Down Expand Up @@ -383,8 +463,12 @@ def __repr__(self):
)

def pack(self) -> bytearray:
"""Pack the raw byte representation of the space packet
:raises ValueError: Mandatory fields were not supplied properly"""
"""Pack the raw byte representation of the space packet.
Raises
--------
ValueError
Mandatory fields were not supplied properly"""
packet = self.sp_header.pack()
if self.sp_header.sec_header_flag:
if self.sec_header is None:
Expand Down
38 changes: 38 additions & 0 deletions tests/ccsds/test_space_packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,44 @@ def test_basic(self):
self.assertEqual(self.sp_header.data_len, 0x16)
self.assertEqual(self.sp_header.packet_type, PacketType.TC)

def test_tm_header(self):
sp_header = SpacePacketHeader.tm(apid=0x03, data_len=16, seq_count=35)
self.assertEqual(sp_header.apid, 0x03)
self.assertEqual(sp_header.seq_flags, SequenceFlags.UNSEGMENTED)
self.assertEqual(sp_header.ccsds_version, 0b000)
self.assertEqual(sp_header.packet_id, PacketId(PacketType.TM, False, 0x03))
self.assertEqual(
sp_header.packet_seq_control,
PacketSeqCtrl(SequenceFlags.UNSEGMENTED, 35),
)
self.assertEqual(sp_header.seq_count, 35)
self.assertEqual(sp_header.data_len, 16)
self.assertEqual(sp_header.packet_type, PacketType.TM)

def test_len_field_setter(self):
self.sp_header.set_data_len_from_packet_len(10)
# Total packet lenght minus the header lenght minus 1
self.assertEqual(self.sp_header.data_len, 3)

def test_invalid_len_field_setter_call(self):
for idx in range(7):
with self.assertRaises(ValueError):
self.sp_header.set_data_len_from_packet_len(idx)

def test_tc_header(self):
sp_header = SpacePacketHeader.tc(apid=0x7FF, data_len=16, seq_count=0x3FFF)
self.assertEqual(sp_header.apid, 0x7FF)
self.assertEqual(sp_header.seq_flags, SequenceFlags.UNSEGMENTED)
self.assertEqual(sp_header.ccsds_version, 0b000)
self.assertEqual(sp_header.packet_id, PacketId(PacketType.TC, False, 0x7FF))
self.assertEqual(
sp_header.packet_seq_control,
PacketSeqCtrl(SequenceFlags.UNSEGMENTED, 0x3FFF),
)
self.assertEqual(sp_header.seq_count, 0x3FFF)
self.assertEqual(sp_header.data_len, 16)
self.assertEqual(sp_header.packet_type, PacketType.TC)

def test_raw_output(self):
raw_output = self.sp_header.pack()
self.assertEqual(
Expand Down

0 comments on commit 5220c06

Please sign in to comment.