Skip to content

Commit

Permalink
pytest passes for test_data_product_writer
Browse files Browse the repository at this point in the history
  • Loading branch information
watney committed May 11, 2024
1 parent d1da837 commit b7f9389
Show file tree
Hide file tree
Showing 20 changed files with 10,196 additions and 394 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ Repository = "https://github.com/fprime-community/fprime-gds"
[project.scripts]
fprime-cli = "fprime_gds.executables.fprime_cli:main"
fprime-seqgen = "fprime_gds.common.tools.seqgen:main"
fprime-dp-write = "fprime_gds.executables.data_product_writer:main"

[project.gui-scripts]
fprime-gds = "fprime_gds.executables.run_deployment:main"
Expand Down
164 changes: 93 additions & 71 deletions src/fprime_gds/executables/data_product_writer.py
100755 → 100644
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#!/bin/env python3

# ------------------------------------------------------------------------------------------
# Program: Data Product Writer
#
Expand Down Expand Up @@ -82,6 +80,7 @@

import struct
import json
from json import JSONDecodeError
import os
import sys
from typing import List, Dict, Union, ForwardRef
Expand All @@ -90,6 +89,8 @@
import argparse
from binascii import crc32

binaryFile = None

class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
Expand All @@ -101,7 +102,7 @@ class bcolors:
UNDERLINE = '\033[4m'


DP_HEADER_FILE = "dpspec.json"
DP_HEADER_FILE = "dp_writer_data/dpspec.json"

# Deserialize the binary file big endian
BIG_ENDIAN = ">"
Expand Down Expand Up @@ -148,8 +149,6 @@ def kind_qualifiedIdentifier(cls, v):

Type = ForwardRef('Type')
ArrayType = ForwardRef('ArrayType')
EnumType = ForwardRef('Enumtype')


class QualifiedType(BaseModel):
kind: str
Expand Down Expand Up @@ -184,11 +183,21 @@ def kind_qualifiedIdentifier(cls, v):
raise ValueError('Check the "kind" field')
return v

class EnumeratedConstant(BaseModel):
name: str
value: int

class RepresentationType(BaseModel):
name: str
kind: str
size: int
signed: bool

class EnumType(BaseModel):
kind: str
qualifiedName: str
representationType: IntegerType
identifiers: Dict[str, int]
representationType: RepresentationType
enumeratedConstants: List[EnumeratedConstant]

@field_validator('kind')
def kind_qualifiedIdentifier(cls, v):
Expand All @@ -202,15 +211,16 @@ class Type(BaseModel):

class RecordStruct(BaseModel):
name: str
description: str
type: Union[StructType, ArrayType, IntegerType, FloatType, BoolType, QualifiedType]
array: bool
id: int
annotation: str

class ContainerStruct(BaseModel):
name: str
description: str
id: int
defaultPriority: int
annotation: str

# -------------------------------------------------------------------------------------
# These Pydantic classes define the FPRIME_DICTIONARY_FILE
Expand Down Expand Up @@ -283,6 +293,7 @@ class DPHeader(BaseModel):
def read_and_deserialize(nbytes: int, intType: IntegerType) -> int:
global totalBytesRead
global calculatedCRC
global binaryFile

bytes_read = binaryFile.read(nbytes)
if len(bytes_read) != nbytes:
Expand Down Expand Up @@ -628,7 +639,7 @@ def __str__(self):
# No explicit exceptions are raised by this function, but it triggers the program's termination.
# -----------------------------------------------------------------------------------------------------------------
def handleException(msg):
errorMessage = f"*** Error in processing: {args.binFile}"
errorMessage = f"*** Error in processing: "
print(bcolors.FAIL)
print(errorMessage)
print(bcolors.WARNING)
Expand Down Expand Up @@ -663,93 +674,104 @@ def check_record_data(dictJSON: FprimeDict):
else:
idSet.add(record.id)

def parse_args(args=None):
parser = argparse.ArgumentParser(description='Data Product Writer.')
parser.add_argument('binFile', help='Data Product Binary file')
parser.add_argument('jsonDict', help='JSON Dictionary')
if args is None:
args = sys.argv[1:]
return parser.parse_args(args)

# ------------------------------------------------------------------------------------------
# main program
#
# ------------------------------------------------------------------------------------------

# Application arguments
parser = argparse.ArgumentParser(description='Data Product Writer.')
parser.add_argument('binFile', help='Data Product Binary file')
parser.add_argument('jsonDict', help='JSON Dictionary')
args = parser.parse_args()

try:
def process(args):
global binaryFile
global calculatedCRC
global totalBytesRead

# Read the F prime JSON dictionary
print(f"Parsing {args.jsonDict}...")
try:
with open(args.jsonDict, 'r') as fprimeDictFile:
dictJSON = FprimeDict(**json.load(fprimeDictFile))
except JSONDecodeError as e:
raise DictionaryError(args.jsonDict, e.lineno)

check_record_data(dictJSON)

# Read the Data product header JSON spec
print(f"Parsing {DP_HEADER_FILE}...")
try:
# Read the F prime JSON dictionary
print(f"Parsing {args.jsonDict}...")
try:
with open(args.jsonDict, 'r') as fprimeDictFile:
dictJSON = FprimeDict(**json.load(fprimeDictFile))
except JSONDecodeError as e:
raise DictionaryError(args.jsonDict, e.lineno)

check_record_data(dictJSON)

# Read the Data product header JSON spec
print(f"Parsing {DP_HEADER_FILE}...")
try:
with open(DP_HEADER_FILE, 'r') as dpHeaderFile:
headerJSON = DPHeader(**json.load(dpHeaderFile))
except JSONDecodeError as e:
raise DictionaryError(DP_HEADER_FILE, e.lineno)

# Read the json data product header
with open(DP_HEADER_FILE, 'r') as dpHeaderFile:
headerJSON = DPHeader(**json.load(dpHeaderFile))
except JSONDecodeError as e:
raise DictionaryError(DP_HEADER_FILE, e.lineno)

# Read the json data product header
with open(DP_HEADER_FILE, 'r') as dpHeaderFile:
headerJSON = DPHeader(**json.load(dpHeaderFile))

with open(args.binFile, 'rb') as binaryFile:

with open(args.binFile, 'rb') as binaryFile:
totalBytesRead = 0
calculatedCRC = 0

totalBytesRead = 0
calculatedCRC = 0
# Read the header data up until the Records
headerData = get_header_info(headerJSON)

# Read the header data up until the Records
headerData = get_header_info(headerJSON)
# Read the total data size
dataSize = headerData['DataSize']

# Read the total data size
dataSize = headerData['DataSize']
# Restart the count of bytes read
totalBytesRead = 0

# Restart the count of bytes read
totalBytesRead = 0
recordList = [headerData]

recordList = [headerData]
while totalBytesRead < dataSize:

while totalBytesRead < dataSize:
recordData = get_record_data(headerJSON, dictJSON)
recordList.append(recordData)

recordData = get_record_data(headerJSON, dictJSON)
recordList.append(recordData)
computedCRC = calculatedCRC
# Read the data checksum
headerData['dataHash'] = read_field(headerJSON.dataHash.type)

computedCRC = calculatedCRC
# Read the data checksum
headerData['dataHash'] = read_field(headerJSON.dataHash.type)
if computedCRC != headerData['dataHash']:
raise CRCError("Data", headerData['dataHash'], computedCRC)

if computedCRC != headerData['dataHash']:
raise CRCError("Data", headerData['dataHash'], computedCRC)

except (FileNotFoundError, RecordIDNotFound, IOError, KeyError, JSONDecodeError,
DictionaryError, CRCError, DuplicateRecordID) as e:
handleException(e)

except (FileNotFoundError, RecordIDNotFound, IOError, KeyError, JSONDecodeError,
DictionaryError, CRCError, DuplicateRecordID) as e:
handleException(e)
except (ValueError) as e:
error = e.errors()[0]
msg = f'ValueError in JSON file {error["loc"]}: {error["msg"]}'
handleException(msg)

except (ValueError) as e:
error = e.errors()[0]
msg = f'ValueError in JSON file {error["loc"]}: {error["msg"]}'
handleException(msg)

# Output the generated json to a file
baseName = os.path.basename(args.binFile)
outputJsonFile = os.path.splitext(baseName)[0] + '.json'
if outputJsonFile.startswith('._'):
outputJsonFile = outputJsonFile.replace('._', '')
with open(outputJsonFile, 'w') as file:
json.dump(recordList, file, indent=2)

# Output the generated json to a file
baseName = os.path.basename(args.binFile)
outputJsonFile = os.path.splitext(baseName)[0] + '.json'
if outputJsonFile.startswith('._'):
outputJsonFile = outputJsonFile.replace('._', '')
with open(outputJsonFile, 'w') as file:
json.dump(recordList, file, indent=2)
print(f'Output data generated in {outputJsonFile}')

print(f'Output data generated in {outputJsonFile}')

# ------------------------------------------------------------------------------------------
# main program
#
# ------------------------------------------------------------------------------------------
def main():
args = parse_args()
process(args)


if __name__ == "main":
sys.exit(main())


File renamed without changes.
Loading

0 comments on commit b7f9389

Please sign in to comment.