From fa3c5752a556b20c40d346cfd34f289f56050998 Mon Sep 17 00:00:00 2001 From: Gavin Medley Date: Sun, 29 Sep 2024 22:22:00 -0600 Subject: [PATCH] Fix incorrect parsing of StringDataEncoding XML elements and resulting parsing bugs for parsing strings themselves Return bytes objects for raw strings and str objects for derived values. --- docs/source/changelog.md | 2 + pyproject.toml | 1 - space_packet_parser/encodings.py | 301 ++++++++++++++++++------------- space_packet_parser/packets.py | 4 +- tests/unit/test_xtcedef.py | 203 ++++++++++++++------- 5 files changed, 324 insertions(+), 187 deletions(-) diff --git a/docs/source/changelog.md b/docs/source/changelog.md index 03b329f..9417eea 100644 --- a/docs/source/changelog.md +++ b/docs/source/changelog.md @@ -23,6 +23,8 @@ Release notes for the `space_packet_parser` library - BREAKING: Removed `word_size` kwarg from packet generator method. We expect all binary data to be integer number of bytes. - BREAKING: Changed `packet_generator` kwarg `skip_header_bits` to `skip_header_bytes`. +- Fixed incorrect parsing of StringDataEncoding elements. Raw string values are now returned as byte buffers. + Derived string values contain python string objects. - The ``CCSDSPacket`` class is now a dictionary subclass, enabling direct lookup of items from the Packet itself. - A ``RawPacketData`` class has been added that is a subclass of bytes. It keeps track of the current parsing location and enables reading of bit lengths as integers or raw bytes. diff --git a/pyproject.toml b/pyproject.toml index aaa6dd9..3079338 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,7 +50,6 @@ myst-parser = "*" sphinx-autoapi = "*" sphinx-rtd-theme = "*" coverage = "*" -numpy = "*" [tool.poetry.group.examples] optional = true diff --git a/space_packet_parser/encodings.py b/space_packet_parser/encodings.py index 25168d3..d3bb585 100644 --- a/space_packet_parser/encodings.py +++ b/space_packet_parser/encodings.py @@ -1,16 +1,14 @@ """DataEncoding definitions""" +# Standard from abc import ABCMeta, abstractmethod import logging import struct from typing import Any, List, Optional, Tuple, Union -import warnings - +# Installed import lxml.etree as ElementTree - -from space_packet_parser.exceptions import ElementNotFoundError +# Local from space_packet_parser import calibrators, comparisons, packets - logger = logging.getLogger(__name__) @@ -171,17 +169,19 @@ class StringDataEncoding(DataEncoding): _supported_encodings = ('US-ASCII', 'ISO-8859-1', 'Windows-1252', 'UTF-8', 'UTF-16', 'UTF-16LE', 'UTF-16BE', 'UTF-32', 'UTF-32LE', 'UTF-32BE') - def __init__(self, - *, - encoding: str = 'UTF-8', - byte_order: Optional[str] = None, - termination_character: Optional[str] = None, - fixed_length: Optional[int] = None, - leading_length_size: Optional[int] = None, - dynamic_length_reference: Optional[str] = None, - use_calibrated_value: Optional[bool] = True, - discrete_lookup_length: Optional[List[comparisons.DiscreteLookup]] = None, - length_linear_adjuster: Optional[callable] = None): + def __init__( # pylint: disable=too-many-branches + self, + *, + encoding: str = 'UTF-8', + byte_order: Optional[str] = None, + fixed_raw_length: Optional[int] = None, + dynamic_length_reference: Optional[str] = None, + discrete_lookup_length: Optional[List[comparisons.DiscreteLookup]] = None, + use_calibrated_value: Optional[bool] = True, + length_linear_adjuster: Optional[callable] = None, + termination_character: Optional[str] = None, + leading_length_size: Optional[int] = None + ): # pylint: disable=pointless-statement f"""Constructor Only one of termination_character, fixed_length, or leading_length_size should be set. Setting more than one @@ -201,17 +201,17 @@ def __init__(self, itself. For example, for a utf-8 encoded string, the hex string must be two hex characters (one byte). For a UTF-16* encoded string, the hex representation of the termination character must be four characters (two bytes). - fixed_length : Optional[int] - Fixed length of the string, in bits. + fixed_raw_length : Optional[int] + Fixed length of the raw string, in bits. Comes from a SizeInBits/Fixed/FixedValue element. leading_length_size : Optional[int] - Fixed size in bits of a leading field that contains the length of the subsequent string. + Fixed size in bits of a leading field that contains the length of the subsequent derived string. dynamic_length_reference : Optional[str] - Name of referenced parameter for dynamic length, in bits. May be combined with a linear_adjuster + Name of referenced parameter for dynamic raw length, in bits. May be combined with a linear_adjuster. use_calibrated_value: Optional[bool] Whether to use the calibrated value on the referenced parameter in dynamic_length_reference. Default is True. discrete_lookup_length : Optional[List[DiscreteLookup]] - List of DiscreteLookup objects with which to determine string length from another parameter. + List of DiscreteLookup objects with which to determine raw string length from another parameter. length_linear_adjuster : Optional[callable] Function that linearly adjusts a size. e.g. if the size reference parameter gives a length in bytes, the linear adjuster should multiply by 8 to give the size in bits. @@ -228,8 +228,27 @@ def __init__(self, self.byte_order = "mostSignificantByteFirst" else: raise ValueError("Byte order must be specified for multi-byte character encodings.") - else: - self.byte_order = byte_order + else: + self.byte_order = byte_order + if self.byte_order and self.byte_order not in ("leastSignificantByteFirst", "mostSignificantByteFirst"): + raise ValueError("If specified, byte order must be one of `leastSignificantByteFirst`, " + "`mostSignificantByteFirst`.") + + if termination_character and leading_length_size: + raise ValueError("Got both a termination character and a leading size for a string encoding.") + + # Check to see if we are specifying raw length in more than one way + buffer_length_specs = sum( + bool(x) for x in (dynamic_length_reference, discrete_lookup_length, fixed_raw_length) + ) + if buffer_length_specs != 1: + raise ValueError("Expected exactly one of dynamic length reference, discrete length lookup, " + "or fixed length for specifying the raw length of a string.") + + if length_linear_adjuster and not dynamic_length_reference: + raise ValueError("Got a length linear adjuster for a string whose length is not specified by a dynamic " + "length reference.") + self.termination_character = termination_character if termination_character: # Always in hex, per 4.3.2.2.5.5.4 of XTCE spec @@ -243,7 +262,8 @@ def __init__(self, f"Expected a hex string representation of a single character, e.g. '58' for " f"character 'X' in utf-8 or '5800' for character 'X' in utf-16-le. Note that " f"variable-width encoding is not yet supported in any encoding.") - self.fixed_length = fixed_length + + self.fixed_length = fixed_raw_length self.leading_length_size = leading_length_size self.dynamic_length_reference = dynamic_length_reference self.use_calibrated_value = use_calibrated_value @@ -251,96 +271,128 @@ def __init__(self, self.length_linear_adjuster = length_linear_adjuster def _calculate_size(self, packet: packets.CCSDSPacket) -> int: - """Calculate the length of the string data item in bits. + """Calculate the size of the raw string buffer field Parameters ---------- - packet: CCSDSPacket - Binary representation of the packet used to get the coming bits and any - previously parsed data items to infer field lengths. + packet : packets.CCSDSPacket + Partially parsed packet for referencing previous data fields. Returns ------- : int - Number of bits in the string data item + Size, in bits, of the raw string buffer. """ - # pylint: disable=too-many-branches if self.fixed_length: - strlen_bits = self.fixed_length - elif self.leading_length_size is not None: # strlen_bits is determined from a preceding integer - strlen_bits = packet.raw_data.read_as_int(self.leading_length_size) - if strlen_bits % 8 != 0: - warnings.warn(f"String length (in bits) is {strlen_bits}, which is not a multiple of 8. " - f"This likely means something is wrong since strings are expected to be integer numbers " - f"of bytes.") - elif self.discrete_lookup_length is not None: + buflen_bits = self.fixed_length + elif self.discrete_lookup_length: for discrete_lookup in self.discrete_lookup_length: - strlen_bits = discrete_lookup.evaluate(packet) - if strlen_bits is not None: + buflen_bits = discrete_lookup.evaluate(packet) + if buflen_bits: break else: raise ValueError('List of discrete lookup values being used for determining length of ' f'string {self} found no matches based on {packet}.') - elif self.dynamic_length_reference is not None: + elif self.dynamic_length_reference: if self.use_calibrated_value is True: - strlen_bits = packet[self.dynamic_length_reference].derived_value + buflen_bits = packet[self.dynamic_length_reference].derived_value else: - strlen_bits = packet[self.dynamic_length_reference].raw_value - strlen_bits = int(strlen_bits) - elif self.termination_character is not None: - # Look through the rest of the packet data to find the termination character - nbits_left = len(packet.raw_data) - packet.raw_data.pos - orig_pos = packet.raw_data.pos - string_buffer = packet.raw_data.read_as_bytes(nbits_left - nbits_left % 8) - # Reset the original position because we only wanted to look ahead - packet.raw_data.pos = orig_pos - try: - strlen_bits = string_buffer.index(self.termination_character) * 8 - except ValueError as exc: - # Termination character not found in the string buffer - raise ValueError(f"Reached the end of the packet data without finding the " - f"termination character {self.termination_character}") from exc + buflen_bits = packet[self.dynamic_length_reference].raw_value + + if self.length_linear_adjuster: + buflen_bits = self.length_linear_adjuster(buflen_bits) else: - raise ValueError("Unable to parse StringParameterType. " - "Didn't contain any way to constrain the length of the string.") - if not self.termination_character and self.length_linear_adjuster is not None: - # Only adjust if we are not doing this by termination character. Adjusting a length that is objectively - # determined via termination character is nonsensical. - strlen_bits = self.length_linear_adjuster(strlen_bits) - return strlen_bits - # pylint: enable=too-many-branches - - def parse_value(self, packet: packets.CCSDSPacket, **kwargs) -> Tuple[str, None]: - """Parse a value from packet data, possibly using previously parsed data items to inform parsing. + raise ValueError("No raw length specifier found when decoding a string.") + return int(buflen_bits) + + def _get_raw_buffer(self, packet: packets.CCSDSPacket) -> bytes: + """Get the raw string buffer as bytes. This will include any leading size or termination characters. + + Notes + ----- + If the buffer size is not an integer number of bytes, the bytestring is padded at the end with zeros. + + Parameters + ---------- + packet : packets.CCSDSPacket + Packet parsed so far, for referencing previous values + + Returns + ------- + : bytes + The raw string buffer, padded on the RHS to a byte boundary if the raw length specified is not an integer + number of bytes. + """ + buflen_bits = self._calculate_size(packet) + pad_bits = (8 - (buflen_bits % 8)) % 8 + buflen_bytes = (buflen_bits + pad_bits) // 8 + if (buflen_bits + pad_bits) % 8 != 0: + raise ValueError(f"Error in buffer length math in _get_raw_buffer. " + f"buflen_bits={buflen_bits}, pad_bits={pad_bits}, buflen_bytes={buflen_bytes}") + # read_as_bytes pads on the left because it internally treats bytes as integers, + # but for strings, we want any padding on the right, so shift the bytestring left by pad_bits + raw_string_buffer = ( + packet.raw_data.read_as_int(buflen_bits) << pad_bits + ).to_bytes(buflen_bytes, "big") + return raw_string_buffer + + def parse_value(self, packet: packets.CCSDSPacket, **kwargs) -> Tuple[bytes, str]: + """Parse a string value from packet data, possibly using previously parsed data items to inform parsing. Parameters ---------- packet: CCSDSPacket Binary representation of the packet used to get the coming bits and any previously parsed data items to infer field lengths. + Returns ------- + : bytes + Raw string bytes buffer. This includes any leading size or termination character and may be a non-integer + number of bytes, padded on the RHS. : str - Parsed value - : None - Calibrated value + Parsed string value, as a python string object. """ - nbits = self._calculate_size(packet) - parsed_value = packet.raw_data.read_as_bytes(nbits) - if self.termination_character is not None: - # We need to skip over the termination character if there was one - packet.raw_data.pos += len(self.termination_character) * 8 - return parsed_value.decode(self.encoding), None + raw_string_buffer = packets.RawPacketData(self._get_raw_buffer(packet)) + if self.leading_length_size: + strlen_bits = raw_string_buffer.read_as_int(self.leading_length_size) + if strlen_bits % 8 != 0: + raise ValueError(f"String length (in bits) is {strlen_bits}, which is not a multiple of 8. " + "This is an error since strings must be an integer numbers of bytes.") + parsed_string = raw_string_buffer.read_as_bytes(strlen_bits).decode(self.encoding) + elif self.termination_character is not None: + try: + tchar_byte_index = raw_string_buffer.index(self.termination_character) + except ValueError as exc: + raise ValueError(f"Reached the end of the raw string buffer {raw_string_buffer} without finding the " + f"termination character {self.termination_character}") from exc + parsed_string = raw_string_buffer.read_as_bytes(tchar_byte_index * 8).decode(self.encoding) + else: + # Indicates there is no further parsing. The raw string value is the whole string value. + parsed_string = raw_string_buffer.decode(self.encoding) + + return bytes(raw_string_buffer), parsed_string @classmethod def from_data_encoding_xml_element(cls, element: ElementTree.Element, ns: dict) -> 'StringDataEncoding': """Create a data encoding object from an XML element. - Strings in XTCE can be described in three ways: - 1. Using a termination character that marks the end of the string. - 2. Using a fixed length, which may be derived from referenced parameter either directly or via a discrete - lookup table. - 3. Using a leading size field that describes the size of the following string. + Notes + ----- + Raw strings in XTCE can be described in two ways: + + 1. Using a SizeInBits/Fixed/FixedSize element that specifies the length of the raw string, including any + termination character or leading size integer. This is sometimes referred to as the "container" or + "memory allocation" for the string. + 2. Using a Variable element that contains either a DynamicValue or a DiscreteLookup to other parameters + that specify the length of the raw string, including any termination character or leading size integer. + + Derived strings in XTCE can be specified in two ways: + + 1. Via a TerminationCharacter element that contains the hex value of the termination character in the + specified encoding (e.g. UTF-16BE). + 2. Via a LeadingSize element that specifies the number of bits allocated at the beginning of the raw string + for an integer that specifies the subsequent length, in bits, of the derived string. Parameters ---------- @@ -353,59 +405,62 @@ def from_data_encoding_xml_element(cls, element: ElementTree.Element, ns: dict) ------- cls """ + init_kwargs = {} # Build up kwargs for class initialization encoding: str = element.get("encoding", "UTF-8") + init_kwargs["encoding"] = encoding - byte_order = None # fallthrough value if encoding not in ('US-ASCII', 'ISO-8859-1', 'Windows-1252', 'UTF-8'): # single-byte chars if not (encoding.endswith("BE") or encoding.endswith("LE")): byte_order = element.get("byteOrder") + init_kwargs["byte_order"] = byte_order if byte_order is None: raise ValueError("For multi-byte character encodings, byte order must be specified " "either using the byteOrder attribute or via the encoding itself.") - try: - termination_character = element.find('xtce:SizeInBits/xtce:TerminationChar', ns).text - return cls(termination_character=termination_character, encoding=encoding, byte_order=byte_order) - except AttributeError: - pass - - try: - leading_length_size = int( - element.find('xtce:SizeInBits/xtce:LeadingSize', ns).attrib['sizeInBitsOfSizeTag']) - return cls(leading_length_size=leading_length_size, encoding=encoding, byte_order=byte_order) - except AttributeError: - pass + # Raw string specifiers + if element.find("xtce:SizeInBits", ns) is not None: + # This is a fixed length raw string + size_element = element.find("xtce:SizeInBits", ns) + fixed_raw_length = int(size_element.find("xtce:Fixed/xtce:FixedValue", ns).text) + init_kwargs["fixed_raw_length"] = fixed_raw_length + elif element.find("xtce:Variable", ns) is not None: + # This is a variable length raw string + size_element = element.find("xtce:Variable", ns) + if size_element.find("xtce:DynamicValue", ns) is not None: + # Raw string length is specified by reference to another parameter + dynamic_value_element = size_element.find('xtce:DynamicValue', ns) + referenced_parameter = dynamic_value_element.find('xtce:ParameterInstanceRef', ns).attrib[ + 'parameterRef'] + init_kwargs["dynamic_length_reference"] = referenced_parameter + + if 'useCalibratedValue' in dynamic_value_element.find('xtce:ParameterInstanceRef', ns).attrib: + use_calibrated_value = dynamic_value_element.find( + 'xtce:ParameterInstanceRef', ns).attrib['useCalibratedValue'].lower() == "true" + init_kwargs["use_calibrated_value"] = use_calibrated_value + + linear_adjuster = cls._get_linear_adjuster(dynamic_value_element, ns) + init_kwargs["length_linear_adjuster"] = linear_adjuster + elif size_element.find("xtce:DiscreteLookupList", ns) is not None: + # Raw string length is specified by lookup table based on another parameter + discrete_lookup_list_element = element.find('xtce:Variable/xtce:DiscreteLookupList', ns) + discrete_lookup_list = [comparisons.DiscreteLookup.from_discrete_lookup_xml_element(el, ns) + for el in discrete_lookup_list_element.findall('xtce:DiscreteLookup', ns)] + init_kwargs["discrete_lookup_length"] = discrete_lookup_list + else: + raise ValueError("Variable element must contain either DynamicValue or DiscreteLookupList.") + else: + raise ValueError("StringDataEncoding must contain either a SizeInBits or Variable element.") - fixed_element = element.find('xtce:SizeInBits/xtce:Fixed', ns) + # Derived string specifiers + if size_element.find("xtce:TerminationChar", ns) is not None: + termination_character = size_element.find('xtce:TerminationChar', ns).text + init_kwargs["termination_character"] = termination_character - discrete_lookup_list_element = fixed_element.find('xtce:DiscreteLookupList', ns) - if discrete_lookup_list_element is not None: - discrete_lookup_list = [comparisons.DiscreteLookup.from_discrete_lookup_xml_element(el, ns) - for el in discrete_lookup_list_element.findall('xtce:DiscreteLookup', ns)] - return cls(encoding=encoding, byte_order=byte_order, - discrete_lookup_length=discrete_lookup_list) + if size_element.find("xtce:LeadingSize", ns) is not None: + leading_length_size = int(size_element.find('xtce:LeadingSize', ns).attrib['sizeInBitsOfSizeTag']) + init_kwargs["leading_length_size"] = leading_length_size - try: - dynamic_value_element = fixed_element.find('xtce:DynamicValue', ns) - referenced_parameter = dynamic_value_element.find('xtce:ParameterInstanceRef', ns).attrib['parameterRef'] - use_calibrated_value = True - if 'useCalibratedValue' in dynamic_value_element.find('xtce:ParameterInstanceRef', ns).attrib: - use_calibrated_value = dynamic_value_element.find( - 'xtce:ParameterInstanceRef', ns).attrib['useCalibratedValue'].lower() == "true" - linear_adjuster = cls._get_linear_adjuster(dynamic_value_element, ns) - return cls(encoding=encoding, byte_order=byte_order, - dynamic_length_reference=referenced_parameter, use_calibrated_value=use_calibrated_value, - length_linear_adjuster=linear_adjuster) - except AttributeError: - pass - - try: - fixed_length = int(fixed_element.find('xtce:FixedValue', ns).text) - return cls(fixed_length=fixed_length, encoding=encoding, byte_order=byte_order) - except AttributeError: - pass - - raise ElementNotFoundError(f"Failed to parse StringDataEncoding for element {ElementTree.tostring(element)}") + return cls(**init_kwargs) class NumericDataEncoding(DataEncoding, metaclass=ABCMeta): diff --git a/space_packet_parser/packets.py b/space_packet_parser/packets.py index b1c5aab..7d44153 100644 --- a/space_packet_parser/packets.py +++ b/space_packet_parser/packets.py @@ -33,7 +33,7 @@ class ParsedDataItem: class RawPacketData(bytes): - """A class to represent raw packet data as bytes. + """A class to represent raw packet data as bytes but whose length is represented by bit length. This class is a subclass of bytes and is used to represent the raw packet data in a more readable way. It is used to store the raw packet data in the Packet @@ -53,7 +53,7 @@ def __len__(self): return self._nbits def __repr__(self): - return f"RawPacketData({self}, {len(self)}B, pos={self.pos})" + return f"RawPacketData({self}, {len(self)}b, pos={self.pos})" def read_as_bytes(self, nbits: int) -> bytes: """Read a number of bits from the packet data as bytes. Reads minimum number of complete bytes required to diff --git a/tests/unit/test_xtcedef.py b/tests/unit/test_xtcedef.py index 57631bf..cd725d4 100644 --- a/tests/unit/test_xtcedef.py +++ b/tests/unit/test_xtcedef.py @@ -752,56 +752,97 @@ def test_polynomial_calibrator_calibrate(xq, expectation): (""" + + 32 + 0058 """, - encodings.StringDataEncoding(termination_character='0058', encoding='UTF-16BE')), + encodings.StringDataEncoding(fixed_raw_length=32, termination_character='0058', encoding='UTF-16BE')), (""" 17 + """, - encodings.StringDataEncoding(fixed_length=17)), + encodings.StringDataEncoding(fixed_raw_length=17, leading_length_size=3)), (""" - - - - - - - - + + + + + + 58 + """, encodings.StringDataEncoding(dynamic_length_reference='SizeFromThisParameter', - length_linear_adjuster=object())), + length_linear_adjuster=object(), + termination_character='58')), (""" - - - - - - - - - - - - + + + + + + + + +""", + encodings.StringDataEncoding(dynamic_length_reference='SizeFromThisParameter', + length_linear_adjuster=object(), + leading_length_size=3)), + (""" + + + + + + + + + + + 58 + + +""", + encodings.StringDataEncoding( + discrete_lookup_length=[ + comparisons.DiscreteLookup([comparisons.Comparison('1', 'P1')], 10), + comparisons.DiscreteLookup([comparisons.Comparison('2', 'P1')], 25) + ], + termination_character="58" + )), + (""" + + + + + + + + + + + + """, encodings.StringDataEncoding( discrete_lookup_length=[ comparisons.DiscreteLookup([comparisons.Comparison('1', 'P1')], 10), comparisons.DiscreteLookup([comparisons.Comparison('2', 'P1')], 25) - ])), + ], + leading_length_size=3 + )), (""" @@ -811,7 +852,7 @@ def test_polynomial_calibrator_calibrate(xq, expectation): """, - definitions.ElementNotFoundError()) + AttributeError()) ] ) def test_string_data_encoding(xml_string: str, expectation): @@ -1097,29 +1138,37 @@ def test_binary_data_encoding(xml_string: str, expectation): """, parameters.StringParameterType(name='TEST_STRING_Type', - encoding=encodings.StringDataEncoding(fixed_length=40))), + encoding=encodings.StringDataEncoding(fixed_raw_length=40))), (""" + + 40 + """, parameters.StringParameterType(name='TEST_STRING_Type', - encoding=encodings.StringDataEncoding(leading_length_size=17))), + encoding=encodings.StringDataEncoding(fixed_raw_length=40, + leading_length_size=17))), (""" + + 40 + 00 """, parameters.StringParameterType(name='TEST_STRING_Type', - encoding=encodings.StringDataEncoding(termination_character='00'))), + encoding=encodings.StringDataEncoding(fixed_raw_length=40, + termination_character='00'))), ] ) def test_string_parameter_type(xml_string: str, expectation): @@ -1135,16 +1184,16 @@ def test_string_parameter_type(xml_string: str, expectation): @pytest.mark.parametrize( - ('parameter_type', 'raw_data', 'current_pos', 'expected'), + ('parameter_type', 'raw_data', 'current_pos', 'expected_raw', 'expected_derived'), [ # Fixed length test (parameters.StringParameterType( 'TEST_STRING', - encodings.StringDataEncoding(fixed_length=3, # Giving length in bytes - length_linear_adjuster=lambda x: 8 * x)), + encodings.StringDataEncoding(fixed_raw_length=24)), # This still 123X456 b'123X456', 0, + b'123', '123'), # Dynamic reference length (parameters.StringParameterType( @@ -1152,88 +1201,116 @@ def test_string_parameter_type(xml_string: str, expectation): encodings.StringDataEncoding(dynamic_length_reference='STR_LEN', use_calibrated_value=False, length_linear_adjuster=lambda x: 8 * x)), - b'BAD WOLF', + b'BAD WOLFABCD', 0, + b'BAD WOLF', 'BAD WOLF'), # Discrete lookup test (parameters.StringParameterType( 'TEST_STRING', - encodings.StringDataEncoding(discrete_lookup_length=[ - comparisons.DiscreteLookup([ - comparisons.Comparison(7, 'P1', '>'), - comparisons.Comparison(99, 'P2', '==', use_calibrated_value=False) - ], lookup_value=8) - ], length_linear_adjuster=lambda x: 8 * x)), + encodings.StringDataEncoding( + discrete_lookup_length=[ + comparisons.DiscreteLookup([ + comparisons.Comparison(7, 'P1', '>'), + comparisons.Comparison(99, 'P2', '==', use_calibrated_value=False) + ], lookup_value=64) + ] + )), b'BAD WOLF', 0, + b'BAD WOLF', 'BAD WOLF'), # Termination character tests (parameters.StringParameterType( 'TEST_STRING', - encodings.StringDataEncoding(encoding='UTF-8', - termination_character='58')), + encodings.StringDataEncoding( + fixed_raw_length=64, + encoding='UTF-8', + termination_character='58')), # 123X456 + extra characters, termination character is X b'123X456000000000000000000000000000000000000000000000', 0, + b'123X4560', '123'), (parameters.StringParameterType( 'TEST_STRING', - encodings.StringDataEncoding(encoding='UTF-8', - termination_character='58')), + encodings.StringDataEncoding( + fixed_raw_length=128, + encoding='UTF-8', + termination_character='58')), # 56bits + 123X456 + extra characters, termination character is X b'9090909123X456000000000000000000000000000000000000000000000', 56, + b'123X456000000000', '123'), (parameters.StringParameterType( 'TEST_STRING', - encodings.StringDataEncoding(encoding='UTF-8', - termination_character='58')), + encodings.StringDataEncoding( + fixed_raw_length=48, + encoding='UTF-8', + termination_character='58')), # 53bits + 123X456 + extra characters, termination character is X # This is the same string as above but bit-shifted left by 3 bits b'\x03K;s{\x93)\x89\x91\x9a\xc1\xa1\xa9\xb3K;s{\x93(', 53, + b'123X45', '123'), (parameters.StringParameterType( "TEST_STRING", - encodings.StringDataEncoding(encoding="UTF-8", - termination_character='00')), + encodings.StringDataEncoding( + fixed_raw_length=160, + encoding="UTF-8", + termination_character='00')), "false_is_truthy".encode("UTF-8") + b'\x00ABCD', 0, + b'false_is_truthy\x00ABCD', 'false_is_truthy'), (parameters.StringParameterType( "TEST_STRING", - encodings.StringDataEncoding(encoding="UTF-16BE", - termination_character='0021')), + encodings.StringDataEncoding( + fixed_raw_length=19*16, + encoding="UTF-16BE", + termination_character='0021')), "false_is_truthy".encode("UTF-16BE") + b'\x00\x21ignoreme', 0, + "false_is_truthy".encode("UTF-16BE") + b'\x00\x21ignore', 'false_is_truthy'), (parameters.StringParameterType( 'TEST_STRING', - encodings.StringDataEncoding(encoding='UTF-16LE', - termination_character='5800')), + encodings.StringDataEncoding( + fixed_raw_length=16*6, + encoding='UTF-16LE', + termination_character='5800')), # 123X456, termination character is X '123X456'.encode('UTF-16LE'), 0, + '123X45'.encode('UTF-16LE'), '123'), (parameters.StringParameterType( 'TEST_STRING', - encodings.StringDataEncoding(encoding='UTF-16BE', - termination_character='0058')), + encodings.StringDataEncoding( + fixed_raw_length=16*5, + encoding='UTF-16BE', + termination_character='0058')), '123X456'.encode('UTF-16BE'), 0, + '123X4'.encode('UTF-16BE'), '123'), # Leading length test (parameters.StringParameterType( 'TEST_STRING', - encodings.StringDataEncoding(leading_length_size=5)), + encodings.StringDataEncoding( + fixed_raw_length=29, + leading_length_size=5)), # This is still 123X456 but with 11000 prepended (a 5-bit representation of the number 24) # This represents a string length (in bits) of 24 bits. 0b1100000110001001100100011001101011000001101000011010100110110000.to_bytes(8, byteorder="big"), 0, + 0b11000001100010011001000110011000.to_bytes(4, byteorder="big"), '123'), ] ) -def test_string_parameter_parsing(parameter_type, raw_data, current_pos, expected): +def test_string_parameter_parsing(parameter_type, raw_data, current_pos, expected_raw, expected_derived): """Test parsing a string parameter""" # pre parsed data to reference for lookups packet = packets.CCSDSPacket(raw_data=raw_data, **{'P1': packets.ParsedDataItem('P1', 7, None, 7.55), @@ -1241,8 +1318,9 @@ def test_string_parameter_parsing(parameter_type, raw_data, current_pos, expecte 'STR_LEN': packets.ParsedDataItem('STR_LEN', 8, None)}) # Artificially set the current position of the packet data read so far packet.raw_data.pos = current_pos - raw, _ = parameter_type.parse_value(packet) - assert raw == expected + raw, derived = parameter_type.parse_value(packet) + assert raw == expected_raw + assert derived == expected_derived @pytest.mark.parametrize( @@ -1842,13 +1920,17 @@ def test_binary_parameter_parsing(parameter_type, raw_data, expected): + + 40 + 00 """, parameters.BooleanParameterType(name='TEST_PARAM_Type', unit='smoot', - encoding=encodings.StringDataEncoding(termination_character='00'))), + encoding=encodings.StringDataEncoding(fixed_raw_length=40, + termination_character='00'))), ] ) def test_boolean_parameter_type(xml_string, expectation): @@ -1874,11 +1956,10 @@ def test_boolean_parameter_type(xml_string, expectation): b'\x00', True), (parameters.BooleanParameterType( 'TEST_BOOL', - encodings.StringDataEncoding(encoding="UTF-8", termination_character='00')), - 0b011001100110000101101100011100110110010101011111011010010111001101011111011101000111001001110101011101000110100001111001000000000010101101010111.to_bytes( - length=18, byteorder='big'), + encodings.StringDataEncoding(fixed_raw_length=120, encoding="UTF-8")), + b'false_is_truthyextradata', 0, - 'false_is_truthy', True), + b'false_is_truthy', True), (parameters.BooleanParameterType( 'TEST_BOOL', encodings.IntegerDataEncoding(size_in_bits=2, encoding="unsigned")),