Skip to content

Commit

Permalink
Merge branch 'devel' into fix/enum_representation_types
Browse files Browse the repository at this point in the history
  • Loading branch information
Jack White committed Dec 11, 2023
2 parents 8acb91f + 8677c64 commit 608f4f7
Show file tree
Hide file tree
Showing 18 changed files with 219 additions and 98 deletions.
8 changes: 8 additions & 0 deletions .github/ISSUE_TEMPLATE/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Issue-template configuration: disable blank issues and route users to the
# core F´ repository, where issues for this project are centrally managed.
blank_issues_enabled: false
contact_links:
  - name: Open an Issue with F´
    url: https://github.com/nasa/fprime/issues/new/choose
    about: Issues about this repository are centrally managed in the core F´ repository
  - name: F´ Community Discussions
    url: https://github.com/nasa/fprime/discussions
    about: Questions should be asked in the F´ Discussions
11 changes: 0 additions & 11 deletions .github/ISSUE_TEMPLATE/do-not-use.md

This file was deleted.

1 change: 1 addition & 0 deletions .github/actions/spelling/expect.txt
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,7 @@ Smath
SNDHWM
socketserver
sorttable
spam
sphinxcontrib
splitext
squashify
Expand Down
31 changes: 18 additions & 13 deletions src/fprime_gds/common/communication/framing.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ def deframe(self, data, no_copy=False):
"""
Deframes the incoming data from the specified format. Produces exactly one packet, and leftover bytes. Users
wanting all packets to be deframed should call "deframe_all". If no full packet is available, this method
returns None. Expects incoming raw bytes to deframe, and returns a deframed packet or None, and the leftover
bytes that were unused. Will search and discard data up until a start token is found. Note: data will be
consumed up to the first start token found.
returns None. Expects incoming raw bytes to deframe, and returns a deframed packet or None, the leftover
bytes that were unused, and any bytes discarded from the existing data stream. Will search and discard data up
until a start token is found. Note: data will be consumed up to the first start token found.
:param data: framed data bytes
:param no_copy: (optional) will prevent extra copy if True, but "data" input will be destroyed.
:return: (packet as array of bytes or None, leftover bytes)
:return: (packet as array of bytes or None, leftover bytes, any discarded data)
"""

def deframe_all(self, data, no_copy):
    """
    Deframe every complete packet available in the supplied data. Repeatedly calls "deframe"
    until no full packet remains, accumulating the deframed packets, the unconsumed leftover
    bytes, and every byte discarded while searching for valid frame boundaries.

    :param data: framed data bytes
    :param no_copy: (optional) will prevent extra copy if True, but "data" input will be destroyed.
    :return: list of packets, remaining data, discarded/unframed/garbage data
    """
    packets = []
    if not no_copy:
        # Work on a copy so the caller's buffer is not consumed by the deframing loop
        data = copy.copy(data)
    discarded_aggregate = b""
    while True:
        # Deframe and return only on None: a None packet means no complete frame remains
        (packet, data, discarded) = self.deframe(data, no_copy=True)
        discarded_aggregate += discarded
        if packet is None:
            return packets, data, discarded_aggregate
        packets.append(packet)


Expand Down Expand Up @@ -147,6 +149,7 @@ def deframe(self, data, no_copy=False):
:param no_copy: (optional) will prevent extra copy if True, but "data" input will be destroyed.
:return: (packet as array of bytes or None, leftover bytes)
"""
discarded = b""
if not no_copy:
data = copy.copy(data)
# Continue until there is not enough data for the header, or until a packet is found (return)
Expand All @@ -163,6 +166,7 @@ def deframe(self, data, no_copy=False):
start != FpFramerDeframer.START_TOKEN
or data_size >= FpFramerDeframer.MAXIMUM_DATA_SIZE
):
discarded += data[0:1]
data = data[1:]
continue
# If the pool is large enough to read the whole frame, then read it
Expand All @@ -175,17 +179,18 @@ def deframe(self, data, no_copy=False):
data[: data_size + FpFramerDeframer.HEADER_SIZE]
):
data = data[total_size:]
return deframed, data
return deframed, data, discarded
print(
"[WARNING] Checksum validation failed. Have you correctly set '--comm-checksum-type'",
file=sys.stderr,
)
# Bad checksum, rotate 1 and keep looking for non-garbage
discarded += data[0:1]
data = data[1:]
continue
# Case of not enough data for a full packet, return hoping for more later
return None, data
return None, data
return None, data, discarded
return None, data, discarded


class TcpServerFramerDeframer(FramerDeframer):
Expand Down Expand Up @@ -237,11 +242,11 @@ def deframe(self, data, no_copy=False):
data = data[1:]
# Break out of data when not enough
if len(data) < 8:
return None, data
return None, data, b""
# Read the length and break if not enough data
(data_len,) = struct.unpack_from(">I", data, 4)
if len(data) < data_len + 8:
return None, data
return None, data, b""
packet = data[8 : data_len + 8]
data = data[data_len + 8 :]
return packet, data
return packet, data, b""
2 changes: 1 addition & 1 deletion src/fprime_gds/common/communication/ground.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def receive_all(self):
:return: list deframed packets
"""
self.data += self.tcp.read()
(frames, self.data) = self.deframer.deframe_all(self.data, no_copy=True)
(frames, self.data, _) = self.deframer.deframe_all(self.data, no_copy=True)
return frames

def send_all(self, frames):
Expand Down
31 changes: 26 additions & 5 deletions src/fprime_gds/common/communication/updown.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,23 @@ class Downlinker:
"""

def __init__(
self, adapter: BaseAdapter, ground: GroundHandler, deframer: FramerDeframer
self,
adapter: BaseAdapter,
ground: GroundHandler,
deframer: FramerDeframer,
discarded=None,
):
"""Initialize the downlinker
Constructs a new downlinker object used to run the downlink and deframing operation.
Constructs a new downlinker object used to run the downlink and deframing operation. This downlinker will log
discarded (unframed) data when discarded is a writable data object. When discarded is None the discarded data is
dropped.
Args:
adapter: adapter used to read raw data from the hardware connection
ground: handles the ground side connection
deframer: deframer used to deframe data from the communication format
discarded: file to write discarded data to. None to drop the data.
"""
self.running = True
self.th_ground = None
Expand All @@ -54,13 +61,18 @@ def __init__(
self.ground = ground
self.deframer = deframer
self.outgoing = Queue()
self.discarded = discarded

def start(self):
    """Starts the downlink pipeline

    Spawns the two daemon worker threads of the downlink pipeline: one forwarding
    deframed frames to the ground side ("sending") and one reading and deframing raw
    data from the adapter ("deframing"). Daemon threads will not block process exit.
    """
    self.th_ground = threading.Thread(
        target=self.sending, name="DownlinkTTSGroundThread"
    )
    self.th_ground.daemon = True
    self.th_ground.start()
    self.th_data = threading.Thread(
        target=self.deframing, name="DownLinkDeframingThread"
    )
    self.th_data.daemon = True
    self.th_data.start()

Expand All @@ -74,12 +86,20 @@ def deframing(self):
while self.running:
# Blocks until data is available, but may still return b"" if timeout
pool += self.adapter.read()
frames, pool = self.deframer.deframe_all(pool, no_copy=True)
frames, pool, discarded_data = self.deframer.deframe_all(pool, no_copy=True)
try:
for frame in frames:
self.outgoing.put_nowait(frame)
except Full:
DW_LOGGER.warning("GDS ground queue full, dropping frame")
try:
if self.discarded is not None:
self.discarded.write(discarded_data)
self.discarded.flush()
# Failure to write discarded data should never stop the GDS. Log it and move on.
except Exception as exc:
DW_LOGGER.warning("Cannot write discarded data %s", exc)
self.discarded = None # Give up on logging further data

def sending(self):
"""Outgoing stage of downlink
Expand Down Expand Up @@ -107,6 +127,7 @@ def join(self):
for thread in [self.th_data, self.th_ground]:
if thread is not None:
thread.join()
self.discarded = None

def add_loopback_frame(self, frame):
"""Adds a frame to loopback to ground
Expand Down
25 changes: 19 additions & 6 deletions src/fprime_gds/common/files/downlinker.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ def __init__(self, directory, timeout=20.0, log_dir=None):
self.sequence = 0 # Keeps track of what the current sequence ID should be
self.timer = fprime_gds.common.files.helpers.Timeout()
self.timer.setup(self.timeout, timeout)
os.makedirs(self.__directory, exist_ok=True)

def data_callback(self, data, sender=None):
"""
Expand Down Expand Up @@ -96,7 +95,15 @@ def handle_start(self, data):
size,
self.__log_dir,
)
self.active.open(TransmitFileState.WRITE)
try:
self.active.open(TransmitFileState.WRITE)
except PermissionError as exc:
self.state = FileStates.ERROR
LOGGER.warning(
"Unable to open file for writing. Discarding subsequent downlink packets. "
+ str(exc)
)
return
LOGGER.addHandler(self.active.log_handler)
message = "Received START packet with metadata:\n"
message += "\tSize: %d\n"
Expand All @@ -116,7 +123,10 @@ def handle_data(self, data):
# Initialize all relevant DATA packet attributes into variables from file_data
offset = data.offset
if self.state != FileStates.RUNNING:
LOGGER.warning("Received unexpected data packet for offset: %d", offset)
# ERROR state means GDS is not ready to receive data packets (permission error)
# No need to log this, as it is already logged in handle_start and would only spam logs
if self.state != FileStates.ERROR:
LOGGER.warning("Received unexpected data packet for offset: %d", offset)
else:
if data.seqID != self.sequence:
LOGGER.warning(
Expand Down Expand Up @@ -156,9 +166,10 @@ def handle_end(self, data):
"""
# Initialize all relevant END packet attributes into variables from file_data
# hashValue attribute is not relevant right now, but might be in the future
if self.state != FileStates.RUNNING:
LOGGER.warning("Received unexpected END packet")
else:
if self.state == FileStates.ERROR:
LOGGER.info("Received END packet for discarded downlink")
self.finish()
elif self.state == FileStates.RUNNING:
if data.seqID != self.sequence:
LOGGER.warning(
"End packet has unexpected sequence id. Expected: %d got %d",
Expand All @@ -167,6 +178,8 @@ def handle_end(self, data):
)
LOGGER.info("Received END packet, finishing downlink")
self.finish()
else:
LOGGER.warning("Received unexpected END packet")

def timeout(self):
"""Timeout the current downlink"""
Expand Down
1 change: 1 addition & 0 deletions src/fprime_gds/common/files/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class FileStates(enum.Enum):
RUNNING = 1
CANCELED = 2
END_WAIT = 3 # Waiting for the handshake for CANCEL or END packet
ERROR = 4


class CFDPChecksum:
Expand Down
13 changes: 11 additions & 2 deletions src/fprime_gds/common/logger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import os
import sys

INITIALIZED = False


def configure_py_log(directory=None, filename=sys.argv[0], mirror_to_stdout=False):
"""
Expand All @@ -21,7 +23,14 @@ def configure_py_log(directory=None, filename=sys.argv[0], mirror_to_stdout=Fals
:param mode: of file to write
:param mirror_to_stdout: mirror the log output to standard our
"""
handlers = [logging.StreamHandler(sys.stdout)] if directory is None or mirror_to_stdout else []
global INITIALIZED
if INITIALIZED:
return
handlers = (
[logging.StreamHandler(sys.stdout)]
if directory is None or mirror_to_stdout
else []
)
if directory is not None:
log_file = os.path.join(directory, os.path.basename(filename))
log_file = log_file if log_file.endswith(".log") else f"{log_file}.log"
Expand All @@ -33,4 +42,4 @@ def configure_py_log(directory=None, filename=sys.argv[0], mirror_to_stdout=Fals
logging.getLogger().addHandler(handler)
logging.getLogger().setLevel(logging.INFO)
logging.info("Logging system initialized!")

INITIALIZED = True
9 changes: 8 additions & 1 deletion src/fprime_gds/common/pipeline/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
@author mstarch
"""
import os
import fprime_gds.common.files.downlinker
import fprime_gds.common.files.uplinker

Expand All @@ -27,7 +28,8 @@ def setup_file_handling(
self, down_store, file_encoder, file_decoder, distributor, log_dir
):
"""
Sets up the file handling (uplink and downlink) from a pair of encoders and decoders
Sets up the file handling (uplink and downlink) from a pair of encoders and decoders.
Raises a PermissionError if the down_store is not writable.
:param down_store: downlink storage directory
:param file_encoder: file encoder for uplink
Expand All @@ -41,6 +43,11 @@ def setup_file_handling(
)
file_decoder.register(self.__downlinker)
distributor.register("FW_PACKET_HAND", self.__uplinker)
if not os.access(down_store, os.W_OK):
raise PermissionError(
f"{down_store} is not writable. Downlinker not be able to save files. "
"Fix permissions or change storage directory with --file-storage-directory."
)

@property
def uplinker(self):
Expand Down
Loading

0 comments on commit 608f4f7

Please sign in to comment.