From 67eb8ca44e83b581a0cce7ff65574a54204d8db8 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 23 Feb 2024 12:03:31 -0500 Subject: [PATCH] Update and refactor request_clients.py functions. Updating newly implemented job query and control functions in JobClient to account fully for failure scenarios, and refactoring to centralize mostly duplicate routines that lent themselves well to shared parameterized functions. --- .../lib/client/dmod/client/request_clients.py | 96 +++++++++++++++---- 1 file changed, 80 insertions(+), 16 deletions(-) diff --git a/python/lib/client/dmod/client/request_clients.py b/python/lib/client/dmod/client/request_clients.py index 2d34d2ef7..26d824d53 100644 --- a/python/lib/client/dmod/client/request_clients.py +++ b/python/lib/client/dmod/client/request_clients.py @@ -27,6 +27,67 @@ def __init__(self, transport_client: TransportLayerClient, auth_client: AuthClie self._transport_client: TransportLayerClient = transport_client self._auth_client: AuthClient = auth_client + async def _job_control_request(self, job_id, action: JobControlAction) -> JobControlResponse: + """ + Helper function for centralizing/parameterizing different job control actions possible via the public interface. + + Parameters + ---------- + job_id + The job to request action on. + action + The type of action to request. + + Returns + ------- + JobControlResponse + A response object indicating success or failure of the action request, as well as relevant details. + """ + raw_response = None + try: + raw_response = await self._submit_job_request(JobControlRequest(job_id=job_id, action=action)) + response = JobControlResponse.factory_init_from_deserialized_json(json.loads(raw_response)) + if response is not None: + return response + else: + return JobControlResponse(job_id=job_id, action=action, success=False, + reason="Response Deserializing Failed", data={"raw_response": raw_response}, + message="Unable to deserialize JSON to response object") + except Exception as e: + return JobControlResponse(job_id=job_id, action=action, success=False, reason=e.__class__.__name__, + message=str(e), data={"raw_response": raw_response} if raw_response else None) + + async def _job_info_request(self, job_id: str, status_only: bool) -> JobInfoResponse: + """ + Single central helper function for handling job info request scenarios supported by the public interface. + + Parameters + ---------- + job_id + The job for which to request state details. + status_only + Whether only the job's status is being requested, as opposed to the full state of the job object. + + Returns + ------- + JobInfoResponse + A response object indicating success or failure and containing the requested state info when successful. + """ + raw_response = None + try: + raw_response = await self._submit_job_request(request=JobInfoRequest(job_id=job_id, + status_only=status_only)) + response = JobInfoResponse.factory_init_from_deserialized_json(json.loads(raw_response)) + if response is not None: + return response + else: + return JobInfoResponse(job_id=job_id, status_only=status_only, success=False, + reason="Response Deserializing Failed", data={"raw_response": raw_response}, + message=f"Unable to deserialize JSON to {JobInfoResponse.__class__.__name__}") + except Exception as e: + return JobInfoResponse(job_id=job_id, status_only=status_only, success=False, reason=e.__class__.__name__, + message=str(e), data={"raw_response": raw_response} if raw_response else None) + async def _submit_job_request(self, request) -> str: if await self._auth_client.apply_auth(request): # Some clients may be async context managers @@ -40,7 +101,7 @@ async def _submit_job_request(self, request) -> str: else: msg = f"{self.__class__.__name__} could not use {self._auth_client.__class__.__name__} to authenticate " \ f"{request.__class__.__name__}" - raise RuntimeError(msg) + raise DmodRuntimeError(msg) async def get_jobs_list(self, active_only: bool) -> List[str]: """ @@ -93,8 +154,7 @@ async def request_job_info(self, job_id: str, *args, **kwargs) -> JobInfoRespons An indicator of success of the request that, when successful, contains the full state of the provided job, formatted as a JSON dictionary, in the ``data`` attribute. """ - return JobInfoResponse.factory_init_from_deserialized_json( - json.loads(await self._submit_job_request(request=JobInfoRequest(job_id=job_id)))) + return await self._job_info_request(job_id=job_id, status_only=False) async def request_job_release(self, job_id: str, *args, **kwargs) -> JobControlResponse: """ @@ -114,9 +174,7 @@ async def request_job_release(self, job_id: str, *args, **kwargs) -> JobControlR JobControlResponse An indicator of whether there had been allocated resources for the job, all of which are now released. """ - return JobControlResponse.factory_init_from_deserialized_json( - json.loads( - await self._submit_job_request(JobControlRequest(job_id=job_id, action=JobControlAction.RELEASE)))) + return await self._job_control_request(job_id=job_id, action=JobControlAction.RELEASE) async def request_job_restart(self, job_id: str, *args, **kwargs) -> JobControlResponse: """ @@ -136,9 +194,7 @@ async def request_job_restart(self, job_id: str, *args, **kwargs) -> JobControlR JobControlResponse An indicator of whether the job was restarted as requested. """ - return JobControlResponse.factory_init_from_deserialized_json( - json.loads( - await self._submit_job_request(JobControlRequest(job_id=job_id, action=JobControlAction.RESTART)))) + return await self._job_control_request(job_id=job_id, action=JobControlAction.RESTART) async def request_job_status(self, job_id: str, *args, **kwargs) -> JobInfoResponse: """ @@ -161,8 +217,7 @@ async def request_job_status(self, job_id: str, *args, **kwargs) -> JobInfoRespo JobInfoResponse An indicator that, when successful, includes as ``data`` the serialized status of the provided job. """ - return JobInfoResponse.factory_init_from_deserialized_json( - json.loads(await self._submit_job_request(request=JobInfoRequest(job_id=job_id, status_only=True)))) + return await self._job_info_request(job_id=job_id, status_only=True) async def request_job_stop(self, job_id: str, *args, **kwargs) -> JobControlResponse: """ @@ -182,9 +237,7 @@ async def request_job_stop(self, job_id: str, *args, **kwargs) -> JobControlResp JobControlResponse An indicator of whether the job was stopped as requested. """ - return JobControlResponse.factory_init_from_deserialized_json( - json.loads( - await self._submit_job_request(JobControlRequest(job_id=job_id, action=JobControlAction.STOP)))) + return await self._job_control_request(job_id=job_id, action=JobControlAction.STOP) async def request_jobs_list(self, jobs_list_active_only: bool, *args, **kwargs) -> JobListResponse: """ @@ -210,8 +263,19 @@ async def request_jobs_list(self, jobs_list_active_only: bool, *args, **kwargs) ------- get_jobs_list """ - return JobListResponse.factory_init_from_deserialized_json( - json.loads(await self._submit_job_request(JobListRequest(only_active=jobs_list_active_only)))) + raw_response = None + try: + raw_response = await self._submit_job_request(request=JobListRequest(only_active=jobs_list_active_only)) + response = JobListResponse.factory_init_from_deserialized_json(json.loads(raw_response)) + if response is not None: + return response + else: + return JobListResponse(only_active=jobs_list_active_only, success=False, + reason="Response Deserializing Failed", data={"raw_response": raw_response}, + message=f"Unable to deserialize JSON to {JobInfoResponse.__class__.__name__}") + except Exception as e: + return JobListResponse(only_active=jobs_list_active_only, success=False, reason=e.__class__.__name__, + message=str(e), data={"raw_response": raw_response} if raw_response else None) async def submit_ngen_request(self, **kwargs) -> NGENRequestResponse: return NGENRequestResponse.factory_init_from_deserialized_json(