Skip to content

Commit

Permalink
Update and refactor request_clients.py functions.
Browse files Browse the repository at this point in the history
Updating newly implemented job query and control functions in JobClient
to account fully for failure scenarios, and refactoring to centralize
mostly duplicate routines that lent themselves well to shared
parameterized functions.
  • Loading branch information
robertbartel committed Feb 23, 2024
1 parent 29275e1 commit 67eb8ca
Showing 1 changed file with 80 additions and 16 deletions.
96 changes: 80 additions & 16 deletions python/lib/client/dmod/client/request_clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,67 @@ def __init__(self, transport_client: TransportLayerClient, auth_client: AuthClie
self._transport_client: TransportLayerClient = transport_client
self._auth_client: AuthClient = auth_client

async def _job_control_request(self, job_id, action: JobControlAction) -> JobControlResponse:
"""
Helper function for centralizing/parameterizing different job control actions possible via the public interface.
Parameters
----------
job_id
The job to request action on.
action
The type of action to request.
Returns
-------
JobControlResponse
A response object indicating success or failure of the action request, as well as relevant details.
"""
raw_response = None
try:
raw_response = await self._submit_job_request(JobControlRequest(job_id=job_id, action=action))
response = JobControlResponse.factory_init_from_deserialized_json(json.loads(raw_response))
if response is not None:
return response
else:
return JobControlResponse(job_id=job_id, action=action, success=False,
reason="Response Deserializing Failed", data={"raw_response": raw_response},
message="Unable to deserialize JSON to response object")
except Exception as e:
return JobControlResponse(job_id=job_id, action=action, success=False, reason=e.__class__.__name__,
message=str(e), data={"raw_response": raw_response} if raw_response else None)

async def _job_info_request(self, job_id: str, status_only: bool) -> JobInfoResponse:
"""
Single central helper function for handling job info request scenarios supported by the public interface.
Parameters
----------
job_id
The job for which to request state details.
status_only
Whether only the job's status is being requested, as opposed to the full state of the job object.
Returns
-------
JobInfoResponse
A response object indicating success or failure and containing the requested state info when successful.
"""
raw_response = None
try:
raw_response = await self._submit_job_request(request=JobInfoRequest(job_id=job_id,
status_only=status_only))
response = JobInfoResponse.factory_init_from_deserialized_json(json.loads(raw_response))
if response is not None:
return response
else:
return JobInfoResponse(job_id=job_id, status_only=status_only, success=False,
reason="Response Deserializing Failed", data={"raw_response": raw_response},
message=f"Unable to deserialize JSON to {JobInfoResponse.__class__.__name__}")
except Exception as e:
return JobInfoResponse(job_id=job_id, status_only=status_only, success=False, reason=e.__class__.__name__,
message=str(e), data={"raw_response": raw_response} if raw_response else None)

async def _submit_job_request(self, request) -> str:
if await self._auth_client.apply_auth(request):
# Some clients may be async context managers
Expand All @@ -40,7 +101,7 @@ async def _submit_job_request(self, request) -> str:
else:
msg = f"{self.__class__.__name__} could not use {self._auth_client.__class__.__name__} to authenticate " \
f"{request.__class__.__name__}"
raise RuntimeError(msg)
raise DmodRuntimeError(msg)

async def get_jobs_list(self, active_only: bool) -> List[str]:
"""
Expand Down Expand Up @@ -93,8 +154,7 @@ async def request_job_info(self, job_id: str, *args, **kwargs) -> JobInfoRespons
An indicator of success of the request that, when successful, contains the full state of the provided job,
formatted as a JSON dictionary, in the ``data`` attribute.
"""
return JobInfoResponse.factory_init_from_deserialized_json(
json.loads(await self._submit_job_request(request=JobInfoRequest(job_id=job_id))))
return await self._job_info_request(job_id=job_id, status_only=False)

async def request_job_release(self, job_id: str, *args, **kwargs) -> JobControlResponse:
"""
Expand All @@ -114,9 +174,7 @@ async def request_job_release(self, job_id: str, *args, **kwargs) -> JobControlR
JobControlResponse
An indicator of whether there had been allocated resources for the job, all of which are now released.
"""
return JobControlResponse.factory_init_from_deserialized_json(
json.loads(
await self._submit_job_request(JobControlRequest(job_id=job_id, action=JobControlAction.RELEASE))))
return await self._job_control_request(job_id=job_id, action=JobControlAction.RELEASE)

async def request_job_restart(self, job_id: str, *args, **kwargs) -> JobControlResponse:
"""
Expand All @@ -136,9 +194,7 @@ async def request_job_restart(self, job_id: str, *args, **kwargs) -> JobControlR
JobControlResponse
An indicator of whether the job was restarted as requested.
"""
return JobControlResponse.factory_init_from_deserialized_json(
json.loads(
await self._submit_job_request(JobControlRequest(job_id=job_id, action=JobControlAction.RESTART))))
return await self._job_control_request(job_id=job_id, action=JobControlAction.RESTART)

async def request_job_status(self, job_id: str, *args, **kwargs) -> JobInfoResponse:
"""
Expand All @@ -161,8 +217,7 @@ async def request_job_status(self, job_id: str, *args, **kwargs) -> JobInfoRespo
JobInfoResponse
An indicator that, when successful, includes as ``data`` the serialized status of the provided job.
"""
return JobInfoResponse.factory_init_from_deserialized_json(
json.loads(await self._submit_job_request(request=JobInfoRequest(job_id=job_id, status_only=True))))
return await self._job_info_request(job_id=job_id, status_only=True)

async def request_job_stop(self, job_id: str, *args, **kwargs) -> JobControlResponse:
"""
Expand All @@ -182,9 +237,7 @@ async def request_job_stop(self, job_id: str, *args, **kwargs) -> JobControlResp
JobControlResponse
An indicator of whether the job was stopped as requested.
"""
return JobControlResponse.factory_init_from_deserialized_json(
json.loads(
await self._submit_job_request(JobControlRequest(job_id=job_id, action=JobControlAction.STOP))))
return await self._job_control_request(job_id=job_id, action=JobControlAction.STOP)

async def request_jobs_list(self, jobs_list_active_only: bool, *args, **kwargs) -> JobListResponse:
"""
Expand All @@ -210,8 +263,19 @@ async def request_jobs_list(self, jobs_list_active_only: bool, *args, **kwargs)
-------
get_jobs_list
"""
return JobListResponse.factory_init_from_deserialized_json(
json.loads(await self._submit_job_request(JobListRequest(only_active=jobs_list_active_only))))
raw_response = None
try:
raw_response = await self._submit_job_request(request=JobListRequest(only_active=jobs_list_active_only))
response = JobListResponse.factory_init_from_deserialized_json(json.loads(raw_response))
if response is not None:
return response
else:
return JobListResponse(only_active=jobs_list_active_only, success=False,
reason="Response Deserializing Failed", data={"raw_response": raw_response},
message=f"Unable to deserialize JSON to {JobInfoResponse.__class__.__name__}")
except Exception as e:
return JobListResponse(only_active=jobs_list_active_only, success=False, reason=e.__class__.__name__,
message=str(e), data={"raw_response": raw_response} if raw_response else None)

async def submit_ngen_request(self, **kwargs) -> NGENRequestResponse:
return NGENRequestResponse.factory_init_from_deserialized_json(
Expand Down

0 comments on commit 67eb8ca

Please sign in to comment.