Skip to content

Commit

Permalink
[uss_qualifier] dss0030 add subscriptions to the heavy concurrent tra…
Browse files Browse the repository at this point in the history
…ffic scenario
  • Loading branch information
Shastick committed Nov 27, 2023
1 parent 6acdcf5 commit ddd4b5a
Show file tree
Hide file tree
Showing 18 changed files with 1,647 additions and 271 deletions.
6 changes: 6 additions & 0 deletions monitoring/monitorlib/infrastructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ def adjust_request_kwargs(self, url, method, kwargs):
return kwargs

async def put_with_headers(self, url, **kwargs):
"""Issues a PUT and returns the status code, headers, and JSON body."""
url = self._prefix_url + url
if "auth" not in kwargs:
kwargs = self.adjust_request_kwargs(url, "PUT", kwargs)
Expand All @@ -201,10 +202,12 @@ async def put_with_headers(self, url, **kwargs):
)

async def put(self, url, **kwargs):
"""Issues a PUT and returns the status code and JSON body."""
(status, _, json) = await self.put_with_headers(url, **kwargs)
return status, json

async def get_with_headers(self, url, **kwargs):
"""Issues a GET and returns the status code, headers, and JSON body."""
url = self._prefix_url + url
if "auth" not in kwargs:
kwargs = self.adjust_request_kwargs(url, "GET", kwargs)
Expand All @@ -216,6 +219,7 @@ async def get_with_headers(self, url, **kwargs):
)

async def get(self, url, **kwargs):
"""Issues a GET and returns the status code and JSON body."""
(status, _, json) = await self.get_with_headers(url, **kwargs)
return status, json

Expand All @@ -227,6 +231,7 @@ async def post(self, url, **kwargs):
return response.status, await response.json()

async def delete_with_headers(self, url, **kwargs):
"""Issues a DELETE and returns the status code, headers, and JSON body."""
url = self._prefix_url + url
if "auth" not in kwargs:
kwargs = self.adjust_request_kwargs(url, "DELETE", kwargs)
Expand All @@ -238,6 +243,7 @@ async def delete_with_headers(self, url, **kwargs):
)

async def delete(self, url, **kwargs):
"""Issues a DELETE and returns the status code and JSON body."""
(status, _, json) = await self.delete_with_headers(url, **kwargs)
return status, json

Expand Down
129 changes: 89 additions & 40 deletions monitoring/monitorlib/mutate/rid.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
import datetime
from typing import Dict, List, Optional, Union, Set

from implicitdict import ImplicitDict
import s2sphere
from uas_standards import Operation

from monitoring.monitorlib.fetch.rid import RIDQuery, Subscription, ISA
from monitoring.monitorlib.rid import RIDVersion
from uas_standards.astm.f3411 import v19, v22a
import uas_standards.astm.f3411.v19.api
import uas_standards.astm.f3411.v19.constants
import uas_standards.astm.f3411.v22a.api
import uas_standards.astm.f3411.v22a.constants
import yaml
from implicitdict import ImplicitDict
from uas_standards import Operation
from uas_standards.astm.f3411 import v19, v22a
from yaml.representer import Representer

from monitoring.monitorlib import (
Expand All @@ -21,6 +18,8 @@
rid_v1,
rid_v2,
)
from monitoring.monitorlib.fetch.rid import RIDQuery, Subscription, ISA
from monitoring.monitorlib.rid import RIDVersion


class ChangedSubscription(RIDQuery):
Expand Down Expand Up @@ -94,22 +93,49 @@ def isas(self) -> List[ISA]:
)


def upsert_subscription(
def build_subscription_url(
sub_id: str,
sub_version: Optional[str],
rid_version: RIDVersion,
) -> (Operation, str):
"""
Build the required URL to create, get, update or delete a subscription on a DSS,
in accordance with the specified rid_version and sub_version, if it is available.
Note that for mutations and deletions, sub_version must be provided.
"""
if rid_version == RIDVersion.f3411_19:
if sub_version is None:
op = v19.api.OPERATIONS[v19.api.OperationID.CreateSubscription]
return (op, op.path.format(id=sub_id))
else:
op = v19.api.OPERATIONS[v19.api.OperationID.UpdateSubscription]
return (op, op.path.format(id=sub_id, version=sub_version))
elif rid_version == RIDVersion.f3411_22a:
if sub_version is None:
op = v22a.api.OPERATIONS[v22a.api.OperationID.CreateSubscription]
return (op, op.path.format(id=sub_id))
else:
op = v22a.api.OPERATIONS[v22a.api.OperationID.UpdateSubscription]
return (op, op.path.format(id=sub_id, version=sub_version))
else:
raise NotImplementedError(
f"Cannot build subscription URL for RID version {rid_version}"
)


def build_subscription_payload(
sub_id: str,
area_vertices: List[s2sphere.LatLng],
alt_lo: float,
alt_hi: float,
start_time: Optional[datetime.datetime],
end_time: Optional[datetime.datetime],
uss_base_url: str,
subscription_id: str,
rid_version: RIDVersion,
utm_client: infrastructure.UTMClientSession,
subscription_version: Optional[str] = None,
participant_id: Optional[str] = None,
) -> ChangedSubscription:
mutation = "create" if subscription_version is None else "update"
) -> Dict[str, any]:
if rid_version == RIDVersion.f3411_19:
body = {
return {
"extents": rid_v1.make_volume_4d(
area_vertices,
alt_lo,
Expand All @@ -118,18 +144,57 @@ def upsert_subscription(
end_time,
),
"callbacks": {
"identification_service_area_url": uss_base_url
+ v19.api.OPERATIONS[
v19.api.OperationID.PostIdentificationServiceArea
].path[: -len("/{id}")]
"identification_service_area_url": rid_version.post_isa_url_of(
uss_base_url,
sub_id,
),
},
}
if subscription_version is None:
op = v19.api.OPERATIONS[v19.api.OperationID.CreateSubscription]
url = op.path.format(id=subscription_id)
else:
op = v19.api.OPERATIONS[v19.api.OperationID.UpdateSubscription]
url = op.path.format(id=subscription_id, version=subscription_version)
elif rid_version == RIDVersion.f3411_22a:
return {
"extents": rid_v2.make_volume_4d(
area_vertices,
alt_lo,
alt_hi,
start_time,
end_time,
),
"uss_base_url": uss_base_url,
}
else:
raise NotImplementedError(
f"Cannot upsert subscription using RID version {rid_version}"
)


def upsert_subscription(
area_vertices: List[s2sphere.LatLng],
alt_lo: float,
alt_hi: float,
start_time: Optional[datetime.datetime],
end_time: Optional[datetime.datetime],
uss_base_url: str,
subscription_id: str,
rid_version: RIDVersion,
utm_client: infrastructure.UTMClientSession,
subscription_version: Optional[str] = None,
participant_id: Optional[str] = None,
) -> ChangedSubscription:
mutation = "create" if subscription_version is None else "update"
(op, url) = build_subscription_url(
subscription_id, subscription_version, rid_version
)
body = build_subscription_payload(
subscription_id,
area_vertices,
alt_lo,
alt_hi,
start_time,
end_time,
uss_base_url,
rid_version,
)
if rid_version == RIDVersion.f3411_19:
return ChangedSubscription(
mutation=mutation,
v19_query=fetch.query_and_describe(
Expand All @@ -142,22 +207,6 @@ def upsert_subscription(
),
)
elif rid_version == RIDVersion.f3411_22a:
body = {
"extents": rid_v2.make_volume_4d(
area_vertices,
alt_lo,
alt_hi,
start_time,
end_time,
),
"uss_base_url": uss_base_url,
}
if subscription_version is None:
op = v22a.api.OPERATIONS[v22a.api.OperationID.CreateSubscription]
url = op.path.format(id=subscription_id)
else:
op = v22a.api.OPERATIONS[v22a.api.OperationID.UpdateSubscription]
url = op.path.format(id=subscription_id, version=subscription_version)
return ChangedSubscription(
mutation=mutation,
v22a_query=fetch.query_and_describe(
Expand Down
57 changes: 54 additions & 3 deletions monitoring/monitorlib/rid.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
from enum import Enum

import arrow

from monitoring.monitorlib import schema_validation
from uas_standards.astm.f3411 import v19, v22a
import uas_standards.astm.f3411.v19.api
import uas_standards.astm.f3411.v19.constants
import uas_standards.astm.f3411.v22a.api
import uas_standards.astm.f3411.v22a.constants
from uas_standards.astm.f3411 import v19, v22a

from monitoring.monitorlib import schema_validation


class RIDVersion(str, Enum):
Expand Down Expand Up @@ -84,6 +84,42 @@ def openapi_delete_isa_response_path(self) -> str:
else:
raise ValueError(f"Unsupported RID version '{self}'")

@property
def openapi_get_subscription_response_path(self) -> str:
if self == RIDVersion.f3411_19:
return schema_validation.F3411_19.GetSubscriptionResponse
elif self == RIDVersion.f3411_22a:
return schema_validation.F3411_22a.GetSubscriptionResponse
else:
raise ValueError(f"Unsupported RID version '{self}'")

@property
def openapi_put_subscription_response_path(self) -> str:
if self == RIDVersion.f3411_19:
return schema_validation.F3411_19.PutSubscriptionResponse
elif self == RIDVersion.f3411_22a:
return schema_validation.F3411_22a.PutSubscriptionResponse
else:
raise ValueError(f"Unsupported RID version '{self}'")

@property
def openapi_delete_subscription_response_path(self) -> str:
if self == RIDVersion.f3411_19:
return schema_validation.F3411_19.DeleteSubscriptionResponse
elif self == RIDVersion.f3411_22a:
return schema_validation.F3411_22a.DeleteSubscriptionResponse
else:
raise ValueError(f"Unsupported RID version '{self}'")

@property
def openapi_search_subscriptions_response_path(self) -> str:
if self == RIDVersion.f3411_19:
return schema_validation.F3411_19.SearchSubscriptionsResponse
elif self == RIDVersion.f3411_22a:
return schema_validation.F3411_22a.SearchSubscriptionsResponse
else:
raise ValueError(f"Unsupported RID version '{self}'")

@property
def realtime_period(self) -> timedelta:
if self == RIDVersion.f3411_19:
Expand Down Expand Up @@ -237,3 +273,18 @@ def flights_url_of(self, base_url: str) -> str:
return base_url + flights_path
else:
raise ValueError("Unsupported RID version '{}'".format(self))

def post_isa_url_of(self, base_url: str, sub_id: str) -> str:
if self == RIDVersion.f3411_19:
isa_path = v19.api.OPERATIONS[
v19.api.OperationID.PostIdentificationServiceArea
].path[: -len("/{id}")]
return base_url + isa_path
elif self == RIDVersion.f3411_22a:
# TODO: Urls returned by the DSS contain the ID, confirm this is as expected
isa_path = v22a.api.OPERATIONS[
v22a.api.OperationID.PostIdentificationServiceArea
].path.format(id=sub_id)
return base_url + isa_path
else:
raise ValueError("Unsupported RID version '{}'".format(self))
8 changes: 8 additions & 0 deletions monitoring/monitorlib/schema_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ class F3411_19(str, Enum):
DeleteIdentificationServiceAreaResponse = (
"components.schemas.DeleteIdentificationServiceAreaResponse"
)
GetSubscriptionResponse = "components.schemas.GetSubscriptionResponse"
PutSubscriptionResponse = "components.schemas.PutSubscriptionResponse"
DeleteSubscriptionResponse = "components.schemas.DeleteSubscriptionResponse"
SearchSubscriptionsResponse = "components.schemas.SearchSubscriptionsResponse"


class F3411_22a(str, Enum):
Expand All @@ -46,6 +50,10 @@ class F3411_22a(str, Enum):
DeleteIdentificationServiceAreaResponse = (
"components.schemas.DeleteIdentificationServiceAreaResponse"
)
GetSubscriptionResponse = "components.schemas.GetSubscriptionResponse"
PutSubscriptionResponse = "components.schemas.PutSubscriptionResponse"
DeleteSubscriptionResponse = "components.schemas.DeleteSubscriptionResponse"
SearchSubscriptionsResponse = "components.schemas.SearchSubscriptionsResponse"


class F3548_21(str, Enum):
Expand Down
2 changes: 1 addition & 1 deletion monitoring/prober/infrastructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def wrapper_default_scope(*args, **kwargs):
resource_type_code_descriptions: Dict[ResourceType, str] = {}


# Next code: 374
# Next code: 375
def register_resource_type(code: int, description: str) -> ResourceType:
"""Register that the specified code refers to the described resource.
Expand Down
Loading

0 comments on commit ddd4b5a

Please sign in to comment.