Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finish CLI function implementations #525

Merged
merged 43 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
4c5fefc
Adjust event type docs and Response superclass.
robertbartel Feb 16, 2024
e058eb1
Add new job query and control external requests.
robertbartel Feb 16, 2024
9835838
Bump communication version to 0.17.0.
robertbartel Feb 16, 2024
69cca38
Bump scheduler service comms dep to 0.17.0.
robertbartel Feb 16, 2024
f82c441
Add scheduler service funcs for job control/info.
robertbartel Feb 16, 2024
51588b6
Update schedulerservice listener for new job msgs.
robertbartel Feb 16, 2024
d3456fa
Bump scheduler service version to 0.9.0.
robertbartel Feb 16, 2024
92d2a71
Add new job status steps related to stopping.
robertbartel Feb 16, 2024
9a1bae5
Update JobImpl.should_release_resources func.
robertbartel Feb 16, 2024
e545d5a
Update launcher to stop/remove job.
robertbartel Feb 20, 2024
2dc1042
Refactor RedisBackedJobUtil.get_all_active_jobs.
robertbartel Feb 20, 2024
d18c1a1
Add get_job_ids to JobUtil interface.
robertbartel Feb 20, 2024
b5b0ed2
Modify release_allocations for JobManager.
robertbartel Feb 20, 2024
870613c
Add funcs to job manager for stop and restart.
robertbartel Feb 20, 2024
35914e5
Bump scheduler version to 0.11.0.
robertbartel Feb 20, 2024
a615dfd
Bump schedulerservice dep for scheduler.
robertbartel Feb 20, 2024
901cfac
Bump requestservice communication dep.
robertbartel Feb 20, 2024
004cb7d
Refactor SchedulerHandler class method name.
robertbartel Feb 21, 2024
febbb97
Add external request handler for existing job msg.
robertbartel Feb 21, 2024
2319d16
Add ExistingJobRequestHandler to package __init__.
robertbartel Feb 21, 2024
d2c5e38
Update externalrequest core and comms deps.
robertbartel Feb 21, 2024
decf61c
Bump externalrequest version to 0.6.0.
robertbartel Feb 21, 2024
ce848db
Update requestservice externalrequests dep.
robertbartel Feb 21, 2024
6f01afa
Support existing job messages in requestservice.
robertbartel Feb 21, 2024
d47cf5a
Bump requestservice version to 0.8.0.
robertbartel Feb 21, 2024
ccc2b5f
Update client comms and externreqs deps.
robertbartel Feb 21, 2024
cf71737
Update client JobClient with job-related funcs.
robertbartel Feb 21, 2024
45ee38b
Bump client version to 0.5.0.
robertbartel Feb 21, 2024
cf460a4
Correct forward lookup issue in scheduler.py.
robertbartel Feb 21, 2024
94203be
Support Serializable BasicResultsIndicator.data.
robertbartel Feb 21, 2024
42c4bcf
Fix deprecated attribute "set"s in test file.
robertbartel Feb 22, 2024
1367baa
Update tests for changes to release_allocations.
robertbartel Feb 22, 2024
8b600fe
Fix deprecated job_id set in job_manager.py.
robertbartel Feb 22, 2024
578b9a3
Rename func to JobClient.request_job_restart.
robertbartel Feb 22, 2024
0a414f2
Remove wildcard import in maas_request_handlers.py
robertbartel Feb 22, 2024
5822e5f
Remove wildcard input in request_clients.py.
robertbartel Feb 22, 2024
0c2e1c0
Remove wildcard input in requestservice service.py
robertbartel Feb 22, 2024
bb163e1
Update and refactor request_clients.py functions.
robertbartel Feb 23, 2024
a2c0a47
Update python/lib/communication/dmod/communication/maas_request/job_m…
robertbartel Feb 23, 2024
009f503
Update python/lib/communication/dmod/communication/maas_request/job_m…
robertbartel Feb 23, 2024
2577a32
Update python/lib/communication/dmod/communication/maas_request/job_m…
robertbartel Feb 23, 2024
1eddbf0
Update python/lib/externalrequests/dmod/externalrequests/maas_request…
robertbartel Feb 23, 2024
3c38a17
Correcting apparent merge/rebase error.
robertbartel Mar 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/lib/client/dmod/client/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.4.3'
__version__ = '0.5.0'
138 changes: 114 additions & 24 deletions python/lib/client/dmod/client/request_clients.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wanted to note that the additions to the module are super useful! Looking forward to using these APIs, thanks @robertbartel!

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
from dmod.communication.dataset_management_message import DatasetManagementMessage, DatasetManagementResponse, \
MaaSDatasetManagementMessage, MaaSDatasetManagementResponse, QueryType, DatasetQuery
from dmod.communication.data_transmit_message import DataTransmitMessage, DataTransmitResponse
from dmod.communication.maas_request.job_message import (JobControlAction, JobControlRequest, JobControlResponse,
JobInfoRequest, JobInfoResponse, JobListRequest,
JobListResponse)
from dmod.core.exception import DmodRuntimeError
from dmod.core.meta_data import DataCategory, DataDomain
from dmod.core.serializable import BasicResultIndicator, ResultIndicator
Expand All @@ -24,6 +27,67 @@ def __init__(self, transport_client: TransportLayerClient, auth_client: AuthClie
self._transport_client: TransportLayerClient = transport_client
self._auth_client: AuthClient = auth_client

async def _job_control_request(self, job_id, action: JobControlAction) -> JobControlResponse:
"""
Helper function for centralizing/parameterizing different job control actions possible via the public interface.

Parameters
----------
job_id
The job to request action on.
action
The type of action to request.

Returns
-------
JobControlResponse
A response object indicating success or failure of the action request, as well as relevant details.
"""
raw_response = None
try:
raw_response = await self._submit_job_request(JobControlRequest(job_id=job_id, action=action))
response = JobControlResponse.factory_init_from_deserialized_json(json.loads(raw_response))
if response is not None:
return response
else:
return JobControlResponse(job_id=job_id, action=action, success=False,
reason="Response Deserializing Failed", data={"raw_response": raw_response},
message="Unable to deserialize JSON to response object")
except Exception as e:
return JobControlResponse(job_id=job_id, action=action, success=False, reason=e.__class__.__name__,
message=str(e), data={"raw_response": raw_response} if raw_response else None)

async def _job_info_request(self, job_id: str, status_only: bool) -> JobInfoResponse:
"""
Single central helper function for handling job info request scenarios supported by the public interface.

Parameters
----------
job_id
The job for which to request state details.
status_only
Whether only the job's status is being requested, as opposed to the full state of the job object.

Returns
-------
JobInfoResponse
A response object indicating success or failure and containing the requested state info when successful.
"""
raw_response = None
try:
raw_response = await self._submit_job_request(request=JobInfoRequest(job_id=job_id,
status_only=status_only))
response = JobInfoResponse.factory_init_from_deserialized_json(json.loads(raw_response))
if response is not None:
return response
else:
return JobInfoResponse(job_id=job_id, status_only=status_only, success=False,
reason="Response Deserializing Failed", data={"raw_response": raw_response},
message=f"Unable to deserialize JSON to {JobInfoResponse.__class__.__name__}")
except Exception as e:
return JobInfoResponse(job_id=job_id, status_only=status_only, success=False, reason=e.__class__.__name__,
message=str(e), data={"raw_response": raw_response} if raw_response else None)

async def _submit_job_request(self, request) -> str:
if await self._auth_client.apply_auth(request):
# Some clients may be async context managers
Expand All @@ -37,7 +101,7 @@ async def _submit_job_request(self, request) -> str:
else:
msg = f"{self.__class__.__name__} could not use {self._auth_client.__class__.__name__} to authenticate " \
f"{request.__class__.__name__}"
raise RuntimeError(msg)
raise DmodRuntimeError(msg)

async def get_jobs_list(self, active_only: bool) -> List[str]:
"""
Expand Down Expand Up @@ -71,8 +135,7 @@ async def get_jobs_list(self, active_only: bool) -> List[str]:
else:
return indicator.data

# TODO: this is going to need some adjustments to the type hinting
async def request_job_info(self, job_id: str, *args, **kwargs) -> ResultIndicator:
async def request_job_info(self, job_id: str, *args, **kwargs) -> JobInfoResponse:
"""
Request the full state of the provided job, formatted as a JSON dictionary.

Expand All @@ -87,14 +150,13 @@ async def request_job_info(self, job_id: str, *args, **kwargs) -> ResultIndicato

Returns
-------
ResultIndicator
An indicator of success of the request that, when successful, contains he full state of the provided job,
JobInfoResponse
An indicator of success of the request that, when successful, contains the full state of the provided job,
formatted as a JSON dictionary, in the ``data`` attribute.
"""
# TODO: implement
raise NotImplementedError('{} function "request_job_info" not implemented yet'.format(self.__class__.__name__))
return await self._job_info_request(job_id=job_id, status_only=False)

async def request_job_release(self, job_id: str, *args, **kwargs) -> ResultIndicator:
async def request_job_release(self, job_id: str, *args, **kwargs) -> JobControlResponse:
"""
Request the allocated resources for the provided job be released.

Expand All @@ -109,13 +171,32 @@ async def request_job_release(self, job_id: str, *args, **kwargs) -> ResultIndic

Returns
-------
ResultIndicator
JobControlResponse
An indicator of whether there had been allocated resources for the job, all of which are now released.
"""
# TODO: implement
raise NotImplementedError('{} function "request_job_release" not implemented yet'.format(self.__class__.__name__))
return await self._job_control_request(job_id=job_id, action=JobControlAction.RELEASE)

async def request_job_status(self, job_id: str, *args, **kwargs) -> BasicResultIndicator:
async def request_job_restart(self, job_id: str, *args, **kwargs) -> JobControlResponse:
"""
Request a job - expected be stopped - be resumed; i.e., transitioned from ``STOPPED`` to ``RUNNING` exec step.

Parameters
----------
job_id : str
The id of the job in question.
args
(Unused) variable positional args.
kwargs
(Unused) variable keyword args.

Returns
-------
JobControlResponse
An indicator of whether the job was restarted as requested.
"""
return await self._job_control_request(job_id=job_id, action=JobControlAction.RESTART)

async def request_job_status(self, job_id: str, *args, **kwargs) -> JobInfoResponse:
"""
Request the status of the provided job.

Expand All @@ -133,13 +214,12 @@ async def request_job_status(self, job_id: str, *args, **kwargs) -> BasicResultI

Returns
-------
BasicResultIndicator
An indicator that, when successful, includes as ``data`` the serialized status string of the provided job.
JobInfoResponse
An indicator that, when successful, includes as ``data`` the serialized status of the provided job.
"""
# TODO: implement
raise NotImplementedError('{} function "request_job_status" not implemented yet'.format(self.__class__.__name__))
return await self._job_info_request(job_id=job_id, status_only=True)

async def request_job_stop(self, job_id: str, *args, **kwargs) -> ResultIndicator:
async def request_job_stop(self, job_id: str, *args, **kwargs) -> JobControlResponse:
"""
Request the provided job be stopped; i.e., transitioned to the ``STOPPED`` exec step.

Expand All @@ -154,13 +234,12 @@ async def request_job_stop(self, job_id: str, *args, **kwargs) -> ResultIndicato

Returns
-------
ResultIndicator
JobControlResponse
An indicator of whether the job was stopped as requested.
"""
# TODO: implement
raise NotImplementedError('{} function "request_job_stop" not implemented yet'.format(self.__class__.__name__))
return await self._job_control_request(job_id=job_id, action=JobControlAction.STOP)

async def request_jobs_list(self, jobs_list_active_only: bool, *args, **kwargs) -> BasicResultIndicator:
async def request_jobs_list(self, jobs_list_active_only: bool, *args, **kwargs) -> JobListResponse:
"""
Request a list of ids of existing jobs.

Expand All @@ -177,15 +256,26 @@ async def request_jobs_list(self, jobs_list_active_only: bool, *args, **kwargs)

Returns
-------
BasicResultIndicator
JobListResponse
An indicator that, when successful, includes as ``data`` the list of ids of existing jobs.

See Also
-------
get_jobs_list
"""
# TODO: implement
raise NotImplementedError('{} function "request_jobs_list" not implemented yet'.format(self.__class__.__name__))
raw_response = None
try:
raw_response = await self._submit_job_request(request=JobListRequest(only_active=jobs_list_active_only))
response = JobListResponse.factory_init_from_deserialized_json(json.loads(raw_response))
if response is not None:
return response
else:
return JobListResponse(only_active=jobs_list_active_only, success=False,
reason="Response Deserializing Failed", data={"raw_response": raw_response},
message=f"Unable to deserialize JSON to {JobInfoResponse.__class__.__name__}")
except Exception as e:
return JobListResponse(only_active=jobs_list_active_only, success=False, reason=e.__class__.__name__,
message=str(e), data={"raw_response": raw_response} if raw_response else None)

async def submit_ngen_request(self, **kwargs) -> NGENRequestResponse:
return NGENRequestResponse.factory_init_from_deserialized_json(
Expand Down
4 changes: 2 additions & 2 deletions python/lib/client/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
license='',
include_package_data=True,
#install_requires=['websockets', 'jsonschema'],vi
install_requires=['dmod-core>=0.11.0', 'websockets>=8.1', 'pydantic>=1.10.8,~=1.10', 'dmod-communication>=0.16.0',
'dmod-externalrequests>=0.3.0'],
install_requires=['dmod-core>=0.11.0', 'websockets>=8.1', 'pydantic>=1.10.8,~=1.10', 'dmod-communication>=0.17.0',
'dmod-externalrequests>=0.6.0'],
packages=find_namespace_packages(include=['dmod.*'], exclude=['dmod.test'])
)
2 changes: 1 addition & 1 deletion python/lib/communication/dmod/communication/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.16.2'
__version__ = '0.17.0'
Loading
Loading