diff --git a/pyproject.toml b/pyproject.toml index be454e93..4ed495c7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,8 @@ classifiers = [ "License :: OSI Approved :: Apache Software License", ] dependencies = [ + "spacepackets>=0.23.0", + "pluggy>=1.3.0", "flask>=3.0.0", "flask_compress>=1.11", "pyzmq>=24.0.1", @@ -62,6 +64,12 @@ fprime-seqgen = "fprime_gds.common.tools.seqgen:main" fprime-dp-write = "fprime_gds.executables.data_product_writer:main" fprime-gds = "fprime_gds.executables.run_deployment:main" +# Automatically provided plugins +[project.entry-points.fprime_gds] +space_packet = "fprime_gds.plugins.framing.ccsds:SpacePacketFramerDeframer" +space_data_link = "fprime_gds.plugins.framing.ccsds:SpaceDataLinkFramerDeframer" +space_packet_space_data_link = "fprime_gds.plugins.framing.ccsds:SpacePacketSpaceDataLinkFramerDeframer" + # For Pytest fixtures [project.entry-points."pytest11"] fprime_test_api = "fprime_gds.common.testing_fw.pytest_integration" diff --git a/src/fprime_gds/plugins/framing/__init__.py b/src/fprime_gds/plugins/framing/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/fprime_gds/plugins/framing/apid.py b/src/fprime_gds/plugins/framing/apid.py new file mode 100644 index 00000000..4806565a --- /dev/null +++ b/src/fprime_gds/plugins/framing/apid.py @@ -0,0 +1,20 @@ +""" fprime_gds.plugins.framing.apid: APID mapping functions for F´ data """ +from fprime_gds.common.utils.data_desc_type import DataDescType +from fprime.common.models.serialize.numerical_types import U32Type + +import struct + +class APID(object): + """ APID implementations """ + + @classmethod + def from_type(cls, data_type: DataDescType): + """ Map from data description type to APID """ + return data_type.value + + @classmethod + def from_data(cls, data): + """ Map from data bytes to APID """ + u32_type = U32Type() + u32_type.deserialize(data, offset=0) + return cls.from_type(DataDescType(u32_type.val)) diff --git a/src/fprime_gds/plugins/framing/ccsds.py b/src/fprime_gds/plugins/framing/ccsds.py new file mode 100644 index 00000000..69508247 --- /dev/null +++ b/src/fprime_gds/plugins/framing/ccsds.py @@ -0,0 +1,216 @@ +""" fprime_gds.plugins.framing.ccsds: implementation of framing plugin to support CCSDS + +This file registers a CCSDS plugin and a space-packet plugin used to frame data for use transmitting F´ data within a +CCSDS frame. +""" +import struct +from typing import List, Type + +from spacepackets.ccsds.spacepacket import SpacePacketHeader, PacketType, SpacePacket + +from fprime.common.models.serialize.numerical_types import U32Type +from fprime_gds.common.communication.framing import FramerDeframer, FpFramerDeframer +from fprime_gds.plugin.definitions import gds_plugin_implementation + +from fprime_gds.plugins.framing.chain import ChainedFramerDeframer + +from fprime_gds.plugins.framing.apid import APID + +from crcmod.predefined import PredefinedCrc + + +class SpacePacketFramerDeframer(FramerDeframer): + """ Concrete implementation of FramerDeframer supporting CCSDS space packets + + This implementation is registered as a "framing" plugin to support CCSDS space packets within the GDS layer. + """ + SEQUENCE_NUMBER_MAXIMUM = 16384 + + def __init__(self): + self.sequence_number = 0 + + def frame(self, data): + """ Frame the supplied data in a SpacePacket frame + + Args: + data: data to frame + Return: + space packet bytes + """ + space_header = SpacePacketHeader(packet_type=PacketType.TC, + apid=APID.from_data(data), + seq_count=self.get_sequence_number(), + data_len=len(data)) + space_packet = SpacePacket(space_header, sec_header=None, user_data=data) + return space_packet.pack() + + def deframe(self, data, no_copy=False): + """ No op deframe step """ + return data, b"", b"" + + def get_sequence_number(self): + """ Get the sequence number and increment + + This function will return the current sequence number and then increment the sequence number for the next round. + + Return: + current sequence number + """ + sequence = self.sequence_number + self.sequence_number = (self.sequence_number + 1) % self.SEQUENCE_NUMBER_MAXIMUM + return sequence + + @classmethod + def get_name(cls): + """ Name of this implementation provided to CLI """ + return "raw-space-packet" + + @classmethod + @gds_plugin_implementation + def register_framing_plugin(cls): + """ Register the MyPlugin plugin """ + return cls + + +class SpaceDataLinkFramerDeframer(SpacePacketFramerDeframer): + """ CCSDS space data link Framer/Deframer Implementation """ + SEQUENCE_NUMBER_MAXIMUM = 256 + HEADER_SIZE = 5 + + def __init__(self, scid, vcid): + """ """ + self.scid = scid + self.vcid = vcid + self.sequence_number = 0 + # Note, fprime is used for downlink at this time + self.fprime_framer_deframer = FpFramerDeframer("crc32") + + def frame(self, data): + """ Frame the supplied data in an CCSDS space data link packet frame + + Args: + data: data to frame + Return: + space data link packet bytes + """ + space_packet_bytes = data + length = len(space_packet_bytes) + assert length < (pow(2, 10) - 1), "Length too-large for CCSDS format" + + # CCSDS TC Header: + # 2b - 00 - TF version number + # 1b - 0/1 - 0 enable FARM checks, 1 bypass FARM + # 1b - 0/1 - 0 Type-D data, 1 Type-C data + # 2b - 00 - Reserved + # 10b - XX - Spacecraft id + # 6b - XX - Virtual Channel ID + # 10b - XX - Frame length + + # 8b - XX - Frame sequence number + + header = (0 << 30) | \ + (0 << 29) | \ + (0 << 28) | \ + ((self.scid & 0x3FF) << 16) | \ + ((self.vcid & 0x3F) << 10) | \ + (length & 0x3FF) + + header_bytes = struct.pack(">IB", header, self.sequence_number) + assert len(header_bytes) == self.HEADER_SIZE, "CCSDS primary header must be 5 octets long" + full_bytes_no_crc = header_bytes + space_packet_bytes + assert len(full_bytes_no_crc) == self.HEADER_SIZE + length, "Malformed packet generated" + + # Use CRC-16 (CCITT) with no final XOR (XOR of 0x0000) + crc_calculator = PredefinedCrc(crc_name="crc-ccitt-false") + crc_calculator.update(full_bytes_no_crc) + + full_bytes = full_bytes_no_crc + struct.pack(">H", crc_calculator.crcValue) + return full_bytes + + def get_sequence_number(self): + """ Get the sequence number and increment + + This function will return the current sequence number and then increment the sequence number for the next round. + + Return: + current sequence number + """ + sequence = self.sequence_number + self.sequence_number = (self.sequence_number + 1) % self.SEQUENCE_NUMBER_MAXIMUM + return sequence + + def deframe(self, data, no_copy=False): + """ Deframe using fprime for now """ + return self.fprime_framer_deframer.deframe(data, no_copy) + + @classmethod + def get_arguments(cls): + """ Arguments to request from the CLI """ + return { + ("--scid", ): { + "type": lambda input_arg: int(input_arg, 0), + "help": "Spacecraft ID" + }, + ("--vcid",): { + "type": lambda input_arg: int(input_arg, 0), + "help": "Virtual channel ID" + } + } + + @classmethod + def check_arguments(cls, scid, vcid): + """ Check arguments from the CLI + + Confirms that the input arguments are valid for this framer/deframer. + + Args: + scid: spacecraft id + vcid: virtual channel id + """ + if scid is None: + raise TypeError(f"Spacecraft ID not specified") + if scid < 0: + raise TypeError(f"Spacecraft ID {scid} is negative") + if scid > 0x3FF: + raise TypeError(f"Spacecraft ID {scid} is larger than {0x3FF}") + + if vcid is None: + raise TypeError(f"Virtual Channel ID not specified") + if vcid < 0: + raise TypeError(f"Virtual Channel ID {vcid} is negative") + if vcid > 0x3F: + raise TypeError(f"Virtual Channel ID {vcid} is larger than {0x3FF}") + + @classmethod + def get_name(cls): + """ Name of this implementation provided to CLI """ + return "unspecified-space-data-link" + + @classmethod + @gds_plugin_implementation + def register_framing_plugin(cls): + """ Register the MyPlugin plugin """ + return cls + + +class SpacePacketSpaceDataLinkFramerDeframer(ChainedFramerDeframer): + """ Space Data Link Protocol framing and deframing that has a data unit of Space Packets """ + + @classmethod + def get_composites(cls) -> List[Type[FramerDeframer]]: + """ Return the composite list of this """ + return [ + SpacePacketFramerDeframer, + SpaceDataLinkFramerDeframer + ] + + @classmethod + def get_name(cls): + """ Name of this implementation provided to CLI """ + return "space-packet-space-data-link" + + @classmethod + @gds_plugin_implementation + def register_framing_plugin(cls): + """ Register the MyPlugin plugin """ + return cls diff --git a/src/fprime_gds/plugins/framing/chain.py b/src/fprime_gds/plugins/framing/chain.py new file mode 100644 index 00000000..6ebe7286 --- /dev/null +++ b/src/fprime_gds/plugins/framing/chain.py @@ -0,0 +1,82 @@ +""" fprime_gds.plugins.framing.chain: implementation of a chained framer/deframer """ +from abc import ABC, abstractmethod +from functools import reduce +from typing import Any, Dict, List, Type +from fprime_gds.common.communication.framing import FramerDeframer + + +class ChainedFramerDeframer(FramerDeframer, ABC): + """ Framer/deframer that is a composite of chained framer/deframers + + This Framer/Deframer will wrap a set of framer/deframers where the result of the frame and deframe options will pass + from one to the other subsequently. The order is specified via the framing path and deframing will use the reverse + order from specified. + """ + def __init__(self, **kwargs): + """ Initialize the chained framer/deframer from a framing-ordered set of children """ + frame_order_framer_deframers = [ + composite(**self.get_argument_subset(composite, kwargs)) + for composite in self.get_composites() + ] + self.framers = frame_order_framer_deframers[::1] + self.deframers = frame_order_framer_deframers[::-1] + + @classmethod + @abstractmethod + def get_composites(cls) -> List[Type[FramerDeframer]]: + """ Return a list of composites """ + raise NotImplementedError(f"Subclasses of {cls.__name__} must implement get_composites") + + @staticmethod + def get_argument_subset(composite: Type[FramerDeframer], argument_dictionary: Dict[str, Any]) -> Dict[str, Any]: + """ Get an argument subset that is needed by composite + + For the composite, find the set of arguments that is needed by this composite and pull those out of the complete + argument dictionary. + + Args: + composite: class of a subtype of FramerDeframer + argument_dictionary: dictionary of all input arguments + """ + if not hasattr(composite, "get_arguments"): + return {} + needed_arguments = composite.get_arguments() + needed_argument_destinations = [ + description["destination"] if "destination" in description else + [dash_dash for dash_dash in flag if dash_dash.startswith("--")][0].lstrip("-").replace("-", "_") + for flag, description in needed_arguments.items() + ] + return {name: argument_dictionary[name] for name in needed_argument_destinations} + + @classmethod + def get_arguments(cls): + """ Arguments to request from the CLI """ + all_arguments = {} + for composite in cls.get_composites(): + all_arguments.update(composite.get_arguments() if hasattr(composite, "get_arguments") else {}) + return all_arguments + + @classmethod + def check_arguments(cls, **kwargs): + """ Check arguments from the CLI """ + for composite in cls.get_composites(): + subset_arguments = cls.get_argument_subset(composite, kwargs) + if hasattr(composite, "check_arguments"): + composite.check_arguments(**subset_arguments) + + def deframe(self, data, no_copy=False): + """ Deframe via a chain of children deframers """ + packet = data[:] if not no_copy else data + remaining = None + discarded = b"" + + for deframer in self.deframers: + new_packet, new_remaining, new_discarded = deframer.deframe(packet, True) + discarded += new_discarded + remaining = new_remaining if remaining is None else remaining + packet = new_packet + return packet, remaining, discarded + + def frame(self, data): + """ Frame via a chain of children framers """ + return reduce(lambda framed_data, framer: framer.frame(framed_data), self.framers, data)