Merge pull request #29 from usdot-jpo-ode/ode-1264
ODE-1264 Refinement
hmusavi authored May 16, 2019
2 parents e0d5b64 + db15284 commit a796c29
Showing 9 changed files with 2,805 additions and 74 deletions.
29 changes: 29 additions & 0 deletions README.md
@@ -370,6 +370,11 @@ definitions:
type: array
items:
type: string
skipSequentialValidation:
description: This boolean property specifies whether this sequential or chronological field should take part in sequential validation.
Set the value of this property to True to skip sequential validation of this field.
type: boolean
```

The following conditional field validation states that the value of `metadata.payloadType` must equal `us.dot.its.jpo.ode.model.OdeBsmPayload` if
@@ -488,6 +493,30 @@ EqualsValue = {"startsWithField": "metadata.recordType"}
- Value: ISO timestamp: `LatestTime = 2018-12-03T00:00:00.000Z`
- Note: For more information on how to write parsable timestamps, see [dateutil.parser.parse()](https://dateutil.readthedocs.io/en/stable/parser.html#dateutil.parser.parse).
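
For example, the ISO 8601 form used above parses cleanly with dateutil (a minimal sketch; the printed value reflects dateutil's default UTC handling):

```
import dateutil.parser

# The ISO 8601 timestamp format used in this config is accepted by dateutil's parser.
parsed = dateutil.parser.parse("2018-12-03T00:00:00.000Z")
print(parsed)  # 2018-12-03 00:00:00+00:00
```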

The following field validation specifies that sequential validation should NOT be performed on `metadata.recordGeneratedAt` when the record is generated
by TMC (`"metadata.recordGeneratedBy":"TMC"`).

```
[metadata.recordGeneratedAt]
Type = timestamp
LatestTime = NOW
EqualsValue = {"conditions":[{"ifPart":{"fieldName":"metadata.recordGeneratedBy","fieldValues":["TMC"]},"thenPart":{"skipSequentialValidation":"true"}}]}
```

The following field validation specifies that sequential validation should NOT be performed on `metadata.serialId.recordId` when the record is of
type `rxMsg` OR the record is _sanitized_ (`"metadata.sanitized": "True"`).

```
[metadata.serialId.recordId]
Type = decimal
UpperLimit = 2147483647
LowerLimit = 0
EqualsValue = {"conditions":[
{"ifPart":{"fieldName":"metadata.recordType","fieldValues":["rxMsg"]},"thenPart":{"skipSequentialValidation":"true"}},
{"ifPart":{"fieldName":"metadata.sanitized","fieldValues":["True"]},"thenPart":{"skipSequentialValidation":"true"}}]}
```
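
Conceptually, each `ifPart` is matched against the record and, when it matches, the corresponding `thenPart` takes effect. The sketch below illustrates one way such a conditions block could be evaluated; it is an illustration only, not the library's actual implementation, and the helpers `get_field` and `collect_skipped_fields` are hypothetical names:

```
import json

def get_field(record, path):
    # Walk a dotted path such as 'metadata.recordType' through nested dicts.
    value = record
    for key in path.split('.'):
        value = value[key]
    return value

def collect_skipped_fields(field_name, equals_value, record):
    # Return {field_name} if any matching ifPart requests skipSequentialValidation.
    skipped = set()
    for condition in equals_value.get("conditions", []):
        if_part, then_part = condition["ifPart"], condition["thenPart"]
        if str(get_field(record, if_part["fieldName"])) in if_part["fieldValues"]:
            if then_part.get("skipSequentialValidation") == "true":
                skipped.add(field_name)
    return skipped

equals_value = json.loads("""{"conditions":[
    {"ifPart":{"fieldName":"metadata.recordType","fieldValues":["rxMsg"]},"thenPart":{"skipSequentialValidation":"true"}}]}""")
record = {"metadata": {"recordType": "rxMsg"}}
print(collect_skipped_fields("metadata.serialId.recordId", equals_value, record))
# -> {'metadata.serialId.recordId'}
```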

**Files**
- [Default Configuration File](odevalidator/config.ini)
- [Sample Data File](../../../data/bsmTx.json)
3 changes: 3 additions & 0 deletions odevalidator/config.ini
@@ -67,6 +67,9 @@ Type = string
Type = decimal
UpperLimit = 2147483648
LowerLimit = 1
EqualsValue = {"conditions":[
{"ifPart":{"fieldName":"metadata.recordType","fieldValues":["rxMsg"]},"thenPart":{"skipSequentialValidation":"true"}},
{"ifPart":{"fieldName":"metadata.sanitized","fieldValues":["True"]},"thenPart":{"skipSequentialValidation":"true"}}]}

[metadata.serialId.bundleId]
Type = decimal
5 changes: 3 additions & 2 deletions odevalidator/result.py
@@ -4,16 +4,17 @@ class ValidatorException(Exception):
pass

class FieldValidationResult:
def __init__(self, valid = True, details = "", field_path = None):
def __init__(self, valid = True, details = "", field_path = None, serial_id = None):
self.field_path = field_path
self.valid = valid
self.details = details
self.serial_id = serial_id

def __str__(self):
return json.dumps(self.to_json())

def to_json(self):
return {"Field": self.field_path, "Valid": self.valid, "Details": self.details}
return {"Field": self.field_path, "Valid": self.valid, "Details": self.details, "SerialId": self.serial_id}

class RecordValidationResult:
def __init__(self, serial_id, field_validations, record):
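
For reference, a failing result carrying the new `serial_id` argument would serialize as in this minimal sketch (the sample serialId dict is illustrative, and the import path assumes the module layout shown above):

```
from odevalidator.result import FieldValidationResult

# A failing field validation tagged with the record's serialId dict.
result = FieldValidationResult(
    valid=False,
    details="Detected incorrectly incremented recordId.",
    field_path="metadata.serialId.recordId",
    serial_id={"bundleId": 864, "recordId": 3},
)
print(result)
# {"Field": "metadata.serialId.recordId", "Valid": false, "Details": "Detected incorrectly incremented recordId.", "SerialId": {"bundleId": 864, "recordId": 3}}
```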
33 changes: 16 additions & 17 deletions odevalidator/sequential.py
@@ -39,14 +39,14 @@ def validate_bundle(self, sorted_bundle):
new_record_generated_at = dateutil.parser.parse(record['metadata']['recordGeneratedAt'])
new_ode_received_at = dateutil.parser.parse(record['metadata']['odeReceivedAt'])

if 'metadata.serialId.bundleSize' not in self.skip_validations and record['metadata']['serialId']['bundleSize'] > 1 and new_record_id != old_record_id+1:
validation_results.append(FieldValidationResult(False, "Detected incorrectly incremented recordId. Expected recordId '%d' but got '%d'" % (old_record_id+1, new_record_id), record['metadata']['serialId']))
if 'metadata.serialId.recordId' not in self.skip_validations and record['metadata']['serialId']['bundleSize'] > 1 and new_record_id != old_record_id+1:
validation_results.append(FieldValidationResult(False, "Detected incorrectly incremented recordId. Expected recordId '%d' but got '%d'" % (old_record_id+1, new_record_id), serial_id = record['metadata']['serialId']))
if 'metadata.serialId.serialNumber' not in self.skip_validations and new_serial_number != old_serial_number+1:
validation_results.append(FieldValidationResult(False, "Detected incorrectly incremented serialNumber. Expected serialNumber '%d' but got '%d'" % (old_serial_number+1, new_serial_number), record['metadata']['serialId']))
validation_results.append(FieldValidationResult(False, "Detected incorrectly incremented serialNumber. Expected serialNumber '%d' but got '%d'" % (old_serial_number+1, new_serial_number), serial_id = record['metadata']['serialId']))
if 'metadata.recordGeneratedAt' not in self.skip_validations and new_record_generated_at < old_record_generated_at:
validation_results.append(FieldValidationResult(False, "Detected non-chronological recordGeneratedAt. Previous timestamp was '%s' but current timestamp is '%s'" % (old_record_generated_at, new_record_generated_at), record['metadata']['serialId']))
validation_results.append(FieldValidationResult(False, "Detected non-chronological recordGeneratedAt. Previous timestamp was '%s' but current timestamp is '%s'" % (old_record_generated_at, new_record_generated_at), serial_id = record['metadata']['serialId']))
if 'metadata.odeReceivedAt' not in self.skip_validations and new_ode_received_at < old_ode_received_at:
validation_results.append(FieldValidationResult(False, "Detected non-chronological odeReceivedAt. Previous timestamp was '%s' but current timestamp is '%s'" % (old_ode_received_at, new_ode_received_at), record['metadata']['serialId']))
validation_results.append(FieldValidationResult(False, "Detected non-chronological odeReceivedAt. Previous timestamp was '%s' but current timestamp is '%s'" % (old_ode_received_at, new_ode_received_at), serial_id = record['metadata']['serialId']))

old_record_id = new_record_id
old_serial_number = new_serial_number
@@ -61,28 +61,27 @@ def validate_bundle(self, sorted_bundle):
def validate_bundle_size(self, sorted_bundle):
first_record_id = int(sorted_bundle[0]['metadata']['serialId']['recordId'])
last_record_id = int(sorted_bundle[-1]['metadata']['serialId']['recordId'])
bundle_size = int(sorted_bundle[0]['metadata']['serialId']['bundleSize'])
cur_bundle_size = int(sorted_bundle[0]['metadata']['serialId']['bundleSize'])
prev_bundle_size = None

validation_results = []
# partial or full list?
if first_record_id == 0:
# head of a partial list?
if last_record_id == bundle_size - 1:
if last_record_id == cur_bundle_size - 1:
# full list
for record in sorted_bundle:
bundle_size = int(record['metadata']['serialId']['bundleSize'])
if 'logFileName' in record['metadata'] and len(sorted_bundle) != bundle_size:
validation_results.append(FieldValidationResult(False, "bundleSize doesn't match number of records. recordId: '%d' record length: '%d' != bundlSize: '%d'" % (record['metadata']['serialId']['recordId'], len(sorted_bundle), bundle_size), record['metadata']['serialId']))

bundle_size = int(sorted_bundle[0]['metadata']['serialId']['bundleSize'])
if last_record_id != bundle_size-1:
validation_results.append(FieldValidationResult(False, "bundleSize doesn't match the last recordId of a full set. recordId: '%d' Last recordId: '%d' != bundlSize: '%d'" % (record['metadata']['serialId']['recordId'], last_record_id, bundle_size), record['metadata']['serialId']))
cur_bundle_size = int(record['metadata']['serialId']['bundleSize'])
if prev_bundle_size != cur_bundle_size and 'logFileName' in record['metadata'] and len(sorted_bundle) != cur_bundle_size:
prev_bundle_size = cur_bundle_size
validation_results.append(FieldValidationResult(False, "bundleSize doesn't match number of records. Number of records: '%d' != bundlSize: '%d'" % (len(sorted_bundle), cur_bundle_size), serial_id = sorted_bundle[-1]['metadata']['serialId']))
else:
# tail of a partial list
for record in sorted_bundle:
bundle_size = int(record['metadata']['serialId']['bundleSize'])
if last_record_id != bundle_size-1:
validation_results.append(FieldValidationResult(False, "bundleSize doesn't match last recordId of a tail set. recordId: '%d' last recordId: '%d' != bundleSize: '%d'" % (record['metadata']['serialId']['recordId'], last_record_id, bundle_size), record['metadata']['serialId']))
cur_bundle_size = int(record['metadata']['serialId']['bundleSize'])
if prev_bundle_size != cur_bundle_size and last_record_id != cur_bundle_size-1:
prev_bundle_size = cur_bundle_size
validation_results.append(FieldValidationResult(False, "bundleSize doesn't match last recordId. Last recordId: '%d' != (bundleSize-1: '%d')" % (last_record_id, cur_bundle_size-1), serial_id = sorted_bundle[-1]['metadata']['serialId']))

return validation_results

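
In the reworked `validate_bundle_size`, a sorted bundle is classified by its first and last `recordId`: a bundle starting at recordId 0 and ending at `bundleSize - 1` is a full list, one starting at 0 but ending earlier is the head of a partial list (which cannot be length-checked), and anything else is treated as a tail. The sketch below exercises that behavior, assuming `validate_bundle_size` can be called directly on records trimmed to the fields it reads and that the import path matches the module layout shown above:

```
from odevalidator.sequential import Sequential

def make_record(record_id, bundle_size):
    # Only the fields validate_bundle_size actually reads (illustrative helper).
    return {"metadata": {"logFileName": "rxMsg_sample",
                         "serialId": {"recordId": record_id, "bundleSize": bundle_size}}}

seq = Sequential()

# Full bundle: recordIds 0..8 with bundleSize 9 -> no validation errors.
full_bundle = [make_record(i, 9) for i in range(9)]
print(len(seq.validate_bundle_size(full_bundle)))  # 0

# Tail of a partial bundle: last recordId 5 != bundleSize - 1 -> one error.
tail_bundle = [make_record(i, 9) for i in range(2, 6)]
print(len(seq.validate_bundle_size(tail_bundle)))  # 1
```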
8 changes: 5 additions & 3 deletions tests/__init__.py
@@ -2,17 +2,19 @@

def assert_results(testcase, results, expected_fail_count):
fail_count = 0
record_num = 0
for res in results:
record_num += 1
result_partition_printed = False
for val in res.field_validations:
if not val.valid:
if not result_partition_printed:
print("\n========")
result_partition_printed = True

print("Record #: %d, SerialId: %s, Field: %s, Details: %s\n--------\n%s" % (record_num, res.serial_id, val.field_path, val.details, res.record))
serial_id = res.serial_id
if not serial_id:
serial_id = val.serial_id

print("SerialId: %s, Field: %s, Details: %s\n--------\n%s" % (serial_id , val.field_path, val.details, res.record))
fail_count += 1

testcase.assertEquals(expected_fail_count, fail_count, "Expected %s failures, got %s failures." % (expected_fail_count, fail_count))
8 changes: 4 additions & 4 deletions tests/result_test.py
@@ -9,12 +9,12 @@ def testFieldValidationResult(self):
self.assertTrue(f.valid)
self.assertEquals("", f.details)
self.assertEquals(None, f.field_path)
self.assertEquals('{"Field": null, "Valid": true, "Details": ""}', json.dumps(f.to_json()))
self.assertEquals('{"Field": null, "Valid": true, "Details": ""}', str(f))
self.assertEquals('{"Field": null, "Valid": true, "Details": "", "SerialId": null}', json.dumps(f.to_json()))
self.assertEquals('{"Field": null, "Valid": true, "Details": "", "SerialId": null}', str(f))

def testRecordValidationResult(self):
f = RecordValidationResult("serial_id", [FieldValidationResult()], "record")
self.assertEquals("serial_id", f.serial_id)
self.assertEquals("record", f.record)
self.assertEquals('{"SerialId": "serial_id", "Validations": [{"Field": null, "Valid": true, "Details": ""}], "Record": "record"}', json.dumps(f.to_json()))
self.assertEquals('{"SerialId": "serial_id", "Validations": [{"Field": null, "Valid": true, "Details": ""}], "Record": "record"}', str(f))
self.assertEquals('{"SerialId": "serial_id", "Validations": [{"Field": null, "Valid": true, "Details": "", "SerialId": null}], "Record": "record"}', json.dumps(f.to_json()))
self.assertEquals('{"SerialId": "serial_id", "Validations": [{"Field": null, "Valid": true, "Details": "", "SerialId": null}], "Record": "record"}', str(f))
106 changes: 59 additions & 47 deletions tests/sequential_test.py
@@ -9,77 +9,89 @@

class SequentialUnitTest(unittest.TestCase):

def runTest(self):
seq = Sequential()
def setUp(self):
self.seq = Sequential()

seed_record = '{"metadata":{"logFileName":"rxMsg_1553540811_2620:31:40e0:843::1.csv","serialId":{"streamId":"8a4773d8-ae01-4b86-beae-7cd954a32e06","bundleSize":9,"bundleId":864,"recordId":2,"serialNumber":1000},"odeReceivedAt":"2019-03-25T19:21:06.407Z","recordGeneratedAt":"2019-03-14T14:54:21.596Z"}}'
json_seed = json.loads(seed_record)
self.json_seed = json.loads(seed_record)
self.record_list = self.build_happy_path(self.json_seed)

# test happy path
def test_happy_path(self):
print("Testing Happy Path ...")
record_list = self.build_happy_path(json_seed)
results = seq.perform_sequential_validations(record_list)
record_list = self.build_happy_path(self.json_seed)
results = self.seq.perform_sequential_validations(record_list)
assert_results(self, results, 0)

print("Testing Missing recordId, serialNumber and bundleSize ...")
record_list_missing = []
record_list_missing = copy.deepcopy(record_list)
record_list_missing.remove(record_list_missing[19])
record_list_missing.remove(record_list_missing[8])
record_list_missing.remove(record_list_missing[2])
results = seq.perform_sequential_validations(record_list_missing)
assert_results(self, results, 14)

def test_missing_records(self):
print("Testing Missing recordId, serialNumber ...")
self.record_list.remove(self.record_list[19])
self.record_list.remove(self.record_list[8])
self.record_list.remove(self.record_list[2])
results = self.seq.perform_sequential_validations(self.record_list)
assert_results(self, results, 7)

def test_invalid_bundle_size(self):
print("Testing invalid bundleSize ...")
self.record_list.remove(self.record_list[15])
self.record_list.remove(self.record_list[6])
results = self.seq.perform_sequential_validations(self.record_list)
# Even though we have removed the last record of a full bundle, the validator can't tell whether this is the head of a partial list or a full list.
# Therefore, we should get only one validation error
assert_results(self, results, 1)

def test_dup_and_chronological(self):
print("Testing Duplicate recordId and serialNumber and non-chronological odeReceivedAt and recordGeneratedAt ...")
record_list_dup = copy.deepcopy(record_list)
record_list_dup[19] = copy.deepcopy(record_list_dup[17])
record_list_dup[8] = copy.deepcopy(record_list_dup[6])
record_list_dup[2] = copy.deepcopy(record_list_dup[0])
results = seq.perform_sequential_validations(record_list_dup)
self.record_list[18] = copy.deepcopy(self.record_list[16])
self.record_list[9] = copy.deepcopy(self.record_list[7])
self.record_list[2] = copy.deepcopy(self.record_list[0])
results = self.seq.perform_sequential_validations(self.record_list)
assert_results(self, results, 18)


def build_happy_path(self, json_seed):
record_list = []

#setting up for recordId 3-8 (tail end of a bundle of 9 records)
json_seed['metadata']['logFileName'] = 'rxMsg_partial_1'
json_seed['metadata']['serialId']['bundleId'] = 101
cur_record = json_seed
cur_record['metadata']['serialId']['recordId'] = 2
n = 5
while n >= 0:
cur_record = self._next_record(cur_record)
record_list.append(cur_record)
n -= 1
self.json_seed['metadata']['logFileName'] = 'rxMsg_tail'
self.json_seed['metadata']['serialId']['bundleId'] = 101
self.json_seed['metadata']['serialId']['serialNumber'] = 1001

self.json_seed['metadata']['serialId']['recordId'] = 2
cur_record = self.json_seed
bundle = self._build_bundle(cur_record, 7)
record_list.extend(bundle)

#setting up for recordId 0-9 (full bundle of 9 records)
json_seed['metadata']['logFileName'] = 'rxMsg_Full'
json_seed['metadata']['serialId']['bundleId'] = 102
cur_record = json_seed
cur_record['metadata']['serialId']['recordId'] = -1
n = 8
while n >= 0:
cur_record = self._next_record(cur_record)
record_list.append(cur_record)
n -= 1
cur_record = self._next_record(record_list[-1])
cur_record['metadata']['logFileName'] = 'rxMsg_Full'
cur_record['metadata']['serialId']['bundleId'] = 102
cur_record['metadata']['serialId']['recordId'] = 0
bundle = self._build_bundle(cur_record, 9)
record_list.extend(bundle)

#setting up for recordId 0-6 (front end of a bundle of 9 records)
json_seed['metadata']['logFileName'] = 'rxMsg_partial_2'
json_seed['metadata']['serialId']['bundleId'] = 103
cur_record = json_seed
cur_record['metadata']['serialId']['recordId'] = -1
n = 5
while n >= 0:
cur_record = self._next_record(cur_record)
record_list.append(cur_record)
n -= 1
cur_record = self._next_record(record_list[-1])
cur_record['metadata']['logFileName'] = 'rxMsg_head'
cur_record['metadata']['serialId']['bundleId'] = 103
cur_record['metadata']['serialId']['recordId'] = 0
bundle = self._build_bundle(cur_record, 7)
record_list.extend(bundle)

#for record in record_list:
# print(json.dumps(record))

return record_list

def _build_bundle(self, seed_record, count):
bundle = []
cur_record = copy.deepcopy(seed_record)
while count > 0:
bundle.append(cur_record)
cur_record = self._next_record(cur_record)
count -= 1

return bundle

def _next_record(self, cur_record):
next_record = copy.deepcopy(cur_record)
next_record['metadata']['serialId']['recordId'] += 1