-
Notifications
You must be signed in to change notification settings - Fork 38
Commit
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
""" fprime_gds.plugins.framing.apid: APID mapping functions for F´ data """ | ||
Check failure on line 1 in src/fprime_gds/plugins/framing/apid.py GitHub Actions / Spell checking
|
||
from fprime_gds.common.utils.data_desc_type import DataDescType | ||
from fprime.common.models.serialize.numerical_types import U32Type | ||
|
||
import struct | ||
Check notice Code scanning / CodeQL Unused import Note
Import of 'struct' is not used.
|
||
|
||
class APID(object): | ||
""" APID implementations """ | ||
|
||
@classmethod | ||
def from_type(cls, data_type: DataDescType): | ||
""" Map from data description type to APID """ | ||
return data_type.value | ||
|
||
@classmethod | ||
def from_data(cls, data): | ||
""" Map from data bytes to APID """ | ||
u32_type = U32Type() | ||
u32_type.deserialize(data, offset=0) | ||
return cls.from_type(DataDescType(u32_type.val)) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,216 @@ | ||
""" fprime_gds.plugins.framing.ccsds: implementation of framing plugin to support CCSDS | ||
Check failure on line 1 in src/fprime_gds/plugins/framing/ccsds.py GitHub Actions / Spell checking
|
||
This file registers a CCSDS plugin and a space-packet plugin used to frame data for use transmitting F´ data within a | ||
CCSDS frame. | ||
""" | ||
import struct | ||
from typing import List, Type | ||
|
||
from spacepackets.ccsds.spacepacket import SpacePacketHeader, PacketType, SpacePacket | ||
Check failure on line 9 in src/fprime_gds/plugins/framing/ccsds.py GitHub Actions / Spell checking
|
||
|
||
from fprime.common.models.serialize.numerical_types import U32Type | ||
Check notice Code scanning / CodeQL Unused import Note
Import of 'U32Type' is not used.
|
||
from fprime_gds.common.communication.framing import FramerDeframer, FpFramerDeframer | ||
from fprime_gds.plugin.definitions import gds_plugin_implementation | ||
|
||
from fprime_gds.plugins.framing.chain import ChainedFramerDeframer | ||
|
||
from fprime_gds.plugins.framing.apid import APID | ||
|
||
from crcmod.predefined import PredefinedCrc | ||
|
||
|
||
class SpacePacketFramerDeframer(FramerDeframer): | ||
""" Concrete implementation of FramerDeframer supporting CCSDS space packets | ||
This implementation is registered as a "framing" plugin to support CCSDS space packets within the GDS layer. | ||
""" | ||
SEQUENCE_NUMBER_MAXIMUM = 16384 | ||
|
||
def __init__(self): | ||
self.sequence_number = 0 | ||
|
||
def frame(self, data): | ||
""" Frame the supplied data in a SpacePacket frame | ||
Args: | ||
data: data to frame | ||
Return: | ||
space packet bytes | ||
""" | ||
space_header = SpacePacketHeader(packet_type=PacketType.TC, | ||
apid=APID.from_data(data), | ||
seq_count=self.get_sequence_number(), | ||
data_len=len(data)) | ||
space_packet = SpacePacket(space_header, sec_header=None, user_data=data) | ||
return space_packet.pack() | ||
|
||
def deframe(self, data, no_copy=False): | ||
""" No op deframe step """ | ||
return data, b"", b"" | ||
|
||
def get_sequence_number(self): | ||
""" Get the sequence number and increment | ||
This function will return the current sequence number and then increment the sequence number for the next round. | ||
Return: | ||
current sequence number | ||
""" | ||
sequence = self.sequence_number | ||
self.sequence_number = (self.sequence_number + 1) % self.SEQUENCE_NUMBER_MAXIMUM | ||
return sequence | ||
|
||
@classmethod | ||
def get_name(cls): | ||
""" Name of this implementation provided to CLI """ | ||
return "raw-space-packet" | ||
|
||
@classmethod | ||
@gds_plugin_implementation | ||
def register_framing_plugin(cls): | ||
""" Register the MyPlugin plugin """ | ||
return cls | ||
|
||
|
||
class SpaceDataLinkFramerDeframer(SpacePacketFramerDeframer): | ||
""" CCSDS space data link Framer/Deframer Implementation """ | ||
SEQUENCE_NUMBER_MAXIMUM = 256 | ||
HEADER_SIZE = 5 | ||
|
||
def __init__(self, scid, vcid): | ||
Check failure on line 80 in src/fprime_gds/plugins/framing/ccsds.py GitHub Actions / Spell checking
|
||
""" """ | ||
self.scid = scid | ||
self.vcid = vcid | ||
self.sequence_number = 0 | ||
# Note, fprime is used for downlink at this time | ||
self.fprime_framer_deframer = FpFramerDeframer("crc32") | ||
|
||
def frame(self, data): | ||
""" Frame the supplied data in an CCSDS space data link packet frame | ||
Args: | ||
data: data to frame | ||
Return: | ||
space data link packet bytes | ||
""" | ||
space_packet_bytes = data | ||
length = len(space_packet_bytes) | ||
assert length < (pow(2, 10) - 1), "Length too-large for CCSDS format" | ||
|
||
# CCSDS TC Header: | ||
# 2b - 00 - TF version number | ||
# 1b - 0/1 - 0 enable FARM checks, 1 bypass FARM | ||
# 1b - 0/1 - 0 Type-D data, 1 Type-C data | ||
# 2b - 00 - Reserved | ||
# 10b - XX - Spacecraft id | ||
# 6b - XX - Virtual Channel ID | ||
# 10b - XX - Frame length | ||
|
||
# 8b - XX - Frame sequence number | ||
|
||
header = (0 << 30) | \ | ||
(0 << 29) | \ | ||
(0 << 28) | \ | ||
((self.scid & 0x3FF) << 16) | \ | ||
((self.vcid & 0x3F) << 10) | \ | ||
(length & 0x3FF) | ||
|
||
header_bytes = struct.pack(">IB", header, self.sequence_number) | ||
assert len(header_bytes) == self.HEADER_SIZE, "CCSDS primary header must be 5 octets long" | ||
full_bytes_no_crc = header_bytes + space_packet_bytes | ||
assert len(full_bytes_no_crc) == self.HEADER_SIZE + length, "Malformed packet generated" | ||
|
||
# Use CRC-16 (CCITT) with no final XOR (XOR of 0x0000) | ||
crc_calculator = PredefinedCrc(crc_name="crc-ccitt-false") | ||
crc_calculator.update(full_bytes_no_crc) | ||
|
||
full_bytes = full_bytes_no_crc + struct.pack(">H", crc_calculator.crcValue) | ||
return full_bytes | ||
|
||
def get_sequence_number(self): | ||
""" Get the sequence number and increment | ||
This function will return the current sequence number and then increment the sequence number for the next round. | ||
Return: | ||
current sequence number | ||
""" | ||
sequence = self.sequence_number | ||
self.sequence_number = (self.sequence_number + 1) % self.SEQUENCE_NUMBER_MAXIMUM | ||
return sequence | ||
|
||
def deframe(self, data, no_copy=False): | ||
""" Deframe using fprime for now """ | ||
return self.fprime_framer_deframer.deframe(data, no_copy) | ||
|
||
@classmethod | ||
def get_arguments(cls): | ||
""" Arguments to request from the CLI """ | ||
return { | ||
("--scid", ): { | ||
"type": lambda input_arg: int(input_arg, 0), | ||
"help": "Spacecraft ID" | ||
}, | ||
("--vcid",): { | ||
"type": lambda input_arg: int(input_arg, 0), | ||
"help": "Virtual channel ID" | ||
} | ||
} | ||
|
||
@classmethod | ||
def check_arguments(cls, scid, vcid): | ||
""" Check arguments from the CLI | ||
Confirms that the input arguments are valid for this framer/deframer. | ||
Args: | ||
scid: spacecraft id | ||
vcid: virtual channel id | ||
""" | ||
if scid is None: | ||
raise TypeError(f"Spacecraft ID not specified") | ||
if scid < 0: | ||
raise TypeError(f"Spacecraft ID {scid} is negative") | ||
if scid > 0x3FF: | ||
raise TypeError(f"Spacecraft ID {scid} is larger than {0x3FF}") | ||
|
||
if vcid is None: | ||
raise TypeError(f"Virtual Channel ID not specified") | ||
if vcid < 0: | ||
raise TypeError(f"Virtual Channel ID {vcid} is negative") | ||
if vcid > 0x3F: | ||
raise TypeError(f"Virtual Channel ID {vcid} is larger than {0x3FF}") | ||
|
||
@classmethod | ||
def get_name(cls): | ||
""" Name of this implementation provided to CLI """ | ||
return "unspecified-space-data-link" | ||
|
||
@classmethod | ||
@gds_plugin_implementation | ||
def register_framing_plugin(cls): | ||
""" Register the MyPlugin plugin """ | ||
return cls | ||
|
||
|
||
class SpacePacketSpaceDataLinkFramerDeframer(ChainedFramerDeframer): | ||
""" Space Data Link Protocol framing and deframing that has a data unit of Space Packets """ | ||
|
||
@classmethod | ||
def get_composites(cls) -> List[Type[FramerDeframer]]: | ||
""" Return the composite list of this """ | ||
return [ | ||
SpacePacketFramerDeframer, | ||
SpaceDataLinkFramerDeframer | ||
] | ||
|
||
@classmethod | ||
def get_name(cls): | ||
""" Name of this implementation provided to CLI """ | ||
return "space-packet-space-data-link" | ||
|
||
@classmethod | ||
@gds_plugin_implementation | ||
def register_framing_plugin(cls): | ||
""" Register the MyPlugin plugin """ | ||
return cls |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
""" fprime_gds.plugins.framing.chain: implementation of a chained framer/deframer """ | ||
from abc import ABC, abstractmethod | ||
from functools import reduce | ||
from typing import Any, Dict, List, Type | ||
from fprime_gds.common.communication.framing import FramerDeframer | ||
|
||
|
||
class ChainedFramerDeframer(FramerDeframer, ABC): | ||
""" Framer/deframer that is a composite of chained framer/deframers | ||
This Framer/Deframer will wrap a set of framer/deframers where the result of the frame and deframe options will pass | ||
from one to the other subsequently. The order is specified via the framing path and deframing will use the reverse | ||
order from specified. | ||
""" | ||
def __init__(self, **kwargs): | ||
""" Initialize the chained framer/deframer from a framing-ordered set of children """ | ||
frame_order_framer_deframers = [ | ||
composite(**self.get_argument_subset(composite, kwargs)) | ||
for composite in self.get_composites() | ||
] | ||
self.framers = frame_order_framer_deframers[::1] | ||
self.deframers = frame_order_framer_deframers[::-1] | ||
|
||
@classmethod | ||
@abstractmethod | ||
def get_composites(cls) -> List[Type[FramerDeframer]]: | ||
""" Return a list of composites """ | ||
raise NotImplementedError(f"Subclasses of {cls.__name__} must implement get_composites") | ||
|
||
@staticmethod | ||
def get_argument_subset(composite: Type[FramerDeframer], argument_dictionary: Dict[str, Any]) -> Dict[str, Any]: | ||
""" Get an argument subset that is needed by composite | ||
For the composite, find the set of arguments that is needed by this composite and pull those out of the complete | ||
argument dictionary. | ||
Args: | ||
composite: class of a subtype of FramerDeframer | ||
argument_dictionary: dictionary of all input arguments | ||
""" | ||
if not hasattr(composite, "get_arguments"): | ||
return {} | ||
needed_arguments = composite.get_arguments() | ||
needed_argument_destinations = [ | ||
description["destination"] if "destination" in description else | ||
[dash_dash for dash_dash in flag if dash_dash.startswith("--")][0].lstrip("-").replace("-", "_") | ||
for flag, description in needed_arguments.items() | ||
] | ||
return {name: argument_dictionary[name] for name in needed_argument_destinations} | ||
|
||
@classmethod | ||
def get_arguments(cls): | ||
""" Arguments to request from the CLI """ | ||
all_arguments = {} | ||
for composite in cls.get_composites(): | ||
all_arguments.update(composite.get_arguments() if hasattr(composite, "get_arguments") else {}) | ||
return all_arguments | ||
|
||
@classmethod | ||
def check_arguments(cls, **kwargs): | ||
""" Check arguments from the CLI """ | ||
for composite in cls.get_composites(): | ||
subset_arguments = cls.get_argument_subset(composite, kwargs) | ||
if hasattr(composite, "check_arguments"): | ||
composite.check_arguments(**subset_arguments) | ||
|
||
def deframe(self, data, no_copy=False): | ||
""" Deframe via a chain of children deframers """ | ||
packet = data[:] if not no_copy else data | ||
remaining = None | ||
discarded = b"" | ||
|
||
for deframer in self.deframers: | ||
new_packet, new_remaining, new_discarded = deframer.deframe(packet, True) | ||
discarded += new_discarded | ||
remaining = new_remaining if remaining is None else remaining | ||
packet = new_packet | ||
return packet, remaining, discarded | ||
|
||
def frame(self, data): | ||
""" Frame via a chain of children framers """ | ||
return reduce(lambda framed_data, framer: framer.frame(framed_data), self.framers, data) |