-
Notifications
You must be signed in to change notification settings - Fork 20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[uss_qualifier/scenarios/netrid/nominal_behavior] Add checks for UA classification in SP (NET0260) #870
base: main
Are you sure you want to change the base?
[uss_qualifier/scenarios/netrid/nominal_behavior] Add checks for UA classification in SP (NET0260) #870
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
import datetime | ||
import math | ||
from typing import List, Optional | ||
from typing import List, Optional, Dict | ||
|
||
from arrow import ParserError | ||
from implicitdict import StringBasedDateTime | ||
|
@@ -33,6 +34,14 @@ | |
from monitoring.uss_qualifier.resources.netrid.evaluation import EvaluationConfiguration | ||
from monitoring.uss_qualifier.scenarios.scenario import TestScenarioType | ||
|
||
UA_CLASSIFICATION_FIELDS = [ | ||
"eu_classification" | ||
] # all field names specifying UA classification type | ||
|
||
|
||
def _get_classification_fields(details: dict) -> Dict[str, Optional[dict]]: | ||
return {field: details.get(field) for field in UA_CLASSIFICATION_FIELDS} | ||
|
||
|
||
class RIDCommonDictionaryEvaluator(object): | ||
def __init__( | ||
|
@@ -95,19 +104,32 @@ def evaluate_dp_flight( | |
participants, | ||
) | ||
|
||
def evaluate_sp_details(self, details: FlightDetails, participants: List[str]): | ||
def evaluate_sp_details( | ||
self, | ||
injected_details: injection.RIDFlightDetails, | ||
observed_details: FlightDetails, | ||
participant_id: ParticipantID, | ||
query_timestamp: datetime.datetime, | ||
): | ||
"""Implements fragment documented in `common_dictionary_evaluator_sp_flight_details.md`.""" | ||
|
||
self._evaluate_uas_id(details.raw.get("uas_id"), participants) | ||
self._evaluate_operator_id(None, details.operator_id, participants) | ||
self._evaluate_uas_id(observed_details.raw.get("uas_id"), [participant_id]) | ||
self._evaluate_ua_classification( | ||
_get_classification_fields(injected_details), | ||
_get_classification_fields(observed_details.raw), | ||
participant_id, | ||
query_timestamp, | ||
) | ||
|
||
self._evaluate_operator_id(None, observed_details.operator_id, [participant_id]) | ||
self._evaluate_operator_location( | ||
None, | ||
None, | ||
None, | ||
details.operator_location, | ||
details.operator_altitude, | ||
details.operator_altitude_type, | ||
participants, | ||
observed_details.operator_location, | ||
observed_details.operator_altitude, | ||
observed_details.operator_altitude_type, | ||
[participant_id], | ||
) | ||
|
||
def evaluate_dp_details( | ||
|
@@ -651,3 +673,209 @@ def _evaluate_operational_status( | |
key="skip_reason", | ||
message=f"Unsupported version {self._rid_version}: skipping Operational Status evaluation", | ||
) | ||
|
||
def _evaluate_ua_type( | ||
self, | ||
injected_val: Optional[str], | ||
observed_val: Optional[str], | ||
participants: List[ParticipantID], | ||
query_timestamp: datetime.datetime, | ||
): | ||
with self._test_scenario.check( | ||
"UA type is present and consistent with injected one", | ||
participants, | ||
) as check: | ||
if observed_val is None: | ||
check.record_failed( | ||
"UA type is missing", | ||
details="USS did not return any UA type", | ||
query_timestamps=[query_timestamp], | ||
) | ||
elif not observed_val: | ||
check.record_failed( | ||
"UA type is empty", | ||
details="USS returned an empty UA type", | ||
query_timestamps=[query_timestamp], | ||
) | ||
|
||
equivalent = {injection.UAType.HybridLift, injection.UAType.VTOL} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A little comment explaining this might help us in the future |
||
if injected_val is None: | ||
if observed_val != injection.UAType.NotDeclared: | ||
check.record_failed( | ||
"UA type is inconsistent, expected 'NotDeclared' since no value was injected", | ||
details=f"USS returned the UA type {observed_val}, yet no value was injected, which should have been mapped to 'NotDeclared'.", | ||
query_timestamps=[query_timestamp], | ||
) | ||
|
||
elif injected_val in equivalent: | ||
if observed_val not in equivalent: | ||
check.record_failed( | ||
"UA type is inconsistent with injected value", | ||
details=f"USS returned the UA type {observed_val}, yet the value {injected_val} was injected, given that {equivalent} are equivalent .", | ||
query_timestamps=[query_timestamp], | ||
) | ||
|
||
elif injected_val != observed_val: | ||
check.record_failed( | ||
"UA type is inconsistent with injected value", | ||
details=f"USS returned the UA type {observed_val}, yet the value {injected_val} was injected.", | ||
query_timestamps=[query_timestamp], | ||
) | ||
|
||
with self._test_scenario.check( | ||
"UA type is consistent with Common Data Dictionary", | ||
participants, | ||
) as check: | ||
try: | ||
injection.UAType(observed_val) | ||
except ValueError: | ||
check.record_failed( | ||
"UA type is invalid", | ||
details=f"USS returned an invalid UA type: {observed_val}.", | ||
query_timestamps=[query_timestamp], | ||
) | ||
Comment on lines
+729
to
+736
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we actually reach this code if Or do we expect |
||
|
||
if ( | ||
self._rid_version == RIDVersion.f3411_19 | ||
and observed_val == injection.UAType.HybridLift | ||
) or ( | ||
self._rid_version == RIDVersion.f3411_22a | ||
and observed_val == injection.UAType.VTOL | ||
): | ||
check.record_failed( | ||
"UA type is inconsistent RID version", | ||
details=f"USS returned the UA type {observed_val} which is not supported by the RID version used ({self._rid_version}).", | ||
query_timestamps=[query_timestamp], | ||
) | ||
|
||
def _evaluate_ua_classification( | ||
self, | ||
injected_ua_classifications: Dict[str, Optional[dict]], | ||
observed_ua_classifications: Dict[str, Optional[dict]], | ||
participant_id: ParticipantID, | ||
query_timestamp: datetime.datetime, | ||
): | ||
""" | ||
Note that the classification type is defined implicitly by presence of field 'eu_classification' or not: | ||
> When this field is specified, the Classification Type is "European Union". If no other classification | ||
> field is specified, the Classification Type is "Undeclared". | ||
""" | ||
if self._rid_version == RIDVersion.f3411_19: | ||
self._test_scenario.record_note( | ||
key="skip_reason", | ||
message=f"Unsupported version {self._rid_version}: skipping UA classification evaluation", | ||
) | ||
return | ||
|
||
injected_classification_fields = set(injected_ua_classifications.keys()) | ||
observed_classification_fields = set(observed_ua_classifications.keys()) | ||
|
||
with self._test_scenario.check( | ||
"UA classification type is consistent with injected one", | ||
participant_id, | ||
) as check: | ||
if injected_classification_fields != observed_classification_fields: | ||
check.record_failed( | ||
"UA classification type is inconsistent with injected value.", | ||
details=f"USS returned UA classification type {observed_classification_fields} yet the type injected was {injected_classification_fields}.", | ||
query_timestamps=[query_timestamp], | ||
) | ||
|
||
with self._test_scenario.check( | ||
"UA classification type is consistent with Common Data Dictionary", | ||
participant_id, | ||
) as check: | ||
if len(observed_classification_fields) > 1: | ||
check.record_failed( | ||
"UA classification type is inconsistent with Common Data Dictionary.", | ||
details=f"USS returned several UA classification types {observed_classification_fields}, but either zero or one was expected.", | ||
query_timestamps=[query_timestamp], | ||
) | ||
|
||
if injected_ua_classifications.get("eu_classification") is not None: | ||
# evaluate classification type "European Union" if it was that type that was injected | ||
self._evaluate_ua_classification_eu( | ||
injected_ua_classifications.get("eu_classification"), | ||
observed_ua_classifications.get("eu_classification"), | ||
participant_id, | ||
query_timestamp, | ||
) | ||
|
||
def _evaluate_ua_classification_eu( | ||
self, | ||
injected_eu_classification: Optional[dict], | ||
observed_eu_classification: Optional[dict], | ||
participant_id: ParticipantID, | ||
query_timestamp: datetime.datetime, | ||
): | ||
injected_eu_category = injected_eu_classification.get("category") | ||
injected_eu_class = injected_eu_classification.get("class") | ||
observed_eu_category = ( | ||
observed_eu_classification.get("category") | ||
if observed_eu_classification | ||
else None | ||
) | ||
observed_eu_class = ( | ||
observed_eu_classification.get("class") | ||
if observed_eu_classification | ||
else None | ||
) | ||
Comment on lines
+811
to
+822
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Given that we compare the classification dicts earlier (with |
||
|
||
with self._test_scenario.check( | ||
"UA classification for 'European Union' type is consistent with injected one", | ||
participant_id, | ||
) as check: | ||
if injected_eu_category is None: | ||
if ( | ||
observed_eu_category | ||
!= injection.UAClassificationEUCategory.EUCategoryUndefined | ||
): | ||
check.record_failed( | ||
"UA classification for 'European Union' type has an invalid category, expected 'EUCategoryUndefined' since no value was injected.", | ||
details=f"USS returned the category of UA classification for 'European Union' type {observed_eu_category}, yet no value was injected which should have been mapped to 'EUCategoryUndefined'.", | ||
query_timestamps=[query_timestamp], | ||
) | ||
elif injected_eu_category != observed_eu_category: | ||
check.record_failed( | ||
"UA classification for 'European Union' type is inconsistent with injected value.", | ||
details=f"USS returned the category of UA classification for 'European Union' type {observed_eu_category}, yet the category injected was {injected_eu_category}.", | ||
query_timestamps=[query_timestamp], | ||
) | ||
|
||
if injected_eu_class is None: | ||
if ( | ||
observed_eu_class | ||
!= injection.UAClassificationEUClass.EUClassUndefined | ||
): | ||
check.record_failed( | ||
"UA classification for 'European Union' type has an invalid class, expected 'EUClassUndefined' since no or invalid value was injected.", | ||
details=f"USS returned the class of UA classification for 'European Union' type {observed_eu_class}, yet no value was injected which should have been mapped to 'EUClassUndefined'.", | ||
query_timestamps=[query_timestamp], | ||
) | ||
elif injected_eu_class != observed_eu_class: | ||
check.record_failed( | ||
"UA classification for 'European Union' type is inconsistent with injected value.", | ||
details=f"USS returned the class of UA classification for 'European Union' type {observed_eu_class}, yet the class injected was {injected_eu_class}.", | ||
query_timestamps=[query_timestamp], | ||
) | ||
|
||
with self._test_scenario.check( | ||
"UA classification for 'European Union' type is consistent with Common Data Dictionary", | ||
participant_id, | ||
) as check: | ||
try: | ||
injection.UAClassificationEUCategory(observed_eu_category) | ||
except ValueError: | ||
check.record_failed( | ||
"UA classification for 'European Union' type has an invalid category", | ||
details=f"USS returned an invalid category of UA classification for 'European Union' type: {observed_eu_category}.", | ||
query_timestamps=[query_timestamp], | ||
) | ||
try: | ||
injection.UAClassificationEUClass(observed_eu_class) | ||
except ValueError: | ||
check.record_failed( | ||
"UA classification for 'European Union' type has an invalid class", | ||
details=f"USS returned an invalid class of UA classification for 'European Union' type: {observed_eu_class}.", | ||
query_timestamps=[query_timestamp], | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The call to this function is missing (presumably in
evaluate_sp_details
)