Skip to content

Commit

Permalink
docs: enhance ray module code documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
Ygnas committed Nov 12, 2024
1 parent eb5ce8d commit bafab36
Show file tree
Hide file tree
Showing 3 changed files with 239 additions and 75 deletions.
145 changes: 115 additions & 30 deletions src/codeflare_sdk/ray/client/ray_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,28 @@

class RayJobClient:
"""
A class that functions as a wrapper for the Ray Job Submission Client.
parameters:
address -- Either (1) the address of the Ray cluster, or (2) the HTTP address of the dashboard server on the head node, e.g. “http://<head-node-ip>:8265”. In case (1) it must be specified as an address that can be passed to ray.init(),
e.g. a Ray Client address (ray://<head_node_host>:10001), or “auto”, or “localhost:<port>”. If unspecified, will try to connect to a running local Ray cluster. This argument is always overridden by the RAY_ADDRESS environment variable.
create_cluster_if_needed -- Indicates whether the cluster at the specified address needs to already be running. Ray doesn't start a cluster before interacting with jobs, but third-party job managers may do so.
cookies -- Cookies to use when sending requests to the HTTP job server.
metadata -- Arbitrary metadata to store along with all jobs. New metadata specified per job will be merged with the global metadata provided here via a simple dict update.
headers -- Headers to use when sending requests to the HTTP job server, used for cases like authentication to a remote cluster.
verify -- Boolean indication to verify the server's TLS certificate or a path to a file or directory of trusted certificates. Default: True.
A wrapper class for the Ray Job Submission Client, used for interacting with Ray clusters to manage job
submissions, deletions, and other job-related information.
Args:
address (Optional[str]):
The Ray cluster's address, which may be either the Ray Client address, HTTP address
of the dashboard server on the head node, or "auto" / "localhost:<port>" for a local cluster.
This is overridden by the RAY_ADDRESS environment variable if set.
create_cluster_if_needed (bool):
If True, a new cluster will be created if not already running at the
specified address. By default, Ray requires an existing cluster.
cookies (Optional[Dict[str, Any]]):
HTTP cookies to send with requests to the job server.
metadata (Optional[Dict[str, Any]]):
Global metadata to store with all jobs, merged with job-specific
metadata during job submission.
headers (Optional[Dict[str, Any]]):
HTTP headers to send with requests to the job server, can be used for
authentication.
verify (Optional[Union[str, bool]]):
If True, verifies the server's TLS certificate. Can also be a path
to trusted certificates. Default is True.
"""

def __init__(
Expand Down Expand Up @@ -67,18 +79,35 @@ def submit_job(
entrypoint_resources: Optional[Dict[str, float]] = None,
) -> str:
"""
Method for submitting jobs to a Ray Cluster and returning the job id with entrypoint being a mandatory field.
Parameters:
entrypoint -- The shell command to run for this job.
submission_id -- A unique ID for this job.
runtime_env -- The runtime environment to install and run this job in.
metadata -- Arbitrary data to store along with this job.
job_id -- DEPRECATED. This has been renamed to submission_id
entrypoint_num_cpus -- The quantity of CPU cores to reserve for the execution of the entrypoint command, separately from any tasks or actors launched by it. Defaults to 0.
entrypoint_num_gpus -- The quantity of GPUs to reserve for the execution of the entrypoint command, separately from any tasks or actors launched by it. Defaults to 0.
entrypoint_memory –- The quantity of memory to reserve for the execution of the entrypoint command, separately from any tasks or actors launched by it. Defaults to 0.
entrypoint_resources -- The quantity of custom resources to reserve for the execution of the entrypoint command, separately from any tasks or actors launched by it.
Submits a job to the Ray cluster with specified resources and returns the job ID.
Args:
entrypoint (str):
The command to execute for this job.
job_id (Optional[str]):
Deprecated, use `submission_id`. A unique job identifier.
runtime_env (Optional[Dict[str, Any]]):
The runtime environment for this job.
metadata (Optional[Dict[str, str]]):
Metadata associated with the job, merged with global metadata.
submission_id (Optional[str]):
Unique ID for the job submission.
entrypoint_num_cpus (Optional[Union[int, float]]):
The quantity of CPU cores to reserve for the execution of the entrypoint command,
separately from any tasks or actors launched by it. Defaults to 0.
entrypoint_num_gpus (Optional[Union[int, float]]):
The quantity of GPUs to reserve for the execution of the entrypoint command,
separately from any tasks or actors launched by it. Defaults to 0.
entrypoint_memory (Optional[int]):
The quantity of memory to reserve for the execution of the entrypoint command,
separately from any tasks or actors launched by it. Defaults to 0.
entrypoint_resources (Optional[Dict[str, float]]):
The quantity of custom resources to reserve for the execution of the entrypoint command,
separately from any tasks or actors launched by it.
Returns:
str:
The unique identifier for the submitted job.
"""
return self.rayJobClient.submit_job(
entrypoint=entrypoint,
Expand All @@ -94,7 +123,15 @@ def submit_job(

def delete_job(self, job_id: str) -> (bool, str):
"""
Method for deleting jobs with the job id being a mandatory field.
Deletes a job by job ID.
Args:
job_id (str):
The unique identifier of the job to delete.
Returns:
tuple(bool, str):
A tuple with deletion status and a message.
"""
deletion_status = self.rayJobClient.delete_job(job_id=job_id)

Expand All @@ -107,37 +144,77 @@ def delete_job(self, job_id: str) -> (bool, str):

def get_address(self) -> str:
"""
Method for getting the address from the RayJobClient
Retrieves the address of the connected Ray cluster.
Returns:
str:
The Ray cluster's address.
"""
return self.rayJobClient.get_address()

def get_job_info(self, job_id: str):
"""
Method for getting the job info with the job id being a mandatory field.
Fetches information about a job by job ID.
Args:
job_id (str):
The unique identifier of the job.
Returns:
JobInfo:
Information about the job's status, progress, and other details.
"""
return self.rayJobClient.get_job_info(job_id=job_id)

def get_job_logs(self, job_id: str) -> str:
"""
Method for getting the job logs with the job id being a mandatory field.
Retrieves the logs for a specific job by job ID.
Args:
job_id (str):
The unique identifier of the job.
Returns:
str:
Logs output from the job.
"""
return self.rayJobClient.get_job_logs(job_id=job_id)

def get_job_status(self, job_id: str) -> str:
"""
Method for getting the job's status with the job id being a mandatory field.
Fetches the current status of a job by job ID.
Args:
job_id (str):
The unique identifier of the job.
Returns:
str:
The job's status.
"""
return self.rayJobClient.get_job_status(job_id=job_id)

def list_jobs(self) -> List[JobDetails]:
"""
Method for getting a list of current jobs in the Ray Cluster.
Lists all current jobs in the Ray cluster.
Returns:
List[JobDetails]:
A list of job details for each current job in the cluster.
"""
return self.rayJobClient.list_jobs()

def stop_job(self, job_id: str) -> (bool, str):
"""
Method for stopping a job with the job id being a mandatory field.
Stops a running job by job ID.
Args:
job_id (str):
The unique identifier of the job to stop.
Returns:
tuple(bool, str):
A tuple with the stop status and a message.
"""
stop_job_status = self.rayJobClient.stop_job(job_id=job_id)
if stop_job_status:
Expand All @@ -148,6 +225,14 @@ def stop_job(self, job_id: str) -> (bool, str):

def tail_job_logs(self, job_id: str) -> Iterator[str]:
"""
Method for getting an iterator that follows the logs of a job with the job id being a mandatory field.
Continuously streams the logs of a job.
Args:
job_id (str):
The unique identifier of the job.
Returns:
Iterator[str]:
An iterator that yields log entries in real-time.
"""
return self.rayJobClient.tail_job_logs(job_id=job_id)
101 changes: 79 additions & 22 deletions src/codeflare_sdk/ray/cluster/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,17 @@ def status(
return status, ready

def is_dashboard_ready(self) -> bool:
"""
Checks if the cluster's dashboard is ready and accessible.
This method attempts to send a GET request to the cluster dashboard URI.
If the request is successful (HTTP status code 200), it returns True.
If an SSL error occurs, it returns False, indicating the dashboard is not ready.
Returns:
bool:
True if the dashboard is ready, False otherwise.
"""
try:
response = requests.get(
self.cluster_dashboard_uri(),
Expand All @@ -313,8 +324,22 @@ def is_dashboard_ready(self) -> bool:

def wait_ready(self, timeout: Optional[int] = None, dashboard_check: bool = True):
"""
Waits for requested cluster to be ready, up to an optional timeout (s).
Checks every five seconds.
Waits for the requested cluster to be ready, up to an optional timeout.
This method checks the status of the cluster every five seconds until it is
ready or the timeout is reached. If dashboard_check is enabled, it will also
check for the readiness of the dashboard.
Args:
timeout (Optional[int]):
The maximum time to wait for the cluster to be ready in seconds. If None, waits indefinitely.
dashboard_check (bool):
Flag to determine if the dashboard readiness should
be checked. Defaults to True.
Raises:
TimeoutError:
If the timeout is reached before the cluster or dashboard is ready.
"""
print("Waiting for requested resources to be set up...")
time = 0
Expand Down Expand Up @@ -346,6 +371,21 @@ def wait_ready(self, timeout: Optional[int] = None, dashboard_check: bool = True
time += 5

def details(self, print_to_console: bool = True) -> RayCluster:
"""
Retrieves details about the Ray Cluster.
This method returns a copy of the Ray Cluster information and optionally prints
the details to the console.
Args:
print_to_console (bool):
Flag to determine if the cluster details should be
printed to the console. Defaults to True.
Returns:
RayCluster:
A copy of the Ray Cluster details.
"""
cluster = _copy_to_ray(self)
if print_to_console:
pretty_print.print_clusters([cluster])
Expand Down Expand Up @@ -447,6 +487,13 @@ def _head_worker_extended_resources_from_rc_dict(rc: Dict) -> Tuple[dict, dict]:
return head_extended_resources, worker_extended_resources

def local_client_url(self):
"""
Constructs the URL for the local Ray client.
Returns:
str:
The Ray client URL based on the ingress domain.
"""
ingress_domain = _get_ingress_domain(self)
return f"ray://{ingress_domain}"

Expand Down Expand Up @@ -504,6 +551,13 @@ def list_all_queued(


def get_current_namespace(): # pragma: no cover
"""
Retrieves the current Kubernetes namespace.
Returns:
str:
The current namespace or None if not found.
"""
if os.path.isfile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"):
try:
file = open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r")
Expand All @@ -528,26 +582,29 @@ def get_cluster(
verify_tls: bool = True,
write_to_file: bool = False,
):
"""Returns the given Ray Cluster/AppWrapper as a Cluster Object
The get_cluster() method is used for retrieving a Ray Cluster that already exists in your K8s Cluster.
Returned is a basic Cluster object which includes the exact yaml for your Ray Cluster under Cluster.resource_yaml.
Parameters
----------
cluster_name : str
The name of the Ray Cluster/AppWrapper
namespace : str
The namespace of the Ray Cluster/AppWrapper
verify_tls : bool
A boolean indicating whether to verify TLS when connecting to the cluster
write_to_file : bool
A boolean indicating whether or not to write the resource to a Yaml file
Raises
------
Exception
If the Ray Cluster/AppWrapper cannot be found/does not exist
"""
Retrieves an existing Ray Cluster or AppWrapper as a Cluster object.
This function fetches an existing Ray Cluster or AppWrapper from the Kubernetes cluster and returns
it as a `Cluster` object, including its YAML configuration under `Cluster.resource_yaml`.
Args:
cluster_name (str):
The name of the Ray Cluster or AppWrapper.
namespace (str, optional):
The Kubernetes namespace where the Ray Cluster or AppWrapper is located. Default is "default".
verify_tls (bool, optional):
Whether to verify TLS when connecting to the cluster. Default is True.
write_to_file (bool, optional):
If True, writes the resource configuration to a YAML file. Default is False.
Returns:
Cluster:
A Cluster object representing the retrieved Ray Cluster or AppWrapper.
Raises:
Exception:
If the Ray Cluster or AppWrapper cannot be found or does not exist.
"""
config_check()
api_instance = client.CustomObjectsApi(get_api_client())
Expand Down
Loading

0 comments on commit bafab36

Please sign in to comment.