Skip to content

Commit

Permalink
Fix DataRequestHandler after recent client changes.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertbartel committed Jan 24, 2024
1 parent 0d7b7cd commit edd5680
Showing 1 changed file with 42 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from dmod.communication.dataset_management_message import MaaSDatasetManagementMessage, MaaSDatasetManagementResponse, \
ManagementAction
from dmod.communication.data_transmit_message import DataTransmitMessage, DataTransmitResponse
from dmod.core.exception import DmodRuntimeError
from pathlib import Path
from typing import Optional, Tuple

Expand Down Expand Up @@ -269,68 +270,68 @@ async def determine_required_access_types(self, request: MaaSDatasetManagementMe
# FIXME: for now, just use the default type (which happens to be "everything")
return self._default_required_access_type,

async def _handle_data_download(self, client_websocket, service_websocket) -> MaaSDatasetManagementResponse:
async def _handle_data_download(self, download_request: MaaSDatasetManagementMessage, client_websocket) -> MaaSDatasetManagementResponse:
series_uuid = None
# This might be data transmission, or it might be a management response message
possible_responses = [MaaSDatasetManagementResponse, DataTransmitMessage]
service_response = self.service_client.async_make_request(download_request, possible_responses)
while True:
# This might be data transmission, or it might be a management response message
raw_service_response = await service_websocket.recv()
service_response_json = json.loads(raw_service_response)
mgmt_response = MaaSDatasetManagementResponse.factory_init_from_deserialized_json(service_response_json)
if mgmt_response is not None:
return mgmt_response
data_transmit_msg = DataTransmitMessage.factory_init_from_deserialized_json(service_response_json)
if isinstance(service_response, MaaSDatasetManagementResponse):
return service_response

assert isinstance(service_response, DataTransmitMessage)

if series_uuid is None:
series_uuid = data_transmit_msg.series_uuid
elif data_transmit_msg.series_uuid != series_uuid:
raise RuntimeError("Data series UUID for data transmit does not match expected.")
await client_websocket.send(raw_service_response)
series_uuid = service_response.series_uuid
elif service_response.series_uuid != series_uuid:
raise DmodRuntimeError("Data series UUID for data transmit does not match expected.")

await client_websocket.send(str(service_response))
raw_client_response = await client_websocket.recv()
data_response = DataTransmitResponse.factory_init_from_deserialized_json(json.loads(raw_client_response))
if data_response.series_uuid != series_uuid:
raise RuntimeError("Data series UUID for data receipt does not match expected.")
await service_websocket.send(raw_client_response)
service_response = self.service_client.async_make_request(data_response, possible_responses)

async def _handle_data_upload(self, client_websocket, service_websocket) -> MaaSDatasetManagementResponse:
async def _handle_data_upload(self, upload_request: MaaSDatasetManagementMessage, client_websocket, service_websocket) -> MaaSDatasetManagementResponse:
series_uuid = None
# This might be DataTransmitResponse, or it might be a management response message
possible_responses = [MaaSDatasetManagementResponse, DataTransmitResponse]
service_response = self.service_client.async_make_request(upload_request, possible_responses)
while True:
# Await a DataTransmitResponse with success indicating ready to receive
# TODO: update Data service to do this
raw_service_response = await service_websocket.recv()
service_response_json = json.loads(raw_service_response)
mgmt_response = MaaSDatasetManagementResponse.factory_init_from_deserialized_json(service_response_json)
if mgmt_response is not None:
return mgmt_response
data_transmit_response = DataTransmitResponse.factory_init_from_deserialized_json(service_response_json)
if isinstance(service_response, MaaSDatasetManagementResponse):
return service_response

assert isinstance(service_response, DataTransmitResponse)

if series_uuid is None:
series_uuid = data_transmit_response.series_uuid
elif data_transmit_response.series_uuid != series_uuid:
raise RuntimeError("Data series UUID for data upload response does not match expected.")
await client_websocket.send(raw_service_response)
series_uuid = service_response.series_uuid
elif service_response.series_uuid != series_uuid:
raise DmodRuntimeError("Data series UUID for data upload response does not match expected.")

await client_websocket.send(str(service_response))
raw_client_response = await client_websocket.recv()
data_transmit_msg = DataTransmitMessage.factory_init_from_deserialized_json(json.loads(raw_client_response))
if data_transmit_msg.series_uuid != series_uuid:
raise RuntimeError("Data series UUID for data upload transmit does not match expected.")
await service_websocket.send(raw_client_response)
raise RuntimeError("Data series UUID for data upload does not match expected.")
service_response = self.service_client.async_make_request(data_transmit_msg, possible_responses)

async def handle_request(self, request: MaaSDatasetManagementMessage, **kwargs) -> MaaSDatasetManagementResponse:
# Need receiver websocket (i.e. DMOD client side) as kwarg
session, is_authorized, reason, msg = await self.get_authorized_session(request)
if not is_authorized:
return MaaSDatasetManagementResponse(success=False, reason=reason.name, message=msg)
# In this case, we actually can pass the request as-is straight through (i.e., after confirming authorization)
async with self.service_client as client:
# Have to handle these two slightly differently, since multiple message will be going over the websocket
if request.management_action == ManagementAction.REQUEST_DATA:
await client.connection.send(str(request))
mgmt_response = await self._handle_data_download(client_websocket=kwargs['upstream_websocket'],
service_websocket=client.connection)
elif request.management_action == ManagementAction.ADD_DATA:
await client.connection.send(str(request))
mgmt_response = await self._handle_data_upload(client_websocket=kwargs['upstream_websocket'],
service_websocket=client.connection)
else:
mgmt_response = await client.async_make_request(request)
logging.debug("************* {} received response:\n{}".format(self.__class__.__name__, str(mgmt_response)))
# Have to handle these two slightly differently, since multiple message will be going over the websocket
if request.management_action == ManagementAction.REQUEST_DATA:
mgmt_response = await self._handle_data_download(download_request=request,
client_websocket=kwargs['upstream_websocket'])
elif request.management_action == ManagementAction.ADD_DATA:
mgmt_response = await self._handle_data_upload(upload_request=request,
client_websocket=kwargs['upstream_websocket'])
else:
mgmt_response = await self.service_client.async_make_request(request)
logging.debug("************* {} received response:\n{}".format(self.__class__.__name__, str(mgmt_response)))
# Likewise, can just send back the response from the internal service client
return MaaSDatasetManagementResponse.factory_create(mgmt_response)

Expand Down

0 comments on commit edd5680

Please sign in to comment.