Skip to content

Commit

Permalink
[uss_qualifier] add a display data evaluator for disconnected UASes
Browse files Browse the repository at this point in the history
  • Loading branch information
Shastick committed Jan 13, 2025
1 parent abf62e8 commit c245ea8
Showing 1 changed file with 103 additions and 99 deletions.
202 changes: 103 additions & 99 deletions monitoring/uss_qualifier/scenarios/astm/netrid/display_data_evaluator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import datetime

import math
from dataclasses import dataclass
from typing import List, Optional, Dict, Union, Set, Tuple
Expand All @@ -8,7 +7,6 @@
import s2sphere
from loguru import logger
from s2sphere import LatLng, LatLngRect

from uas_standards.interuss.automated_testing.rid.v1.observation import (
Flight,
GetDisplayDataResponse,
Expand All @@ -24,6 +22,7 @@
Position,
)
from monitoring.monitorlib.rid import RIDVersion
from monitoring.monitorlib.temporal import Time
from monitoring.uss_qualifier.common_data_definitions import Severity
from monitoring.uss_qualifier.configurations.configuration import ParticipantID
from monitoring.uss_qualifier.resources.astm.f3411.dss import DSSInstance
Expand Down Expand Up @@ -371,12 +370,16 @@ def _evaluate_normal_observation(
self._injected_flights, observation.flights
)

self._evaluate_flight_presence(
_evaluate_flight_presence(
observer.participant_id,
[query],
True,
mapping_by_injection_id,
verified_sps,
self._test_scenario,
self._injected_flights,
self._rid_version,
self._config,
)

# Check that altitudes match for any observed flights matching injected flights
Expand Down Expand Up @@ -492,101 +495,6 @@ def _evaluate_normal_observation(
],
)

def _evaluate_flight_presence(
self,
observer_participant_id: str,
observation_queries: List[Query],
observer_participant_is_relevant: bool,
mapping_by_injection_id: Dict[str, TelemetryMapping],
verified_sps: Set[str],
):
"""Implements fragment documented in `display_data_evaluator_flight_presence.md`."""

query_timestamps = [q.request.timestamp for q in observation_queries]
observer_participants = (
[observer_participant_id] if observer_participant_is_relevant else []
)
for expected_flight in self._injected_flights:
t_initiated = min(q.request.timestamp for q in observation_queries)
t_response = max(q.response.reported.datetime for q in observation_queries)
timestamps = [
arrow.get(t.timestamp) for t in expected_flight.flight.telemetry
]
t_min = min(timestamps).datetime
t_max = max(timestamps).datetime

if t_response < t_min:
# This flight should definitely not have been observed (it starts in the future)
with self._test_scenario.check(
"Premature flight", [expected_flight.uss_participant_id]
) as check:
if expected_flight.flight.injection_id in mapping_by_injection_id:
check.record_failed(
summary="Flight observed before it started",
details=f"Flight {expected_flight.flight.injection_id} injected into {expected_flight.uss_participant_id} was observed by {observer_participant_id} at {t_response.isoformat()} before that flight should have started at {t_min.isoformat()}",
severity=Severity.Medium,
query_timestamps=query_timestamps
+ [expected_flight.query_timestamp],
)
# TODO: attempt to observe flight details
continue
elif (
t_response
> t_max
+ self._rid_version.realtime_period
+ self._config.max_propagation_latency.timedelta
):
# This flight should not have been observed (it was too far in the past)
participants = observer_participants
if (
expected_flight.uss_participant_id not in verified_sps
or not participants
):
participants.append(expected_flight.uss_participant_id)
with self._test_scenario.check(
"Lingering flight", participants
) as check:
if expected_flight.flight.injection_id in mapping_by_injection_id:
check.record_failed(
summary="Flight still observed long after it ended",
details=f"Flight {expected_flight.flight.injection_id} injected into {expected_flight.uss_participant_id} was observed by {observer_participant_id} at {t_response.isoformat()} after it ended at {t_max.isoformat()}",
severity=Severity.Medium,
query_timestamps=query_timestamps
+ [expected_flight.query_timestamp],
)
continue
elif (
t_min + self._config.max_propagation_latency.timedelta
< t_initiated
< t_max
+ self._rid_version.realtime_period
- self._config.max_propagation_latency.timedelta
):
# This flight should definitely have been observed
participants = observer_participants
if (
expected_flight.uss_participant_id not in verified_sps
or not participants
):
participants.append(expected_flight.uss_participant_id)
with self._test_scenario.check("Missing flight", participants) as check:
if (
expected_flight.flight.injection_id
not in mapping_by_injection_id
):
check.record_failed(
summary="Expected flight not observed",
details=f"Flight {expected_flight.flight.injection_id} injected into {expected_flight.uss_participant_id} was not found in the observation by {observer_participant_id} at {t_response.isoformat()} even though it should have been active from {t_min.isoformat()} to {t_max.isoformat()}",
severity=Severity.Medium,
query_timestamps=query_timestamps
+ [expected_flight.query_timestamp],
)
continue
# TODO: observe flight details
elif t_initiated > t_min:
# If this flight was not observed, there may be propagation latency
pass # TODO: findings propagation latency

def _evaluate_area_too_large_observation(
self,
observer: RIDSystemObserver,
Expand Down Expand Up @@ -882,12 +790,16 @@ def _evaluate_normal_sp_observation(
mappings: Dict[str, TelemetryMapping],
) -> None:

self._evaluate_flight_presence(
_evaluate_flight_presence(
"uss_qualifier, acting as Display Provider",
sp_observation.queries,
False,
mappings,
set(),
self._test_scenario,
self._injected_flights,
self._rid_version,
self._config,
)

for mapping in mappings.values():
Expand Down Expand Up @@ -1312,3 +1224,95 @@ def _sliding_triples(points: List[s2sphere.LatLng]) -> List[List[s2sphere.LatLng
Returns a list of triples of consecutive positions in passed the list.
"""
return [[points[i], points[i + 1], points[i + 2]] for i in range(len(points) - 2)]


def _evaluate_flight_presence(
observer_participant_id: str,
observation_queries: List[Query],
observer_participant_is_relevant: bool,
mapping_by_injection_id: Dict[str, TelemetryMapping],
verified_sps: Set[str],
test_scenario: TestScenario,
injected_flights: List[InjectedFlight],
rid_version: RIDVersion,
evaluation_config: EvaluationConfiguration,
):
"""Implements fragment documented in `display_data_evaluator_flight_presence.md`."""

query_timestamps = [q.request.timestamp for q in observation_queries]
observer_participants = (
[observer_participant_id] if observer_participant_is_relevant else []
)
for expected_flight in injected_flights:
t_initiated = min(q.request.timestamp for q in observation_queries)
t_response = max(q.response.reported.datetime for q in observation_queries)
timestamps = [arrow.get(t.timestamp) for t in expected_flight.flight.telemetry]
t_min = min(timestamps).datetime
t_max = max(timestamps).datetime

if t_response < t_min:
# This flight should definitely not have been observed (it starts in the future)
with test_scenario.check(
"Premature flight", [expected_flight.uss_participant_id]
) as check:
if expected_flight.flight.injection_id in mapping_by_injection_id:
check.record_failed(
summary="Flight observed before it started",
details=f"Flight {expected_flight.flight.injection_id} injected into {expected_flight.uss_participant_id} was observed by {observer_participant_id} at {t_response.isoformat()} before that flight should have started at {t_min.isoformat()}",
severity=Severity.Medium,
query_timestamps=query_timestamps
+ [expected_flight.query_timestamp],
)
# TODO: attempt to observe flight details
continue
elif (
t_response
> t_max
+ rid_version.realtime_period
+ evaluation_config.max_propagation_latency.timedelta
):
# This flight should not have been observed (it was too far in the past)
participants = observer_participants
if (
expected_flight.uss_participant_id not in verified_sps
or not participants
):
participants.append(expected_flight.uss_participant_id)
with test_scenario.check("Lingering flight", participants) as check:
if expected_flight.flight.injection_id in mapping_by_injection_id:
check.record_failed(
summary="Flight still observed long after it ended",
details=f"Flight {expected_flight.flight.injection_id} injected into {expected_flight.uss_participant_id} was observed by {observer_participant_id} at {t_response.isoformat()} after it ended at {t_max.isoformat()}",
severity=Severity.Medium,
query_timestamps=query_timestamps
+ [expected_flight.query_timestamp],
)
continue
elif (
t_min + evaluation_config.max_propagation_latency.timedelta
< t_initiated
< t_max
+ rid_version.realtime_period
- evaluation_config.max_propagation_latency.timedelta
):
# This flight should definitely have been observed
participants = observer_participants
if (
expected_flight.uss_participant_id not in verified_sps
or not participants
):
participants.append(expected_flight.uss_participant_id)
with test_scenario.check("Missing flight", participants) as check:
if expected_flight.flight.injection_id not in mapping_by_injection_id:
check.record_failed(
summary="Expected flight not observed",
details=f"Flight {expected_flight.flight.injection_id} injected into {expected_flight.uss_participant_id} was not found in the observation by {observer_participant_id} at {t_response.isoformat()} even though it should have been active from {t_min.isoformat()} to {t_max.isoformat()}",
severity=Severity.Medium,
query_timestamps=query_timestamps
+ [expected_flight.query_timestamp],
)
continue
# TODO: observe flight details
elif t_initiated > t_min:
# If this flight was not observed, there may be propagation latency
pass # TODO: findings propagation latency

0 comments on commit c245ea8

Please sign in to comment.