diff --git a/docs/cluster-configuration.md b/docs/cluster-configuration.md index 8ca4a2c26..abe59eae4 100644 --- a/docs/cluster-configuration.md +++ b/docs/cluster-configuration.md @@ -18,8 +18,7 @@ cluster = Cluster(ClusterConfiguration( worker_cpu_limits=1, # Default 1 worker_memory_requests=2, # Default 2 worker_memory_limits=2, # Default 2 - # image="", # Optional Field - machine_types=["m5.xlarge", "g4dn.xlarge"], + # image="", # Default quay.io/rhoai/ray:2.23.0-py39-cu121 labels={"exampleLabel": "example", "secondLabel": "example"}, )) ``` @@ -27,4 +26,4 @@ Note: 'quay.io/rhoai/ray:2.23.0-py39-cu121' is the default community image used The `labels={"exampleLabel": "example"}` parameter can be used to apply additional labels to the RayCluster resource. -After creating their `cluster`, a user can call `cluster.up()` and `cluster.down()` to respectively create or remove the Ray Cluster. +For detailed instructions on the various methods that can be called for interacting with Ray Clusters see [Ray Cluster Interaction](./ray_cluster_interaction.md). diff --git a/docs/ray_cluster_interaction.md b/docs/ray_cluster_interaction.md new file mode 100644 index 000000000..ad5309c90 --- /dev/null +++ b/docs/ray_cluster_interaction.md @@ -0,0 +1,71 @@ +# Ray Cluster Interaction + +The CodeFlare SDK offers multiple ways to interact with Ray Clusters including the below methods. + +## get_cluster() +The `get_cluster()` function is used to initialise a `Cluster` object from a pre-existing Ray Cluster/AppWrapper.
+Below is an example of its usage:
+* `appwrapper: bool # Default: False` -> When set to `True` allows the user to list queued AppWrappers. + + + +## list_all_clusters() +The `list_all_clusters()` function will return a list of detailed descriptions of Ray Clusters to the console by default. It accepts the following parameters: +* `namespace: str # Required` -> The namespace you want to retrieve the list from. +* `print_to_console: bool # Default: True` -> A boolean that allows the user to print the list to their console. + +
+NOTE: The following methods require a `Cluster` object to be initialised; see [Cluster Configuration](./cluster-configuration.md)
diff --git a/src/codeflare_sdk/cluster/cluster.py b/src/codeflare_sdk/cluster/cluster.py index f0f50eb36..651404731 100644 --- a/src/codeflare_sdk/cluster/cluster.py +++ b/src/codeflare_sdk/cluster/cluster.py @@ -20,19 +20,19 @@ import re from time import sleep -from typing import List, Optional, Tuple, Dict +from typing import List, Optional, Tuple, Dict, Union from kubernetes import config from ray.job_submission import JobSubmissionClient from .auth import config_check, api_config_handler from ..utils import pretty_print -from ..utils.generate_yaml import ( - generate_appwrapper, + +from ..utils.build_ray_cluster import ( head_worker_gpu_count_from_cluster, + build_ray_cluster, ) from ..utils.kube_api_helpers import _kube_api_error_handling -from ..utils.generate_yaml import is_openshift_cluster from .config import ClusterConfiguration from .model import ( @@ -43,13 +43,14 @@ RayClusterStatus, ) from kubernetes import client, config -from kubernetes.utils import parse_quantity import yaml import os import requests from kubernetes import config from kubernetes.client.rest import ApiException +from ..utils.build_ray_cluster import write_to_file as write_cluster_to_file +import warnings class Cluster: @@ -60,7 +61,11 @@ class Cluster: Note that currently, the underlying implementation is a Ray cluster. """ - def __init__(self, config: ClusterConfiguration): + def __init__( + self, + config: ClusterConfiguration, + resource_yaml: Optional[Union[dict, str]] = None, + ): """ Create the resource cluster object by passing in a ClusterConfiguration (defined in the config sub-module). An AppWrapper will then be generated @@ -68,9 +73,18 @@ def __init__(self, config: ClusterConfiguration): request. """ self.config = config - self.app_wrapper_yaml = self.create_app_wrapper() + + if resource_yaml != None: + # We have to ensure that any user provided resource_yaml is correctly processed. 
+ if resource_yaml == str: + self.config.write_to_file = True # Ensure that if a path to a resource yaml file is provided the correct cluster creation method is used + + self.resource_yaml = resource_yaml + else: + # If no resource is provided create the resource_yaml based on ClusterConfiguration + self.resource_yaml = self.create_resource() + self._job_submission_client = None - self.app_wrapper_name = self.config.name @property def _client_headers(self): @@ -83,7 +97,7 @@ def _client_headers(self): @property def _client_verify_tls(self): - if not is_openshift_cluster or not self.config.verify_tls: + if not _is_openshift_cluster or not self.config.verify_tls: return False return True @@ -92,7 +106,7 @@ def job_client(self): k8client = api_config_handler() or client.ApiClient() if self._job_submission_client: return self._job_submission_client - if is_openshift_cluster(): + if _is_openshift_cluster(): self._job_submission_client = JobSubmissionClient( self.cluster_dashboard_uri(), headers=self._client_headers, @@ -104,7 +118,7 @@ def job_client(self): ) return self._job_submission_client - def create_app_wrapper(self): + def create_resource(self): """ Called upon cluster object creation, creates an AppWrapper yaml based on the specifications of the ClusterConfiguration. @@ -119,7 +133,7 @@ def create_app_wrapper(self): f"Namespace {self.config.namespace} is of type {type(self.config.namespace)}. Check your Kubernetes Authentication." 
) - return generate_appwrapper(self) + return build_ray_cluster(self) # creates a new cluster with the provided or default spec def up(self): @@ -138,7 +152,7 @@ def up(self): api_instance = client.CustomObjectsApi(api_config_handler()) if self.config.appwrapper: if self.config.write_to_file: - with open(self.app_wrapper_yaml) as f: + with open(self.resource_yaml) as f: aw = yaml.load(f, Loader=yaml.FullLoader) api_instance.create_namespaced_custom_object( group="workload.codeflare.dev", @@ -148,13 +162,12 @@ def up(self): body=aw, ) else: - aw = yaml.safe_load(self.app_wrapper_yaml) api_instance.create_namespaced_custom_object( group="workload.codeflare.dev", version="v1beta2", namespace=namespace, plural="appwrappers", - body=aw, + body=self.resource_yaml, ) else: self._component_resources_up(namespace, api_instance) @@ -196,10 +209,10 @@ def down(self): version="v1beta2", namespace=namespace, plural="appwrappers", - name=self.app_wrapper_name, + name=self.config.name, ) else: - self._component_resources_down(namespace, api_instance) + _delete_resources(self.config.name, namespace, api_instance) except Exception as e: # pragma: no cover return _kube_api_error_handling(e) @@ -342,7 +355,7 @@ def cluster_dashboard_uri(self) -> str: Returns a string containing the cluster's dashboard URI. 
""" config_check() - if is_openshift_cluster(): + if _is_openshift_cluster(): try: api_instance = client.CustomObjectsApi(api_config_handler()) routes = api_instance.list_namespaced_custom_object( @@ -426,55 +439,6 @@ def _head_worker_extended_resources_from_rc_dict(rc: Dict) -> Tuple[dict, dict]: return head_extended_resources, worker_extended_resources - def from_k8_cluster_object( - rc, - appwrapper=True, - write_to_file=False, - verify_tls=True, - ): - config_check() - machine_types = ( - rc["metadata"]["labels"]["orderedinstance"].split("_") - if "orderedinstance" in rc["metadata"]["labels"] - else [] - ) - - ( - head_extended_resources, - worker_extended_resources, - ) = Cluster._head_worker_extended_resources_from_rc_dict(rc) - - cluster_config = ClusterConfiguration( - name=rc["metadata"]["name"], - namespace=rc["metadata"]["namespace"], - machine_types=machine_types, - num_workers=rc["spec"]["workerGroupSpecs"][0]["minReplicas"], - worker_cpu_requests=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][ - "containers" - ][0]["resources"]["requests"]["cpu"], - worker_cpu_limits=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][ - "containers" - ][0]["resources"]["limits"]["cpu"], - worker_memory_requests=rc["spec"]["workerGroupSpecs"][0]["template"][ - "spec" - ]["containers"][0]["resources"]["requests"]["memory"], - worker_memory_limits=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][ - "containers" - ][0]["resources"]["limits"]["memory"], - worker_extended_resource_requests=worker_extended_resources, - head_extended_resource_requests=head_extended_resources, - image=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][ - 0 - ]["image"], - appwrapper=appwrapper, - write_to_file=write_to_file, - verify_tls=verify_tls, - local_queue=rc["metadata"] - .get("labels", dict()) - .get("kueue.x-k8s.io/queue-name", None), - ) - return Cluster(cluster_config) - def local_client_url(self): ingress_domain = _get_ingress_domain(self) return 
f"ray://{ingress_domain}" @@ -483,36 +447,11 @@ def _component_resources_up( self, namespace: str, api_instance: client.CustomObjectsApi ): if self.config.write_to_file: - with open(self.app_wrapper_yaml) as f: - yamls = list(yaml.load_all(f, Loader=yaml.FullLoader)) - for resource in yamls: - enable_ingress = ( - resource.get("spec", {}) - .get("headGroupSpec", {}) - .get("enableIngress") - ) - if resource["kind"] == "RayCluster" and enable_ingress is True: - name = resource["metadata"]["name"] - print( - f"Forbidden: RayCluster '{name}' has 'enableIngress' set to 'True'." - ) - return - _create_resources(yamls, namespace, api_instance) - else: - yamls = yaml.load_all(self.app_wrapper_yaml, Loader=yaml.FullLoader) - _create_resources(yamls, namespace, api_instance) - - def _component_resources_down( - self, namespace: str, api_instance: client.CustomObjectsApi - ): - cluster_name = self.config.name - if self.config.write_to_file: - with open(self.app_wrapper_yaml) as f: - yamls = yaml.load_all(f, Loader=yaml.FullLoader) - _delete_resources(yamls, namespace, api_instance, cluster_name) + with open(self.resource_yaml) as f: + ray_cluster = yaml.safe_load(f) + _create_resources(ray_cluster, namespace, api_instance) else: - yamls = yaml.safe_load_all(self.app_wrapper_yaml) - _delete_resources(yamls, namespace, api_instance, cluster_name) + _create_resources(self.resource_yaml, namespace, api_instance) def list_all_clusters(namespace: str, print_to_console: bool = True): @@ -583,82 +522,108 @@ def get_current_namespace(): # pragma: no cover return None -def get_cluster( - cluster_name: str, - namespace: str = "default", - write_to_file: bool = False, - verify_tls: bool = True, -): - try: - config_check() - api_instance = client.CustomObjectsApi(api_config_handler()) - rcs = api_instance.list_namespaced_custom_object( - group="ray.io", - version="v1", - namespace=namespace, - plural="rayclusters", - ) - except Exception as e: - return _kube_api_error_handling(e) +def 
_delete_resources(name: str, namespace: str, api_instance: client.CustomObjectsApi): + api_instance.delete_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=namespace, + plural="rayclusters", + name=name, + ) - for rc in rcs["items"]: - if rc["metadata"]["name"] == cluster_name: - appwrapper = _check_aw_exists(cluster_name, namespace) - return Cluster.from_k8_cluster_object( - rc, - appwrapper=appwrapper, - write_to_file=write_to_file, - verify_tls=verify_tls, - ) - raise FileNotFoundError( - f"Cluster {cluster_name} is not found in {namespace} namespace" + +def _create_resources(yamls, namespace: str, api_instance: client.CustomObjectsApi): + api_instance.create_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=namespace, + plural="rayclusters", + body=yamls, ) -# private methods -def _delete_resources( - yamls, namespace: str, api_instance: client.CustomObjectsApi, cluster_name: str +def get_cluster( + cluster_name: str, + namespace: str = "default", + verify_tls: bool = True, + is_appwrapper: bool = False, # As get_cluster no longer searches a list of potential candidates it is necessary to specify if the resource is an appwrapper + write_to_file: bool = False, ): - for resource in yamls: - if resource["kind"] == "RayCluster": - name = resource["metadata"]["name"] - api_instance.delete_namespaced_custom_object( - group="ray.io", - version="v1", + config_check() + api_instance = client.CustomObjectsApi(api_config_handler()) + # Check/Get the AppWrapper if it exists + if is_appwrapper: + try: + resource = api_instance.get_namespaced_custom_object( + group="workload.codeflare.dev", + version="v1beta2", namespace=namespace, - plural="rayclusters", - name=name, + plural="appwrappers", + name=cluster_name, ) - - -def _create_resources(yamls, namespace: str, api_instance: client.CustomObjectsApi): - for resource in yamls: - if resource["kind"] == "RayCluster": - api_instance.create_namespaced_custom_object( + except Exception 
as e: + return _kube_api_error_handling(e) + else: + # Get the Ray Cluster + try: + resource = api_instance.get_namespaced_custom_object( group="ray.io", version="v1", namespace=namespace, plural="rayclusters", - body=resource, + name=cluster_name, ) + except Exception as e: + return _kube_api_error_handling(e) + # Create a Cluster Configuration with just the necessary provided parameters + cluster_config = ClusterConfiguration( + name=cluster_name, + namespace=namespace, + verify_tls=verify_tls, + write_to_file=write_to_file, + appwrapper=is_appwrapper, + ) -def _check_aw_exists(name: str, namespace: str) -> bool: - try: - config_check() - api_instance = client.CustomObjectsApi(api_config_handler()) - aws = api_instance.list_namespaced_custom_object( - group="workload.codeflare.dev", - version="v1beta2", - namespace=namespace, - plural="appwrappers", - ) - except Exception as e: # pragma: no cover - return _kube_api_error_handling(e, print_error=False) - for aw in aws["items"]: - if aw["metadata"]["name"] == name: - return True - return False + # Remove auto-generated fields like creationTimestamp, uid and etc. 
+ remove_autogenerated_fields(resource) + + if write_to_file: + resource_yaml = write_cluster_to_file(cluster_name, resource) + else: + resource_yaml = resource + print(f"Yaml resources loaded for {cluster_name}") + + # Update the Cluster's resource_yaml to reflect the retrieved Ray Cluster/AppWrapper wether written to a file or not + cluster = Cluster(cluster_config, resource_yaml) + + return cluster + + +def remove_autogenerated_fields(resource): + """Recursively remove autogenerated fields from a dictionary.""" + if isinstance(resource, dict): + for key in list(resource.keys()): + if key in [ + "creationTimestamp", + "resourceVersion", + "uid", + "selfLink", + "managedFields", + "finalizers", + "generation", + "status", + "suspend", + "workload.codeflare.dev/user", # AppWrapper field + "workload.codeflare.dev/userid", # AppWrapper field + "podSetInfos", # AppWrapper field + ]: + del resource[key] + else: + remove_autogenerated_fields(resource[key]) + elif isinstance(resource, list): + for item in resource: + remove_autogenerated_fields(item) # Cant test this until get_current_namespace is fixed and placed in this function over using `self` @@ -671,7 +636,7 @@ def _get_ingress_domain(self): # pragma: no cover namespace = get_current_namespace() domain = None - if is_openshift_cluster(): + if _is_openshift_cluster(): try: api_instance = client.CustomObjectsApi(api_config_handler()) @@ -803,7 +768,7 @@ def _map_to_ray_cluster(rc) -> Optional[RayCluster]: status = RayClusterStatus.UNKNOWN config_check() dashboard_url = None - if is_openshift_cluster(): + if _is_openshift_cluster(): try: api_instance = client.CustomObjectsApi(api_config_handler()) routes = api_instance.list_namespaced_custom_object( @@ -905,3 +870,17 @@ def _copy_to_ray(cluster: Cluster) -> RayCluster: if ray.status == CodeFlareClusterStatus.READY: ray.status = RayClusterStatus.READY return ray + + +# Check if the routes api exists +def _is_openshift_cluster(): + try: + config_check() + for api in 
client.ApisApi(api_config_handler()).get_api_versions().groups: + for v in api.versions: + if "route.openshift.io/v1" in v.group_version: + return True + else: + return False + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e) diff --git a/src/codeflare_sdk/cluster/config.py b/src/codeflare_sdk/cluster/config.py index 610d53c44..9061e2a37 100644 --- a/src/codeflare_sdk/cluster/config.py +++ b/src/codeflare_sdk/cluster/config.py @@ -47,19 +47,16 @@ class ClusterConfiguration: Attributes: - name: The name of the cluster. - namespace: The namespace in which the cluster should be created. - - head_info: A list of strings containing information about the head node. - head_cpus: The number of CPUs to allocate to the head node. - head_memory: The amount of memory to allocate to the head node. - head_gpus: The number of GPUs to allocate to the head node. (Deprecated, use head_extended_resource_requests) - head_extended_resource_requests: A dictionary of extended resource requests for the head node. ex: {"nvidia.com/gpu": 1} - - machine_types: A list of machine types to use for the cluster. - min_cpus: The minimum number of CPUs to allocate to each worker. - max_cpus: The maximum number of CPUs to allocate to each worker. - num_workers: The number of workers to create. - min_memory: The minimum amount of memory to allocate to each worker. - max_memory: The maximum amount of memory to allocate to each worker. - num_gpus: The number of GPUs to allocate to each worker. (Deprecated, use worker_extended_resource_requests) - - template: The path to the template file to use for the cluster. - appwrapper: A boolean indicating whether to use an AppWrapper. - envs: A dictionary of environment variables to set for the cluster. - image: The image to use for the cluster. 
@@ -74,14 +71,10 @@ class ClusterConfiguration: name: str namespace: Optional[str] = None - head_info: List[str] = field(default_factory=list) head_cpus: Union[int, str] = 2 head_memory: Union[int, str] = 8 head_gpus: Optional[int] = None # Deprecating head_extended_resource_requests: Dict[str, int] = field(default_factory=dict) - machine_types: List[str] = field( - default_factory=list - ) # ["m4.xlarge", "g4dn.xlarge"] worker_cpu_requests: Union[int, str] = 1 worker_cpu_limits: Union[int, str] = 1 min_cpus: Optional[Union[int, str]] = None # Deprecating @@ -92,10 +85,9 @@ class ClusterConfiguration: min_memory: Optional[Union[int, str]] = None # Deprecating max_memory: Optional[Union[int, str]] = None # Deprecating num_gpus: Optional[int] = None # Deprecating - template: str = f"{dir}/templates/base-template.yaml" appwrapper: bool = False envs: Dict[str, str] = field(default_factory=dict) - image: str = "" + image: str = "quay.io/rhoai/ray:2.23.0-py39-cu121" image_pull_secrets: List[str] = field(default_factory=list) write_to_file: bool = False verify_tls: bool = True diff --git a/src/codeflare_sdk/models/__init__.py b/src/codeflare_sdk/models/__init__.py new file mode 100644 index 000000000..fd333a2f9 --- /dev/null +++ b/src/codeflare_sdk/models/__init__.py @@ -0,0 +1,14 @@ +from .v1_head_group_spec import V1HeadGroupSpec +from .v1_worker_group_spec import V1WorkerGroupSpec +from .v1_raycluster import V1RayCluster +from .v1_raycluster_spec import V1RayClusterSpec +from .v1_scale_strategy import V1ScaleStrategy +from .v1_autoscaler_options import V1AutoScalerOptions + +from importlib.metadata import version, PackageNotFoundError + +try: + __version__ = version("codeflare-sdk") # use metadata associated with built package + +except PackageNotFoundError: + __version__ = "v0.0.0" diff --git a/src/codeflare_sdk/models/v1_autoscaler_options.py b/src/codeflare_sdk/models/v1_autoscaler_options.py new file mode 100644 index 000000000..8e40450db --- /dev/null +++ 
b/src/codeflare_sdk/models/v1_autoscaler_options.py @@ -0,0 +1,190 @@ +import pprint + +import six + + +class V1AutoScalerOptions(object): # pragma: no cover + openapi_types = { + "resources": "V1ResourceRequirements", + "image": "str", + "imagePullPolicy": "str", + "securityContext": "V1SecurityContext", + "idleTimeoutSeconds": "int32", + "upscalingMode": "str", + "env": "List[V1EnvVar]", + "envFrom": "List[V1EnvFromSource]", + "volumeMounts": "List[V1VolumeMount]", + } + + attribute_map = { + "resources": "resources", + "image": "image", + "imagePullPolicy": "imagePullPolicy", + "securityContext": "securityContext", + "idleTimeoutSeconds": "idleTimeoutSeconds", + "upscalingMode": "upscalingMode", + "env": "env", + "envFrom": "envFrom", + "volumeMounts": "volumeMounts", + } + + def __init__( + self, + resources=None, + image=None, + imagePullPolicy=None, + securityContext=None, + idleTimeoutSeconds=None, + upscalingMode=None, + env=None, + envFrom=None, + volumeMounts=None, + ): + self._resources = None + self._image = None + self._imagePullPolicy = None + self._securityContext = None + self._idleTimeoutSeconds = None + self._upscalingMode = None + self._env = None + self._envFrom = None + self._volumeMounts = None + + if resources is not None: + self._resources = resources + if image is not None: + self._image = image + if imagePullPolicy is not None: + self._imagePullPolicy = imagePullPolicy + if securityContext is not None: + self._securityContext = securityContext + if idleTimeoutSeconds is not None: + self._idleTimeoutSeconds = idleTimeoutSeconds + if upscalingMode is not None: + self._upscalingMode = upscalingMode + if env is not None: + self._env = env + if envFrom is not None: + self._envFrom = envFrom + if volumeMounts is not None: + self._volumeMounts = volumeMounts + + @property + def resources(self): + return self._resources + + @resources.setter + def resources(self, resources): + self._resources = resources + + @property + def image(self): + return 
self._image + + @image.setter + def image(self, image): + self._image = image + + @property + def imagePullPolicy(self): + return self._imagePullPolicy + + @imagePullPolicy.setter + def imagePullPolicy(self, imagePullPolicy): + self._imagePullPolicy = imagePullPolicy + + @property + def securityContext(self): + return self._securityContext + + @securityContext.setter + def securityContext(self, securityContext): + self._securityContext = securityContext + + @property + def idleTimeoutSeconds(self): + return self._idleTimeoutSeconds + + @idleTimeoutSeconds.setter + def idleTimeoutSeconds(self, idleTimeoutSeconds): + self._idleTimeoutSeconds = idleTimeoutSeconds + + @property + def upscalingMode(self): + return self._upscalingMode + + @upscalingMode.setter + def upscalingMode(self, upscalingMode): + self._upscalingMode = upscalingMode + + @property + def env(self): + return self._env + + @env.setter + def env(self, env): + self._env = env + + @property + def envFrom(self): + return self._envFrom + + @envFrom.setter + def envFrom(self, envFrom): + self._envFrom = envFrom + + @property + def volumeMounts(self): + return self._volumeMounts + + @volumeMounts.setter + def volumeMounts(self, volumeMounts): + self._volumeMounts = volumeMounts + + def to_dict(self): + """Returns the model properties as a dict""" + result = {} + + for attr, _ in six.iteritems(self.openapi_types): + value = getattr(self, attr) + if isinstance(value, list): + result[attr] = list( + map(lambda x: x.to_dict() if hasattr(x, "to_dict") else x, value) + ) + elif hasattr(value, "to_dict"): + result[attr] = value.to_dict() + elif isinstance(value, dict): + result[attr] = dict( + map( + lambda item: (item[0], item[1].to_dict()) + if hasattr(item[1], "to_dict") + else item, + value.items(), + ) + ) + else: + result[attr] = value + + return result + + def to_str(self): + """Returns the string representation of the model""" + return pprint.pformat(self.to_dict()) + + def __repr__(self): + """For `print` 
and `pprint`""" + return self.to_str() + + def __eq__(self, other): + """Returns true if both objects are equal""" + if not isinstance(other, V1AutoScalerOptions): + return False + + return self.to_dict() == other.to_dict() + + def __ne__(self, other): + """Returns true if both objects are not equal""" + if not isinstance(other, V1AutoScalerOptions): + return True + + return self.to_dict() != other.to_dict() diff --git a/src/codeflare_sdk/models/v1_head_group_spec.py b/src/codeflare_sdk/models/v1_head_group_spec.py new file mode 100644 index 000000000..23ce6ef62 --- /dev/null +++ b/src/codeflare_sdk/models/v1_head_group_spec.py @@ -0,0 +1,134 @@ +import pprint + +import six + + +class V1HeadGroupSpec(object): # pragma: no cover + openapi_types = { + "serviceType": "str", + "headService": "str", + "enableIngress": "bool", + "rayStartParams": "Dict[str, str]", + "template": "V1PodTemplateSpec", + } + + attribute_map = { + "serviceType": "serviceType", + "headService": "headService", + "enableIngress": "enableIngress", + "rayStartParams": "rayStartParams", + "template": "template", + } + + def __init__( + self, + serviceType=None, + headService=None, + enableIngress=None, + rayStartParams=None, + template=None, + ): + self._serviceType = None + self._headService = None + self._enableIngress = None + self._rayStartParams = None + self._template = None + + if serviceType is not None: + self._serviceType = serviceType + if headService is not None: + self._headService = headService + if enableIngress is not None: + self._enableIngress = enableIngress + if rayStartParams is not None: + self._rayStartParams = rayStartParams + if template is not None: + self._template = template + + @property + def serviceType(self): + return self._serviceType + + @serviceType.setter + def serviceType(self, serviceType): + self._serviceType = serviceType + + @property + def headService(self): + return self._headService + + @headService.setter + def headService(self, headService): + 
self._headService = headService + + @property + def enableIngress(self): + return self._enableIngress + + @enableIngress.setter + def enableIngress(self, enableIngress): + self._enableIngress = enableIngress + + @property + def rayStartParams(self): + return self._rayStartParams + + @rayStartParams.setter + def rayStartParams(self, rayStartParams): + self._rayStartParams = rayStartParams + + @property + def template(self): + return self._template + + @template.setter + def template(self, template): + self._template = template + + def to_dict(self): + """Returns the model properties as a dict""" + result = {} + + for attr, _ in six.iteritems(self.openapi_types): + value = getattr(self, attr) + if isinstance(value, list): + result[attr] = list( + map(lambda x: x.to_dict() if hasattr(x, "to_dict") else x, value) + ) + elif hasattr(value, "to_dict"): + result[attr] = value.to_dict() + elif isinstance(value, dict): + result[attr] = dict( + map( + lambda item: (item[0], item[1].to_dict()) + if hasattr(item[1], "to_dict") + else item, + value.items(), + ) + ) + else: + result[attr] = value + + return result + + def to_str(self): + """Returns the string representation of the model""" + return pprint.pformat(self.to_dict()) + + def __repr__(self): + """For `print` and `pprint`""" + return self.to_str() + + def __eq__(self, other): + """Returns true if both objects are equal""" + if not isinstance(other, V1HeadGroupSpec): + return False + + return self.to_dict() == other.to_dict() + + def __ne__(self, other): + """Returns true if both objects are not equal""" + if not isinstance(other, V1HeadGroupSpec): + return True + + return self.to_dict() != other.to_dict() diff --git a/src/codeflare_sdk/models/v1_raycluster.py b/src/codeflare_sdk/models/v1_raycluster.py new file mode 100644 index 000000000..ffbc9683c --- /dev/null +++ b/src/codeflare_sdk/models/v1_raycluster.py @@ -0,0 +1,114 @@ +import pprint + +import six + + +class V1RayCluster(object): # pragma: no cover + 
openapi_types = { + "apiVersion": "str", + "kind": "str", + "metadata": "V1ObjectMeta", + "spec": "V1RayClusterSpec", + } + + attribute_map = { + "apiVersion": "apiVersion", + "kind": "kind", + "metadata": "metadata", + "spec": "spec", + } + + def __init__(self, apiVersion=None, kind=None, metadata=None, spec=None): + self._apiVersion = None + self._kind = None + self._metadata = None + self._spec = None + + if apiVersion is not None: + self._apiVersion = apiVersion + if kind is not None: + self._kind = kind + if metadata is not None: + self._metadata = metadata + if spec is not None: + self._spec = spec + + @property + def apiVersion(self): + return self._apiVersion + + @apiVersion.setter + def apiVersion(self, apiVersion): + self._apiVersion = apiVersion + + @property + def kind(self): + return self._kind + + @kind.setter + def kind(self, kind): + self._kind = kind + + @property + def metadata(self): + return self._metadata + + @metadata.setter + def metadata(self, metadata): + self._metadata = metadata + + @property + def spec(self): + return self._spec + + @spec.setter + def spec(self, spec): + self._spec = spec + + def to_dict(self): + """Returns the model properties as a dict""" + result = {} + + for attr, _ in six.iteritems(self.openapi_types): + value = getattr(self, attr) + if isinstance(value, list): + result[attr] = list( + map(lambda x: x.to_dict() if hasattr(x, "to_dict") else x, value) + ) + elif hasattr(value, "to_dict"): + result[attr] = value.to_dict() + elif isinstance(value, dict): + result[attr] = dict( + map( + lambda item: (item[0], item[1].to_dict()) + if hasattr(item[1], "to_dict") + else item, + value.items(), + ) + ) + else: + result[attr] = value + + return result + + def to_str(self): + """Returns the string representation of the model""" + return pprint.pformat(self.to_dict()) + + def __repr__(self): + """For `print` and `pprint`""" + return self.to_str() + + def __eq__(self, other): + """Returns true if both objects are equal""" + if 
not isinstance(other, V1RayCluster): + return False + + return self.to_dict() == other.to_dict() + + def __ne__(self, other): + """Returns true if both objects are not equal""" + if not isinstance(other, V1RayCluster): + return True + + return self.to_dict() != other.to_dict() diff --git a/src/codeflare_sdk/models/v1_raycluster_spec.py b/src/codeflare_sdk/models/v1_raycluster_spec.py new file mode 100644 index 000000000..65d5d8da8 --- /dev/null +++ b/src/codeflare_sdk/models/v1_raycluster_spec.py @@ -0,0 +1,162 @@ +import pprint + +import six + + +class V1RayClusterSpec(object): # pragma: no cover + openapi_types = { + "suspend": "bool", + "autoscalerOptions": "V1AutoScalerOptions", + "headServiceAnnotations": "Dict[str:str]", + "enableInTreeAutoscaling": "bool", + "headGroupSpec": "V1HeadGroupSpec", + "rayVersion": "str", + "workerGroupSpecs": "List[V1WorkerGroupSpec]", + } + + attribute_map = { + "suspend": "suspend", + "autoscalerOptions": "autoscalerOptions", + "headServiceAnnotations": "headServiceAnnotations", + "enableInTreeAutoscaling": "enableInTreeAutoscaling", + "headGroupSpec": "headGroupSpec", + "rayVersion": "rayVersion", + "workerGroupSpecs": "workerGroupSpecs", + } + + def __init__( + self, + suspend=None, + autoscalerOptions=None, + headServiceAnnotations=None, + enableInTreeAutoscaling=None, + headGroupSpec=None, + rayVersion=None, + workerGroupSpecs=None, + ): + self._suspend = None + self._autoscalerOptions = None + self._headServiceAnnotations = None + self._enableInTreeAutoscaling = None + self._headGroupSpec = None + self._rayVersion = None + self._workerGroupSpecs = None + + if suspend is not None: + self._suspend = suspend + if autoscalerOptions is not None: + self._autoscalerOptions = autoscalerOptions + if headServiceAnnotations is not None: + self._headServiceAnnotations = headServiceAnnotations + if enableInTreeAutoscaling is not None: + self._enableInTreeAutoscaling = enableInTreeAutoscaling + if headGroupSpec is not None: + 
self._headGroupSpec = headGroupSpec + if rayVersion is not None: + self._rayVersion = rayVersion + if workerGroupSpecs is not None: + self._workerGroupSpecs = workerGroupSpecs + + @property + def suspend(self): + return self._suspend + + @suspend.setter + def suspend(self, suspend): + self._suspend = suspend + + @property + def autoscalerOptions(self): + return self._autoscalerOptions + + @autoscalerOptions.setter + def autoscalerOptions(self, autoscalerOptions): + self._autoscalerOptions = autoscalerOptions + + @property + def headServiceAnnotations(self): + return self._headServiceAnnotations + + @headServiceAnnotations.setter + def headServiceAnnotations(self, headServiceAnnotations): + self._headServiceAnnotations = headServiceAnnotations + + @property + def enableInTreeAutoscaling(self): + return self._enableInTreeAutoscaling + + @enableInTreeAutoscaling.setter + def enableInTreeAutoscaling(self, enableInTreeAutoscaling): + self._enableInTreeAutoscaling = enableInTreeAutoscaling + + @property + def headGroupSpec(self): + return self._headGroupSpec + + @headGroupSpec.setter + def headGroupSpec(self, headGroupSpec): + self._headGroupSpec = headGroupSpec + + @property + def rayVersion(self): + return self._rayVersion + + @rayVersion.setter + def rayVersion(self, rayVersion): + self._rayVersion = rayVersion + + @property + def workerGroupSpecs(self): + return self._workerGroupSpecs + + @workerGroupSpecs.setter + def workerGroupSpecs(self, workerGroupSpecs): + self._workerGroupSpecs = workerGroupSpecs + + def to_dict(self): + """Returns the model properties as a dict""" + result = {} + + for attr, _ in six.iteritems(self.openapi_types): + value = getattr(self, attr) + if isinstance(value, list): + result[attr] = list( + map(lambda x: x.to_dict() if hasattr(x, "to_dict") else x, value) + ) + elif hasattr(value, "to_dict"): + result[attr] = value.to_dict() + elif isinstance(value, dict): + result[attr] = dict( + map( + lambda item: (item[0], item[1].to_dict()) + if 
hasattr(item[1], "to_dict") + else item, + value.items(), + ) + ) + else: + result[attr] = value + + return result + + def to_str(self): + """Returns the string representation of the model""" + return pprint.pformat(self.to_dict()) + + def __repr__(self): + """For `print` and `pprint`""" + return self.to_str() + + def __eq__(self, other): + """Returns true if both objects are equal""" + if not isinstance(other, V1RayClusterSpec): + return False + + return self.to_dict() == other.to_dict() + + def __ne__(self, other): + """Returns true if both objects are not equal""" + if not isinstance(other, V1RayClusterSpec): + return True + + return self.to_dict() != other.to_dict() diff --git a/src/codeflare_sdk/models/v1_scale_strategy.py b/src/codeflare_sdk/models/v1_scale_strategy.py new file mode 100644 index 000000000..a7cccdf18 --- /dev/null +++ b/src/codeflare_sdk/models/v1_scale_strategy.py @@ -0,0 +1,73 @@ +import pprint + +import six + + +class V1ScaleStrategy(object): # pragma: no cover + openapi_types = {"workersToDelete": "List[str]"} + + attribute_map = { + "workersToDelete": "workersToDelete", + } + + def __init__(self, workersToDelete=None): + self._workersToDelete = None + + if workersToDelete is not None: + self._workersToDelete = workersToDelete + + @property + def workersToDelete(self): + return self._workersToDelete + + @workersToDelete.setter + def workersToDelete(self, workersToDelete): + self._workersToDelete = workersToDelete + + def to_dict(self): + """Returns the model properties as a dict""" + result = {} + + for attr, _ in six.iteritems(self.openapi_types): + value = getattr(self, attr) + if isinstance(value, list): + result[attr] = list( + map(lambda x: x.to_dict() if hasattr(x, "to_dict") else x, value) + ) + elif hasattr(value, "to_dict"): + result[attr] = value.to_dict() + elif isinstance(value, dict): + result[attr] = dict( + map( + lambda item: (item[0], item[1].to_dict()) + if hasattr(item[1], "to_dict") + else item, + value.items(), + ) + ) 
+ else: + result[attr] = value + + return result + + def to_str(self): + """Returns the string representation of the model""" + return pprint.pformat(self.to_dict()) + + def __repr__(self): + """For `print` and `pprint`""" + return self.to_str() + + def __eq__(self, other): + """Returns true if both objects are equal""" + if not isinstance(other, V1ScaleStrategy): + return False + + return self.to_dict() == other.to_dict() + + def __ne__(self, other): + """Returns true if both objects are not equal""" + if not isinstance(other, V1ScaleStrategy): + return True + + return self.to_dict() != other.to_dict() diff --git a/src/codeflare_sdk/models/v1_worker_group_spec.py b/src/codeflare_sdk/models/v1_worker_group_spec.py new file mode 100644 index 000000000..3c585747e --- /dev/null +++ b/src/codeflare_sdk/models/v1_worker_group_spec.py @@ -0,0 +1,176 @@ +import pprint + +import six + + +class V1WorkerGroupSpec(object): # pragma: no cover + openapi_types = { + "groupName": "str", + "replicas": "int32", + "minReplicas": "int32", + "maxReplicas": "int32", + "rayStartParams": "V1RayStartParams", + "template": "V1PodTemplateSpec", + "scaleStrategy": "V1ScaleStrategy", + "numOfHosts": "int32", + } + + attribute_map = { + "groupName": "groupName", + "replicas": "replicas", + "minReplicas": "minReplicas", + "maxReplicas": "maxReplicas", + "rayStartParams": "rayStartParams", + "template": "template", + "scaleStrategy": "scaleStrategy", + "numOfHosts": "numOfHosts", + } + + def __init__( + self, + groupName=None, + replicas=None, + minReplicas=None, + maxReplicas=None, + rayStartParams=None, + template=None, + scaleStrategy=None, + numOfHosts=None, + ): + self._groupName = None + self._replicas = None + self._minReplicas = None + self._maxReplicas = None + self._rayStartParams = None + self._template = None + self._scaleStrategy = None + self._numOfHosts = None + + if groupName is not None: + self._groupName = groupName + if replicas is not None: + self._replicas = replicas + if 
minReplicas is not None: + self._minReplicas = minReplicas + if maxReplicas is not None: + self._maxReplicas = maxReplicas + if rayStartParams is not None: + self._rayStartParams = rayStartParams + if template is not None: + self._template = template + if scaleStrategy is not None: + self._scaleStrategy = scaleStrategy + if numOfHosts is not None: + self._numOfHosts = numOfHosts + + @property + def groupName(self): + return self._groupName + + @groupName.setter + def groupName(self, groupName): + self._groupName = groupName + + @property + def replicas(self): + return self._replicas + + @replicas.setter + def replicas(self, replicas): + self._replicas = replicas + + @property + def minReplicas(self): + return self._minReplicas + + @minReplicas.setter + def minReplicas(self, minReplicas): + self._minReplicas = minReplicas + + @property + def maxReplicas(self): + return self._maxReplicas + + @maxReplicas.setter + def maxReplicas(self, maxReplicas): + self._maxReplicas = maxReplicas + + @property + def rayStartParams(self): + return self._rayStartParams + + @rayStartParams.setter + def rayStartParams(self, rayStartParams): + self._rayStartParams = rayStartParams + + @property + def template(self): + return self._template + + @template.setter + def template(self, template): + self._template = template + + @property + def scaleStrategy(self): + return self._scaleStrategy + + @scaleStrategy.setter + def scaleStrategy(self, scaleStrategy): + self._scaleStrategy = scaleStrategy + + @property + def numOfHosts(self): + return self._numOfHosts + + @numOfHosts.setter + def numOfHosts(self, numOfHosts): + self._numOfHosts = numOfHosts + + def to_dict(self): + """Returns the model properties as a dict""" + result = {} + + for attr, _ in six.iteritems(self.openapi_types): + value = getattr(self, attr) + if isinstance(value, list): + result[attr] = list( + map(lambda x: x.to_dict() if hasattr(x, "to_dict") else x, value) + ) + elif hasattr(value, "to_dict"): + result[attr] = 
value.to_dict() + elif isinstance(value, dict): + result[attr] = dict( + map( + lambda item: (item[0], item[1].to_dict()) + if hasattr(item[1], "to_dict") + else item, + value.items(), + ) + ) + else: + result[attr] = value + + return result + + def to_str(self): + """Returns the string representation of the model""" + return pprint.pformat(self.to_dict()) + + def __repr__(self): + """For `print` and `pprint`""" + return self.to_str() + + def __eq__(self, other): + """Returns true if both objects are equal""" + if not isinstance(other, V1WorkerGroupSpec): + return False + + return self.to_dict() == other.to_dict() + + def __ne__(self, other): + """Returns true if both objects are not equal""" + if not isinstance(other, V1WorkerGroupSpec): + return True + + return self.to_dict() != other.to_dict() diff --git a/src/codeflare_sdk/templates/base-template.yaml b/src/codeflare_sdk/templates/base-template.yaml deleted file mode 100644 index 076bd2623..000000000 --- a/src/codeflare_sdk/templates/base-template.yaml +++ /dev/null @@ -1,194 +0,0 @@ -# This config demonstrates KubeRay's Ray autoscaler integration. -# The resource requests and limits in this config are too small for production! -# For an example with more realistic resource configuration, see -# ray-cluster.autoscaler.large.yaml. -apiVersion: ray.io/v1 -kind: RayCluster -metadata: - labels: - controller-tools.k8s.io: "1.0" - # A unique identifier for the head node and workers of this cluster. - name: kuberay-cluster - namespace: default -spec: - # The version of Ray you are using. Make sure all Ray containers are running this version of Ray. - rayVersion: '2.23.0' - # If enableInTreeAutoscaling is true, the autoscaler sidecar will be added to the Ray head pod. - # Ray autoscaler integration is supported only for Ray versions >= 1.11.0 - # Ray autoscaler integration is Beta with KubeRay >= 0.3.0 and Ray >= 2.0.0. 
- enableInTreeAutoscaling: false - # autoscalerOptions is an OPTIONAL field specifying configuration overrides for the Ray autoscaler. - # The example configuration shown below below represents the DEFAULT values. - # (You may delete autoscalerOptions if the defaults are suitable.) - autoscalerOptions: - # upscalingMode is "Default" or "Aggressive." - # Conservative: Upscaling is rate-limited; the number of pending worker pods is at most the size of the Ray cluster. - # Default: Upscaling is not rate-limited. - # Aggressive: An alias for Default; upscaling is not rate-limited. - upscalingMode: Default - # idleTimeoutSeconds is the number of seconds to wait before scaling down a worker pod which is not using Ray resources. - idleTimeoutSeconds: 60 - # image optionally overrides the autoscaler's container image. - # If instance.spec.rayVersion is at least "2.0.0", the autoscaler will default to the same image as - # the ray container. For older Ray versions, the autoscaler will default to using the Ray 2.0.0 image. - ## image: "my-repo/my-custom-autoscaler-image:tag" - # imagePullPolicy optionally overrides the autoscaler container's image pull policy. - imagePullPolicy: Always - # resources specifies optional resource request and limit overrides for the autoscaler container. - # For large Ray clusters, we recommend monitoring container resource usage to determine if overriding the defaults is required. 
- resources: - limits: - cpu: "500m" - memory: "512Mi" - requests: - cpu: "500m" - memory: "512Mi" - ######################headGroupSpec################################# - # head group template and specs, (perhaps 'group' is not needed in the name) - headGroupSpec: - # Kubernetes Service Type, valid values are 'ClusterIP', 'NodePort' and 'LoadBalancer' - serviceType: ClusterIP - enableIngress: false - # logical group name, for this called head-group, also can be functional - # pod type head or worker - # rayNodeType: head # Not needed since it is under the headgroup - # the following params are used to complete the ray start: ray start --head --block ... - rayStartParams: - # Flag "no-monitor" will be automatically set when autoscaling is enabled. - dashboard-host: '0.0.0.0' - block: 'true' - # num-cpus: '1' # can be auto-completed from the limits - # Use `resources` to optionally specify custom resource annotations for the Ray node. - # The value of `resources` is a string-integer mapping. 
- # Currently, `resources` must be provided in the specific format demonstrated below: - # resources: '"{\"Custom1\": 1, \"Custom2\": 5}"' - num-gpus: '0' - #pod template - template: - spec: - containers: - # The Ray head pod - - name: ray-head - image: quay.io/rhoai/ray:2.23.0-py39-cu121 - imagePullPolicy: Always - ports: - - containerPort: 6379 - name: gcs - - containerPort: 8265 - name: dashboard - - containerPort: 10001 - name: client - lifecycle: - preStop: - exec: - command: ["/bin/sh","-c","ray stop"] - resources: - limits: - cpu: 2 - memory: "8G" - requests: - cpu: 2 - memory: "8G" - volumeMounts: - - mountPath: /etc/pki/tls/certs/odh-trusted-ca-bundle.crt - name: odh-trusted-ca-cert - subPath: odh-trusted-ca-bundle.crt - - mountPath: /etc/ssl/certs/odh-trusted-ca-bundle.crt - name: odh-trusted-ca-cert - subPath: odh-trusted-ca-bundle.crt - - mountPath: /etc/pki/tls/certs/odh-ca-bundle.crt - name: odh-ca-cert - subPath: odh-ca-bundle.crt - - mountPath: /etc/ssl/certs/odh-ca-bundle.crt - name: odh-ca-cert - subPath: odh-ca-bundle.crt - volumes: - - name: odh-trusted-ca-cert - configMap: - name: odh-trusted-ca-bundle - items: - - key: ca-bundle.crt - path: odh-trusted-ca-bundle.crt - optional: true - - name: odh-ca-cert - configMap: - name: odh-trusted-ca-bundle - items: - - key: odh-ca-bundle.crt - path: odh-ca-bundle.crt - optional: true - workerGroupSpecs: - # the pod replicas in this group typed worker - - replicas: 3 - minReplicas: 3 - maxReplicas: 3 - # logical group name, for this called small-group, also can be functional - groupName: small-group - # if worker pods need to be added, we can simply increment the replicas - # if worker pods need to be removed, we decrement the replicas, and populate the podsToDelete list - # the operator will remove pods from the list until the number of replicas is satisfied - # when a pod is confirmed to be deleted, its name will be removed from the list below - #scaleStrategy: - # workersToDelete: - # - 
raycluster-complete-worker-small-group-bdtwh - # - raycluster-complete-worker-small-group-hv457 - # - raycluster-complete-worker-small-group-k8tj7 - # the following params are used to complete the ray start: ray start --block ... - rayStartParams: - block: 'true' - num-gpus: 1 - #pod template - template: - metadata: - labels: - key: value - # annotations for pod - annotations: - key: value - # finalizers: - # - kubernetes - spec: - containers: - - name: machine-learning # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name', or '123-abc' - image: quay.io/rhoai/ray:2.23.0-py39-cu121 - # environment variables to set in the container.Optional. - # Refer to https://kubernetes.io/docs/tasks/inject-data-application/define-environment-variable-container/ - lifecycle: - preStop: - exec: - command: ["/bin/sh","-c","ray stop"] - resources: - limits: - cpu: "2" - memory: "12G" - requests: - cpu: "2" - memory: "12G" - volumeMounts: - - mountPath: /etc/pki/tls/certs/odh-trusted-ca-bundle.crt - name: odh-trusted-ca-cert - subPath: odh-trusted-ca-bundle.crt - - mountPath: /etc/ssl/certs/odh-trusted-ca-bundle.crt - name: odh-trusted-ca-cert - subPath: odh-trusted-ca-bundle.crt - - mountPath: /etc/pki/tls/certs/odh-ca-bundle.crt - name: odh-ca-cert - subPath: odh-ca-bundle.crt - - mountPath: /etc/ssl/certs/odh-ca-bundle.crt - name: odh-ca-cert - subPath: odh-ca-bundle.crt - volumes: - - name: odh-trusted-ca-cert - configMap: - name: odh-trusted-ca-bundle - items: - - key: ca-bundle.crt - path: odh-trusted-ca-bundle.crt - optional: true - - name: odh-ca-cert - configMap: - name: odh-trusted-ca-bundle - items: - - key: odh-ca-bundle.crt - path: odh-ca-bundle.crt - optional: true diff --git a/src/codeflare_sdk/utils/build_ray_cluster.py b/src/codeflare_sdk/utils/build_ray_cluster.py new file mode 100644 index 000000000..4c057f0bd --- /dev/null +++ b/src/codeflare_sdk/utils/build_ray_cluster.py @@ -0,0 +1,509 
@@ +# Copyright 2022 IBM, Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" + This sub-module exists primarily to be used internally by the Cluster object + (in the cluster sub-module) for RayCluster/AppWrapper generation. +""" +from typing import Union, Tuple, Dict +from .kube_api_helpers import _kube_api_error_handling +from ..cluster.auth import api_config_handler, config_check +from kubernetes.client.exceptions import ApiException +import codeflare_sdk +import json +import os + +from kubernetes import client +from kubernetes.client import ( + V1ObjectMeta, + V1KeyToPath, + V1ConfigMapVolumeSource, + V1Volume, + V1VolumeMount, + V1ResourceRequirements, + V1Container, + V1ContainerPort, + V1Lifecycle, + V1ExecAction, + V1LifecycleHandler, + V1EnvVar, + V1PodTemplateSpec, + V1PodSpec, + V1LocalObjectReference, +) + +import yaml +from ..models import ( + V1WorkerGroupSpec, + V1RayClusterSpec, + V1HeadGroupSpec, + V1RayCluster, + V1AutoScalerOptions, +) +import uuid + +FORBIDDEN_CUSTOM_RESOURCE_TYPES = ["GPU", "CPU", "memory"] +VOLUME_MOUNTS = [ + V1VolumeMount( + mount_path="/etc/pki/tls/certs/odh-trusted-ca-bundle.crt", + name="odh-trusted-ca-cert", + sub_path="odh-trusted-ca-bundle.crt", + ), + V1VolumeMount( + mount_path="/etc/ssl/certs/odh-trusted-ca-bundle.crt", + name="odh-trusted-ca-cert", + sub_path="odh-trusted-ca-bundle.crt", + ), + V1VolumeMount( + mount_path="/etc/pki/tls/certs/odh-ca-bundle.crt", + name="odh-ca-cert", + 
sub_path="odh-ca-bundle.crt", + ), + V1VolumeMount( + mount_path="/etc/ssl/certs/odh-ca-bundle.crt", + name="odh-ca-cert", + sub_path="odh-ca-bundle.crt", + ), +] + +VOLUMES = [ + V1Volume( + name="odh-trusted-ca-cert", + config_map=V1ConfigMapVolumeSource( + name="odh-trusted-ca-bundle", + items=[V1KeyToPath(key="ca-bundle.crt", path="odh-trusted-ca-bundle.crt")], + optional=True, + ), + ), + V1Volume( + name="odh-ca-cert", + config_map=V1ConfigMapVolumeSource( + name="odh-trusted-ca-bundle", + items=[V1KeyToPath(key="odh-ca-bundle.crt", path="odh-ca-bundle.crt")], + optional=True, + ), + ), +] + + +# RayCluster/AppWrapper builder function +def build_ray_cluster(cluster: "codeflare_sdk.cluster.Cluster"): + """ + This function is used for creating a Ray Cluster/AppWrapper + """ + ray_version = "2.23.0" + + # GPU related variables + head_gpu_count, worker_gpu_count = head_worker_gpu_count_from_cluster(cluster) + head_resources, worker_resources = head_worker_resources_from_cluster(cluster) + head_resources = f'"{head_resources}"' + head_resources = json.dumps(head_resources).replace('"', '\\"') + worker_resources = json.dumps(worker_resources).replace('"', '\\"') + worker_resources = f'"{worker_resources}"' + + # Create the Ray Cluster using the V1RayCluster Object + resource = V1RayCluster( + apiVersion="ray.io/v1", + kind="RayCluster", + metadata=get_metadata(cluster), + spec=V1RayClusterSpec( + rayVersion=ray_version, + enableInTreeAutoscaling=False, + autoscalerOptions=V1AutoScalerOptions( + upscalingMode="Default", + idleTimeoutSeconds=60, + imagePullPolicy="Always", + resources=get_resources("500m", "500m", "512Mi", "512Mi"), + ), + headGroupSpec=V1HeadGroupSpec( + serviceType="ClusterIP", + enableIngress=False, + rayStartParams={ + "dashboard-host": "0.0.0.0", + "block": "true", + "num-gpus": str(head_gpu_count), + }, + template=V1PodTemplateSpec( + spec=get_pod_spec(cluster, [get_head_container_spec(cluster)]) + ), + ), + workerGroupSpecs=[ + 
V1WorkerGroupSpec(
+                    groupName=f"small-group-{cluster.config.name}",
+                    replicas=cluster.config.num_workers,
+                    minReplicas=cluster.config.num_workers,
+                    maxReplicas=cluster.config.num_workers,
+                    rayStartParams={"block": "true", "num-gpus": str(worker_gpu_count)},
+                    template=V1PodTemplateSpec(
+                        spec=get_pod_spec(cluster, [get_worker_container_spec(cluster)])
+                    ),
+                )
+            ],
+        ),
+    )
+    config_check()
+    k8s_client = api_config_handler() or client.ApiClient()
+
+    if cluster.config.appwrapper:
+        # Wrap the Ray Cluster in an AppWrapper
+        appwrapper_name, _ = gen_names(cluster.config.name)
+        resource = wrap_cluster(cluster, appwrapper_name, resource)
+
+    resource = k8s_client.sanitize_for_serialization(resource)
+
+    # write_to_file functionality
+    if cluster.config.write_to_file:
+        return write_to_file(
+            cluster.config.name, resource
+        )  # Writes the file and returns its name
+    else:
+        print(f"Yaml resources loaded for {cluster.config.name}")
+        return resource  # Returns the Resource as a dict
+
+
+# Metadata related functions
+def get_metadata(cluster: "codeflare_sdk.cluster.Cluster"):
+    """
+    The get_metadata() function builds and returns a V1ObjectMeta Object
+    """
+    object_meta = V1ObjectMeta(
+        name=cluster.config.name,
+        namespace=cluster.config.namespace,
+        labels=get_labels(cluster),
+    )
+
+    # Get the NB annotation if it exists - could be useful in future for an "annotations" parameter.
+    annotations = get_annotations()
+    if annotations != {}:
+        object_meta.annotations = annotations  # As annotations are not guaranteed they are appended to the metadata after creation.
+    return object_meta
+
+
+def get_labels(cluster: "codeflare_sdk.cluster.Cluster"):
+    """
+    The get_labels() function generates a dict "labels" which includes the base label, local queue label and user defined labels
+    """
+    labels = {
+        "controller-tools.k8s.io": "1.0",
+    }
+    if cluster.config.labels != {}:
+        labels.update(cluster.config.labels)
+
+    if cluster.config.appwrapper is False:
+        add_queue_label(cluster, labels)
+
+    return labels
+
+
+def get_annotations():
+    """
+    This function generates the annotation for NB Prefix if the SDK is running in a notebook
+    """
+    annotations = {}
+
+    # Notebook annotation
+    nb_prefix = os.environ.get("NB_PREFIX")
+    if nb_prefix:
+        annotations.update({"app.kubernetes.io/managed-by": nb_prefix})
+
+    return annotations
+
+
+# Head/Worker container related functions
+def get_pod_spec(cluster: "codeflare_sdk.cluster.Cluster", containers):
+    """
+    The get_pod_spec() function generates a V1PodSpec for the head/worker containers
+    """
+    pod_spec = V1PodSpec(
+        containers=containers,
+        volumes=VOLUMES,
+    )
+    if cluster.config.image_pull_secrets != []:
+        pod_spec.image_pull_secrets = generate_image_pull_secrets(cluster)
+
+    return pod_spec
+
+
+def generate_image_pull_secrets(cluster: "codeflare_sdk.cluster.Cluster"):
+    """
+    The generate_image_pull_secrets() method generates a list of V1LocalObjectReference including each of the specified image pull secrets
+    """
+    pull_secrets = []
+    for pull_secret in cluster.config.image_pull_secrets:
+        pull_secrets.append(V1LocalObjectReference(name=pull_secret))
+
+    return pull_secrets
+
+
+def get_head_container_spec(
+    cluster: "codeflare_sdk.cluster.Cluster",
+):
+    """
+    The get_head_container_spec() function builds and returns a V1Container object including user defined resource requests/limits
+    """
+    head_container = V1Container(
+        name="ray-head",
+        image=cluster.config.image,
+        image_pull_policy="Always",
+        ports=[
+            V1ContainerPort(name="gcs", container_port=6379),
+            
V1ContainerPort(name="dashboard", container_port=8265), + V1ContainerPort(name="client", container_port=10001), + ], + lifecycle=V1Lifecycle( + pre_stop=V1LifecycleHandler( + _exec=V1ExecAction(["/bin/sh", "-c", "ray stop"]) + ) + ), + resources=get_resources( + cluster.config.head_cpus, # TODO Update the head cpu/memory requests/limits - https://github.com/project-codeflare/codeflare-sdk/pull/579 + cluster.config.head_cpus, + cluster.config.head_memory, + cluster.config.head_memory, + cluster.config.head_extended_resource_requests, + ), + volume_mounts=VOLUME_MOUNTS, + ) + if cluster.config.envs != {}: + head_container.env = generate_env_vars(cluster) + + return head_container + + +def generate_env_vars(cluster: "codeflare_sdk.cluster.Cluster"): + """ + The generate_env_vars() builds and returns a V1EnvVar object which is populated by user specified environment variables + """ + envs = [] + for key, value in cluster.config.envs.items(): + env_var = V1EnvVar(name=key, value=value) + envs.append(env_var) + + return envs + + +def get_worker_container_spec( + cluster: "codeflare_sdk.cluster.Cluster", +): + """ + The get_worker_container_spec() function builds and returns a V1Container object including user defined resource requests/limits + """ + worker_container = V1Container( + name="machine-learning", + image=cluster.config.image, + image_pull_policy="Always", + lifecycle=V1Lifecycle( + pre_stop=V1LifecycleHandler( + _exec=V1ExecAction(["/bin/sh", "-c", "ray stop"]) + ) + ), + resources=get_resources( + cluster.config.worker_cpu_requests, + cluster.config.worker_cpu_limits, + cluster.config.worker_memory_requests, + cluster.config.worker_memory_limits, + cluster.config.worker_extended_resource_requests, + ), + volume_mounts=VOLUME_MOUNTS, + ) + + return worker_container + + +def get_resources( + cpu_requests: Union[int, str], + cpu_limits: Union[int, str], + memory_requests: Union[int, str], + memory_limits: Union[int, str], + custom_extended_resource_requests: 
Dict[str, int] = None,
+):
+    """
+    The get_resources() function generates a V1ResourceRequirements object for cpu/memory request/limits and GPU resources
+    """
+    resource_requirements = V1ResourceRequirements(
+        requests={"cpu": cpu_requests, "memory": memory_requests},
+        limits={"cpu": cpu_limits, "memory": memory_limits},
+    )
+
+    # Append the resource/limit requests with custom extended resources
+    if custom_extended_resource_requests is not None:
+        for k in custom_extended_resource_requests.keys():
+            resource_requirements.limits[k] = custom_extended_resource_requests[k]
+            resource_requirements.requests[k] = custom_extended_resource_requests[k]
+
+    return resource_requirements
+
+
+# GPU related functions
+def head_worker_gpu_count_from_cluster(
+    cluster: "codeflare_sdk.cluster.Cluster",
+) -> Tuple[int, int]:
+    """
+    The head_worker_gpu_count_from_cluster() function returns the total number of requested GPUs for the head and worker separately
+    """
+    head_gpus = 0
+    worker_gpus = 0
+    for k in cluster.config.head_extended_resource_requests.keys():
+        resource_type = cluster.config.extended_resource_mapping[k]
+        if resource_type == "GPU":
+            head_gpus += int(cluster.config.head_extended_resource_requests[k])
+    for k in cluster.config.worker_extended_resource_requests.keys():
+        resource_type = cluster.config.extended_resource_mapping[k]
+        if resource_type == "GPU":
+            worker_gpus += int(cluster.config.worker_extended_resource_requests[k])
+
+    return head_gpus, worker_gpus
+
+
+def head_worker_resources_from_cluster(
+    cluster: "codeflare_sdk.cluster.Cluster",
+) -> Tuple[dict, dict]:
+    """
+    The head_worker_resources_from_cluster() function returns 2 dicts for head/worker respectively populated by the GPU type requested by the user
+    """
+    to_return = {}, {}
+    for k in cluster.config.head_extended_resource_requests.keys():
+        resource_type = cluster.config.extended_resource_mapping[k]
+        if resource_type in FORBIDDEN_CUSTOM_RESOURCE_TYPES:
+            continue
+            
to_return[0][resource_type] = cluster.config.head_extended_resource_requests[ + k + ] + to_return[0].get(resource_type, 0) + + for k in cluster.config.worker_extended_resource_requests.keys(): + resource_type = cluster.config.extended_resource_mapping[k] + if resource_type in FORBIDDEN_CUSTOM_RESOURCE_TYPES: + continue + to_return[1][resource_type] = cluster.config.worker_extended_resource_requests[ + k + ] + to_return[1].get(resource_type, 0) + return to_return + + +# Local Queue related functions +def add_queue_label(cluster: "codeflare_sdk.cluster.Cluster", labels: dict): + """ + The add_queue_label() function updates the given base labels with the local queue label if Kueue exists on the Cluster + """ + lq_name = cluster.config.local_queue or get_default_local_queue(cluster, labels) + if lq_name == None: + return + elif not local_queue_exists(cluster): + raise ValueError( + "local_queue provided does not exist or is not in this namespace. Please provide the correct local_queue name in Cluster Configuration" + ) + labels.update({"kueue.x-k8s.io/queue-name": lq_name}) + + +def local_queue_exists(cluster: "codeflare_sdk.cluster.Cluster"): + """ + The local_queue_exists() checks if the user inputted local_queue exists in the given namespace and returns a bool + """ + # get all local queues in the namespace + try: + config_check() + api_instance = client.CustomObjectsApi(api_config_handler()) + local_queues = api_instance.list_namespaced_custom_object( + group="kueue.x-k8s.io", + version="v1beta1", + namespace=cluster.config.namespace, + plural="localqueues", + ) + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e) + # check if local queue with the name provided in cluster config exists + for lq in local_queues["items"]: + if lq["metadata"]["name"] == cluster.config.local_queue: + return True + return False + + +def get_default_local_queue(cluster: "codeflare_sdk.cluster.Cluster", labels: dict): + """ + The get_default_local_queue() 
function attempts to find a local queue annotated as the default queue; if one is found, the labels dict is updated with that local queue's name
+def write_to_file(cluster_name: str, resource: dict): + directory_path = os.path.expanduser("~/.codeflare/resources/") + output_file_name = os.path.join(directory_path, cluster_name + ".yaml") + + directory_path = os.path.dirname(output_file_name) + if not os.path.exists(directory_path): + os.makedirs(directory_path) + + with open(output_file_name, "w") as outfile: + yaml.dump(resource, outfile, default_flow_style=False) + + print(f"Written to: {output_file_name}") + return output_file_name + + +def gen_names(name): + if not name: + gen_id = str(uuid.uuid4()) + appwrapper_name = "appwrapper-" + gen_id + cluster_name = "cluster-" + gen_id + return appwrapper_name, cluster_name + else: + return name, name diff --git a/src/codeflare_sdk/utils/generate_yaml.py b/src/codeflare_sdk/utils/generate_yaml.py deleted file mode 100755 index 1644dc15e..000000000 --- a/src/codeflare_sdk/utils/generate_yaml.py +++ /dev/null @@ -1,356 +0,0 @@ -# Copyright 2022 IBM, Red Hat -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -This sub-module exists primarily to be used internally by the Cluster object -(in the cluster sub-module) for AppWrapper generation. 
-""" - -import json -from typing import Optional -import typing -import yaml -import sys -import os -import argparse -import uuid -from kubernetes import client, config -from .kube_api_helpers import _kube_api_error_handling -from ..cluster.auth import api_config_handler, config_check -from os import urandom -from base64 import b64encode -from urllib3.util import parse_url -from kubernetes.client.exceptions import ApiException -import codeflare_sdk - - -def read_template(template): - with open(template, "r") as stream: - try: - return yaml.safe_load(stream) - except yaml.YAMLError as exc: - print(exc) - - -def gen_names(name): - if not name: - gen_id = str(uuid.uuid4()) - appwrapper_name = "appwrapper-" + gen_id - cluster_name = "cluster-" + gen_id - return appwrapper_name, cluster_name - else: - return name, name - - -# Check if the routes api exists -def is_openshift_cluster(): - try: - config_check() - for api in client.ApisApi(api_config_handler()).get_api_versions().groups: - for v in api.versions: - if "route.openshift.io/v1" in v.group_version: - return True - else: - return False - except Exception as e: # pragma: no cover - return _kube_api_error_handling(e) - - -def is_kind_cluster(): - try: - config_check() - v1 = client.CoreV1Api() - label_selector = "kubernetes.io/hostname=kind-control-plane" - nodes = v1.list_node(label_selector=label_selector) - # If we find one or more nodes with the label, assume it's a KinD cluster - return len(nodes.items) > 0 - except Exception as e: - print(f"Error checking if cluster is KinD: {e}") - return False - - -def update_names( - cluster_yaml: dict, - cluster: "codeflare_sdk.cluster.Cluster", -): - metadata = cluster_yaml.get("metadata") - metadata["name"] = cluster.config.name - metadata["namespace"] = cluster.config.namespace - - -def update_image(spec, image): - containers = spec.get("containers") - if image != "": - for container in containers: - container["image"] = image - - -def update_image_pull_secrets(spec, 
image_pull_secrets): - template_secrets = spec.get("imagePullSecrets", []) - spec["imagePullSecrets"] = template_secrets + [ - {"name": x} for x in image_pull_secrets - ] - - -def update_env(spec, env): - containers = spec.get("containers") - for container in containers: - if env: - if "env" in container: - container["env"].extend(env) - else: - container["env"] = env - - -def update_resources( - spec, - worker_cpu_requests, - worker_cpu_limits, - worker_memory_requests, - worker_memory_limits, - custom_resources, -): - container = spec.get("containers") - for resource in container: - requests = resource.get("resources").get("requests") - if requests is not None: - requests["cpu"] = worker_cpu_requests - requests["memory"] = worker_memory_requests - limits = resource.get("resources").get("limits") - if limits is not None: - limits["cpu"] = worker_cpu_limits - limits["memory"] = worker_memory_limits - for k in custom_resources.keys(): - limits[k] = custom_resources[k] - requests[k] = custom_resources[k] - - -def head_worker_gpu_count_from_cluster( - cluster: "codeflare_sdk.cluster.Cluster", -) -> typing.Tuple[int, int]: - head_gpus = 0 - worker_gpus = 0 - for k in cluster.config.head_extended_resource_requests.keys(): - resource_type = cluster.config.extended_resource_mapping[k] - if resource_type == "GPU": - head_gpus += int(cluster.config.head_extended_resource_requests[k]) - for k in cluster.config.worker_extended_resource_requests.keys(): - resource_type = cluster.config.extended_resource_mapping[k] - if resource_type == "GPU": - worker_gpus += int(cluster.config.worker_extended_resource_requests[k]) - - return head_gpus, worker_gpus - - -FORBIDDEN_CUSTOM_RESOURCE_TYPES = ["GPU", "CPU", "memory"] - - -def head_worker_resources_from_cluster( - cluster: "codeflare_sdk.cluster.Cluster", -) -> typing.Tuple[dict, dict]: - to_return = {}, {} - for k in cluster.config.head_extended_resource_requests.keys(): - resource_type = cluster.config.extended_resource_mapping[k] 
- if resource_type in FORBIDDEN_CUSTOM_RESOURCE_TYPES: - continue - to_return[0][resource_type] = cluster.config.head_extended_resource_requests[ - k - ] + to_return[0].get(resource_type, 0) - - for k in cluster.config.worker_extended_resource_requests.keys(): - resource_type = cluster.config.extended_resource_mapping[k] - if resource_type in FORBIDDEN_CUSTOM_RESOURCE_TYPES: - continue - to_return[1][resource_type] = cluster.config.worker_extended_resource_requests[ - k - ] + to_return[1].get(resource_type, 0) - return to_return - - -def update_nodes( - ray_cluster_dict: dict, - cluster: "codeflare_sdk.cluster.Cluster", -): - head = ray_cluster_dict.get("spec").get("headGroupSpec") - worker = ray_cluster_dict.get("spec").get("workerGroupSpecs")[0] - head_gpus, worker_gpus = head_worker_gpu_count_from_cluster(cluster) - head_resources, worker_resources = head_worker_resources_from_cluster(cluster) - head_resources = json.dumps(head_resources).replace('"', '\\"') - head_resources = f'"{head_resources}"' - worker_resources = json.dumps(worker_resources).replace('"', '\\"') - worker_resources = f'"{worker_resources}"' - head["rayStartParams"]["num-gpus"] = str(head_gpus) - head["rayStartParams"]["resources"] = head_resources - - # Head counts as first worker - worker["replicas"] = cluster.config.num_workers - worker["minReplicas"] = cluster.config.num_workers - worker["maxReplicas"] = cluster.config.num_workers - worker["groupName"] = "small-group-" + cluster.config.name - worker["rayStartParams"]["num-gpus"] = str(worker_gpus) - worker["rayStartParams"]["resources"] = worker_resources - - for comp in [head, worker]: - spec = comp.get("template").get("spec") - update_image_pull_secrets(spec, cluster.config.image_pull_secrets) - update_image(spec, cluster.config.image) - update_env(spec, cluster.config.envs) - if comp == head: - # TODO: Eventually add head node configuration outside of template - update_resources( - spec, - cluster.config.head_cpus, - 
cluster.config.head_cpus, - cluster.config.head_memory, - cluster.config.head_memory, - cluster.config.head_extended_resource_requests, - ) - else: - update_resources( - spec, - cluster.config.worker_cpu_requests, - cluster.config.worker_cpu_limits, - cluster.config.worker_memory_requests, - cluster.config.worker_memory_limits, - cluster.config.worker_extended_resource_requests, - ) - - -def del_from_list_by_name(l: list, target: typing.List[str]) -> list: - return [x for x in l if x["name"] not in target] - - -def get_default_kueue_name(namespace: str): - # If the local queue is set, use it. Otherwise, try to use the default queue. - try: - config_check() - api_instance = client.CustomObjectsApi(api_config_handler()) - local_queues = api_instance.list_namespaced_custom_object( - group="kueue.x-k8s.io", - version="v1beta1", - namespace=namespace, - plural="localqueues", - ) - except ApiException as e: # pragma: no cover - if e.status == 404 or e.status == 403: - return - else: - return _kube_api_error_handling(e) - for lq in local_queues["items"]: - if ( - "annotations" in lq["metadata"] - and "kueue.x-k8s.io/default-queue" in lq["metadata"]["annotations"] - and lq["metadata"]["annotations"]["kueue.x-k8s.io/default-queue"].lower() - == "true" - ): - return lq["metadata"]["name"] - - -def local_queue_exists(namespace: str, local_queue_name: str): - # get all local queues in the namespace - try: - config_check() - api_instance = client.CustomObjectsApi(api_config_handler()) - local_queues = api_instance.list_namespaced_custom_object( - group="kueue.x-k8s.io", - version="v1beta1", - namespace=namespace, - plural="localqueues", - ) - except Exception as e: # pragma: no cover - return _kube_api_error_handling(e) - # check if local queue with the name provided in cluster config exists - for lq in local_queues["items"]: - if lq["metadata"]["name"] == local_queue_name: - return True - return False - - -def add_queue_label(item: dict, namespace: str, local_queue: 
Optional[str]): - lq_name = local_queue or get_default_kueue_name(namespace) - if lq_name == None: - return - elif not local_queue_exists(namespace, lq_name): - raise ValueError( - "local_queue provided does not exist or is not in this namespace. Please provide the correct local_queue name in Cluster Configuration" - ) - if not "labels" in item["metadata"]: - item["metadata"]["labels"] = {} - item["metadata"]["labels"].update({"kueue.x-k8s.io/queue-name": lq_name}) - - -def augment_labels(item: dict, labels: dict): - if not "labels" in item["metadata"]: - item["metadata"]["labels"] = {} - item["metadata"]["labels"].update(labels) - - -def notebook_annotations(item: dict): - nb_prefix = os.environ.get("NB_PREFIX") - if nb_prefix: - if not "annotations" in item["metadata"]: - item["metadata"]["annotations"] = {} - item["metadata"]["annotations"].update( - {"app.kubernetes.io/managed-by": nb_prefix} - ) - - -def wrap_cluster(cluster_yaml: dict, appwrapper_name: str, namespace: str): - return { - "apiVersion": "workload.codeflare.dev/v1beta2", - "kind": "AppWrapper", - "metadata": {"name": appwrapper_name, "namespace": namespace}, - "spec": {"components": [{"template": cluster_yaml}]}, - } - - -def write_user_yaml(user_yaml, output_file_name): - # Create the directory if it doesn't exist - directory_path = os.path.dirname(output_file_name) - if not os.path.exists(directory_path): - os.makedirs(directory_path) - - with open(output_file_name, "w") as outfile: - yaml.dump(user_yaml, outfile, default_flow_style=False) - - print(f"Written to: {output_file_name}") - - -def generate_appwrapper(cluster: "codeflare_sdk.cluster.Cluster"): - cluster_yaml = read_template(cluster.config.template) - appwrapper_name, _ = gen_names(cluster.config.name) - update_names( - cluster_yaml, - cluster, - ) - update_nodes(cluster_yaml, cluster) - augment_labels(cluster_yaml, cluster.config.labels) - notebook_annotations(cluster_yaml) - user_yaml = ( - wrap_cluster(cluster_yaml, 
appwrapper_name, cluster.config.namespace) - if cluster.config.appwrapper - else cluster_yaml - ) - - add_queue_label(user_yaml, cluster.config.namespace, cluster.config.local_queue) - - if cluster.config.write_to_file: - directory_path = os.path.expanduser("~/.codeflare/resources/") - outfile = os.path.join(directory_path, appwrapper_name + ".yaml") - write_user_yaml(user_yaml, outfile) - return outfile - else: - user_yaml = yaml.dump(user_yaml) - print(f"Yaml resources loaded for {cluster.config.name}") - return user_yaml diff --git a/tests/test-case-custom-image.yaml b/tests/test-case-custom-image.yaml index 8a417a581..be1223ed8 100644 --- a/tests/test-case-custom-image.yaml +++ b/tests/test-case-custom-image.yaml @@ -29,7 +29,6 @@ spec: block: 'true' dashboard-host: 0.0.0.0 num-gpus: '0' - resources: '"{}"' serviceType: ClusterIP template: spec: @@ -96,17 +95,12 @@ spec: rayStartParams: block: 'true' num-gpus: '7' - resources: '"{}"' replicas: 2 template: - metadata: - annotations: - key: value - labels: - key: value spec: containers: - image: quay.io/project-codeflare/ray:2.20.0-py39-cu118 + imagePullPolicy: Always lifecycle: preStop: exec: diff --git a/tests/test-case-no-kueue-no-aw.yaml b/tests/test-case-no-kueue-no-aw.yaml index 3ea7a22d1..b555b9037 100644 --- a/tests/test-case-no-kueue-no-aw.yaml +++ b/tests/test-case-no-kueue-no-aw.yaml @@ -26,7 +26,6 @@ spec: block: 'true' dashboard-host: 0.0.0.0 num-gpus: '0' - resources: '"{}"' serviceType: ClusterIP template: spec: @@ -93,17 +92,12 @@ spec: rayStartParams: block: 'true' num-gpus: '7' - resources: '"{}"' replicas: 2 template: - metadata: - annotations: - key: value - labels: - key: value spec: containers: - image: quay.io/rhoai/ray:2.23.0-py39-cu121 + imagePullPolicy: Always lifecycle: preStop: exec: diff --git a/tests/test-case-no-mcad.yamls b/tests/test-case-no-mcad.yamls index 45a3dfb9f..4d6a2e71e 100644 --- a/tests/test-case-no-mcad.yamls +++ b/tests/test-case-no-mcad.yamls @@ -29,7 +29,6 @@ spec: 
block: 'true' dashboard-host: 0.0.0.0 num-gpus: '0' - resources: '"{}"' serviceType: ClusterIP template: spec: @@ -96,17 +95,12 @@ spec: rayStartParams: block: 'true' num-gpus: '7' - resources: '"{}"' replicas: 2 template: - metadata: - annotations: - key: value - labels: - key: value spec: containers: - image: quay.io/rhoai/ray:2.23.0-py39-cu121 + imagePullPolicy: Always lifecycle: preStop: exec: diff --git a/tests/test-case.yaml b/tests/test-case.yaml index 461ed7dfc..e93536a0a 100644 --- a/tests/test-case.yaml +++ b/tests/test-case.yaml @@ -34,7 +34,6 @@ spec: block: 'true' dashboard-host: 0.0.0.0 num-gpus: '0' - resources: '"{}"' serviceType: ClusterIP template: spec: @@ -101,17 +100,12 @@ spec: rayStartParams: block: 'true' num-gpus: '7' - resources: '"{}"' replicas: 2 template: - metadata: - annotations: - key: value - labels: - key: value spec: containers: - image: quay.io/rhoai/ray:2.23.0-py39-cu121 + imagePullPolicy: Always lifecycle: preStop: exec: diff --git a/tests/test-default-appwrapper.yaml b/tests/test-default-appwrapper.yaml index cc27e37a1..10f2e549b 100644 --- a/tests/test-default-appwrapper.yaml +++ b/tests/test-default-appwrapper.yaml @@ -34,11 +34,9 @@ spec: block: 'true' dashboard-host: 0.0.0.0 num-gpus: '0' - resources: '"{}"' serviceType: ClusterIP template: spec: - imagePullSecrets: [] containers: - image: quay.io/rhoai/ray:2.23.0-py39-cu121 imagePullPolicy: Always @@ -100,18 +98,12 @@ spec: rayStartParams: block: 'true' num-gpus: '0' - resources: '"{}"' replicas: 1 template: - metadata: - annotations: - key: value - labels: - key: value spec: - imagePullSecrets: [] containers: - image: quay.io/rhoai/ray:2.23.0-py39-cu121 + imagePullPolicy: Always lifecycle: preStop: exec: diff --git a/tests/unit_test.py b/tests/unit_test.py index ba937d873..762f9358b 100644 --- a/tests/unit_test.py +++ b/tests/unit_test.py @@ -38,6 +38,7 @@ get_cluster, _app_wrapper_status, _ray_cluster_status, + _is_openshift_cluster, ) from codeflare_sdk.cluster.auth 
import ( TokenAuthentication, @@ -71,12 +72,12 @@ get_package_and_version, ) -import codeflare_sdk.utils.kube_api_helpers -from codeflare_sdk.utils.generate_yaml import ( +from codeflare_sdk.utils.build_ray_cluster import ( gen_names, - is_openshift_cluster, ) +import codeflare_sdk.utils.kube_api_helpers + import openshift from openshift.selector import Selector import ray @@ -263,8 +264,6 @@ def test_config_creation(): assert config.worker_cpu_requests == 3 and config.worker_cpu_limits == 4 assert config.worker_memory_requests == "5G" and config.worker_memory_limits == "6G" assert config.worker_extended_resource_requests == {"nvidia.com/gpu": 7} - assert config.template == f"{parent}/src/codeflare_sdk/templates/base-template.yaml" - assert config.machine_types == ["cpu.small", "gpu.large"] assert config.image_pull_secrets == ["unit-test-pull-secret"] assert config.appwrapper == True @@ -282,8 +281,8 @@ def test_cluster_creation(mocker): return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) cluster = createClusterWithConfig(mocker) - assert cluster.app_wrapper_yaml == f"{aw_dir}unit-test-cluster.yaml" - assert cluster.app_wrapper_name == "unit-test-cluster" + assert cluster.resource_yaml == f"{aw_dir}unit-test-cluster.yaml" + assert cluster.config.name == "unit-test-cluster" assert filecmp.cmp( f"{aw_dir}unit-test-cluster.yaml", f"{parent}/tests/test-case.yaml", @@ -304,7 +303,7 @@ def test_cluster_no_kueue_no_aw(mocker): config.name = "unit-test-no-kueue" config.write_to_file = True cluster = Cluster(config) - assert cluster.app_wrapper_yaml == f"{aw_dir}unit-test-no-kueue.yaml" + assert cluster.resource_yaml == f"{aw_dir}unit-test-no-kueue.yaml" assert cluster.config.local_queue == None assert filecmp.cmp( f"{aw_dir}unit-test-no-kueue.yaml", @@ -367,8 +366,8 @@ def test_cluster_creation_no_mcad(mocker): config.appwrapper = False cluster = Cluster(config) - assert cluster.app_wrapper_yaml == f"{aw_dir}unit-test-cluster-ray.yaml" - 
assert cluster.app_wrapper_name == "unit-test-cluster-ray" + assert cluster.resource_yaml == f"{aw_dir}unit-test-cluster-ray.yaml" + assert cluster.config.name == "unit-test-cluster-ray" assert filecmp.cmp( f"{aw_dir}unit-test-cluster-ray.yaml", f"{parent}/tests/test-case-no-mcad.yamls", @@ -396,8 +395,8 @@ def test_cluster_creation_no_mcad_local_queue(mocker): config.local_queue = "local-queue-default" config.labels = {"testlabel": "test", "testlabel2": "test"} cluster = Cluster(config) - assert cluster.app_wrapper_yaml == f"{aw_dir}unit-test-cluster-ray.yaml" - assert cluster.app_wrapper_name == "unit-test-cluster-ray" + assert cluster.resource_yaml == f"{aw_dir}unit-test-cluster-ray.yaml" + assert cluster.config.name == "unit-test-cluster-ray" assert filecmp.cmp( f"{aw_dir}unit-test-cluster-ray.yaml", f"{parent}/tests/test-case-no-mcad.yamls", @@ -413,7 +412,6 @@ def test_cluster_creation_no_mcad_local_queue(mocker): worker_memory_requests=5, worker_memory_limits=6, worker_extended_resource_requests={"nvidia.com/gpu": 7}, - machine_types=["cpu.small", "gpu.large"], image_pull_secrets=["unit-test-pull-secret"], write_to_file=True, appwrapper=False, @@ -421,8 +419,8 @@ def test_cluster_creation_no_mcad_local_queue(mocker): labels={"testlabel": "test", "testlabel2": "test"}, ) cluster = Cluster(config) - assert cluster.app_wrapper_yaml == f"{aw_dir}unit-test-cluster-ray.yaml" - assert cluster.app_wrapper_name == "unit-test-cluster-ray" + assert cluster.resource_yaml == f"{aw_dir}unit-test-cluster-ray.yaml" + assert cluster.config.name == "unit-test-cluster-ray" assert filecmp.cmp( f"{aw_dir}unit-test-cluster-ray.yaml", f"{parent}/tests/test-case-no-mcad.yamls", @@ -445,15 +443,14 @@ def test_default_cluster_creation(mocker): appwrapper=True, ) cluster = Cluster(default_config) - test_aw = yaml.load(cluster.app_wrapper_yaml, Loader=yaml.FullLoader) with open( f"{parent}/tests/test-default-appwrapper.yaml", ) as f: default_aw = yaml.load(f, Loader=yaml.FullLoader) - 
assert test_aw == default_aw + assert cluster.resource_yaml == default_aw - assert cluster.app_wrapper_name == "unit-test-default-cluster" + assert cluster.config.name == "unit-test-default-cluster" assert cluster.config.namespace == "opendatahub" @@ -477,8 +474,8 @@ def test_cluster_creation_with_custom_image(mocker): config.local_queue = "local-queue-default" config.labels = {"testlabel": "test", "testlabel2": "test"} cluster = Cluster(config) - assert cluster.app_wrapper_yaml == f"{aw_dir}unit-test-cluster-custom-image.yaml" - assert cluster.app_wrapper_name == "unit-test-cluster-custom-image" + assert cluster.resource_yaml == f"{aw_dir}unit-test-cluster-custom-image.yaml" + assert cluster.config.name == "unit-test-cluster-custom-image" assert filecmp.cmp( f"{aw_dir}unit-test-cluster-custom-image.yaml", f"{parent}/tests/test-case-custom-image.yaml", @@ -494,7 +491,6 @@ def test_cluster_creation_with_custom_image(mocker): worker_memory_requests=5, worker_memory_limits=6, worker_extended_resource_requests={"nvidia.com/gpu": 7}, - machine_types=["cpu.small", "gpu.large"], image_pull_secrets=["unit-test-pull-secret"], image="quay.io/project-codeflare/ray:2.20.0-py39-cu118", write_to_file=True, @@ -503,8 +499,8 @@ def test_cluster_creation_with_custom_image(mocker): labels={"testlabel": "test", "testlabel2": "test"}, ) cluster = Cluster(config) - assert cluster.app_wrapper_yaml == f"{aw_dir}unit-test-cluster-custom-image.yaml" - assert cluster.app_wrapper_name == "unit-test-cluster-custom-image" + assert cluster.resource_yaml == f"{aw_dir}unit-test-cluster-custom-image.yaml" + assert cluster.config.name == "unit-test-cluster-custom-image" assert filecmp.cmp( f"{aw_dir}unit-test-cluster-custom-image.yaml", f"{parent}/tests/test-case-custom-image.yaml", @@ -759,7 +755,7 @@ def test_local_client_url(mocker): return_value="rayclient-unit-test-cluster-localinter-ns.apps.cluster.awsroute.org", ) mocker.patch( - "codeflare_sdk.cluster.cluster.Cluster.create_app_wrapper", + 
"codeflare_sdk.cluster.cluster.Cluster.create_resource", return_value="unit-test-cluster-localinter.yaml", ) @@ -955,7 +951,7 @@ def test_ray_details(mocker, capsys): return_value="", ) mocker.patch( - "codeflare_sdk.utils.generate_yaml.local_queue_exists", + "codeflare_sdk.utils.build_ray_cluster.local_queue_exists", return_value="true", ) cf = Cluster( @@ -1984,156 +1980,53 @@ def route_list_retrieval(group, version, namespace, plural): } -def test_get_cluster_openshift(mocker): +def test_get_appwrapper(mocker): + mocker.patch("kubernetes.client.ApisApi.get_api_versions") mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") - # Mock the client.ApisApi function to return a mock object - mock_api = MagicMock() - mock_api.get_api_versions.return_value.groups = [ - MagicMock(versions=[MagicMock(group_version="route.openshift.io/v1")]) - ] - mocker.patch("kubernetes.client.ApisApi", return_value=mock_api) - mocker.patch( - "codeflare_sdk.utils.generate_yaml.local_queue_exists", - return_value="true", - ) - assert is_openshift_cluster() - - def custom_side_effect(group, version, namespace, plural, **kwargs): - if plural == "routes": - return route_list_retrieval("route.openshift.io", "v1", "ns", "routes") - elif plural == "rayclusters": - return get_ray_obj("ray.io", "v1", "ns", "rayclusters") - elif plural == "appwrappers": - return get_aw_obj("workload.codeflare.dev", "v1beta2", "ns", "appwrappers") - elif plural == "localqueues": - return get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues") - - mocker.patch( - "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", get_aw_obj - ) + with open( + f"{parent}/tests/test-case.yaml", + ) as f: + default_aw = yaml.load(f, Loader=yaml.FullLoader) + mocker.patch( + "kubernetes.client.CustomObjectsApi.get_namespaced_custom_object", + return_value=default_aw, + ) - mocker.patch( - "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", - side_effect=custom_side_effect, 
- ) - mocker.patch( - "kubernetes.client.CustomObjectsApi.get_namespaced_custom_object", - return_value=get_named_aw, - ) - mocker.patch( - "kubernetes.client.CustomObjectsApi.get_namespaced_custom_object", - side_effect=route_list_retrieval("route.openshift.io", "v1", "ns", "routes")[ - "items" - ], - ) - mocker.patch( - "codeflare_sdk.utils.generate_yaml.local_queue_exists", - return_value="true", + cluster = get_cluster( + "unit-test-cluster", "ns", is_appwrapper=True, write_to_file=False ) - - cluster = get_cluster("quicktest") cluster_config = cluster.config - assert cluster_config.name == "quicktest" and cluster_config.namespace == "ns" + assert cluster.resource_yaml == default_aw assert ( - "m4.xlarge" in cluster_config.machine_types - and "g4dn.xlarge" in cluster_config.machine_types + cluster_config.name == "unit-test-cluster" and cluster_config.namespace == "ns" ) - assert ( - cluster_config.worker_cpu_requests == 1 - and cluster_config.worker_cpu_limits == 1 - ) - assert ( - cluster_config.worker_memory_requests == "2G" - and cluster_config.worker_memory_limits == "2G" - ) - assert cluster_config.worker_extended_resource_requests == {} - assert ( - cluster_config.image - == "ghcr.io/foundation-model-stack/base:ray2.1.0-py38-gpu-pytorch1.12.0cu116-20221213-193103" - ) - assert cluster_config.num_workers == 1 -def test_get_cluster(mocker): +def test_get_cluster_no_appwrapper(mocker): mocker.patch("kubernetes.client.ApisApi.get_api_versions") mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") - mocker.patch( - "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", - side_effect=get_ray_obj, - ) - mocker.patch( - "kubernetes.client.CustomObjectsApi.get_namespaced_custom_object", - side_effect=get_named_aw, - ) - mocker.patch( - "kubernetes.client.NetworkingV1Api.list_namespaced_ingress", - return_value=ingress_retrieval(cluster_name="quicktest", client_ing=True), - ) - mocker.patch( - 
"codeflare_sdk.utils.generate_yaml.local_queue_exists", - return_value="true", - ) - cluster = get_cluster("quicktest") - cluster_config = cluster.config - assert cluster_config.name == "quicktest" and cluster_config.namespace == "ns" - assert ( - "m4.xlarge" in cluster_config.machine_types - and "g4dn.xlarge" in cluster_config.machine_types - ) - assert ( - cluster_config.worker_cpu_requests == 1 - and cluster_config.worker_cpu_limits == 1 - ) - assert ( - cluster_config.worker_memory_requests == "2G" - and cluster_config.worker_memory_limits == "2G" - ) - assert cluster_config.worker_extended_resource_requests == {} - assert ( - cluster_config.image - == "ghcr.io/foundation-model-stack/base:ray2.1.0-py38-gpu-pytorch1.12.0cu116-20221213-193103" - ) - assert cluster_config.num_workers == 1 + with open( + f"{parent}/tests/test-case-no-mcad.yamls", + ) as f: + default_rc = yaml.load(f, Loader=yaml.FullLoader) + mocker.patch( + "kubernetes.client.CustomObjectsApi.get_namespaced_custom_object", + return_value=default_rc, + ) -def test_get_cluster_no_mcad(mocker): - mocker.patch("kubernetes.client.ApisApi.get_api_versions") - mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") - mocker.patch( - "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", - side_effect=get_ray_obj, - ) - mocker.patch( - "kubernetes.client.NetworkingV1Api.list_namespaced_ingress", - return_value=ingress_retrieval(cluster_name="quicktest", client_ing=True), - ) - mocker.patch( - "codeflare_sdk.utils.generate_yaml.local_queue_exists", - return_value="true", - ) - cluster = get_cluster("quicktest") + cluster = get_cluster("unit-test-cluster-ray", "ns", write_to_file=True) cluster_config = cluster.config - assert cluster_config.name == "quicktest" and cluster_config.namespace == "ns" - assert ( - "m4.xlarge" in cluster_config.machine_types - and "g4dn.xlarge" in cluster_config.machine_types - ) - assert ( - cluster_config.worker_cpu_requests == 1 - and 
cluster_config.worker_cpu_limits == 1 - ) assert ( - cluster_config.worker_memory_requests == "2G" - and cluster_config.worker_memory_limits == "2G" + cluster_config.name == "unit-test-cluster-ray" + and cluster_config.namespace == "ns" ) - assert cluster_config.worker_extended_resource_requests == {} - assert ( - cluster_config.image - == "ghcr.io/foundation-model-stack/base:ray2.1.0-py38-gpu-pytorch1.12.0cu116-20221213-193103" + assert filecmp.cmp( + f"{aw_dir}unit-test-cluster-ray.yaml", + f"{parent}/tests/test-case-no-mcad.yamls", + shallow=True, ) - assert cluster_config.num_workers == 1 - assert cluster_config.local_queue == "team-a-queue" def route_retrieval(group, version, namespace, plural, name): @@ -2158,7 +2051,7 @@ def test_map_to_ray_cluster(mocker): mocker.patch("kubernetes.config.load_kube_config") mocker.patch( - "codeflare_sdk.cluster.cluster.is_openshift_cluster", return_value=True + "codeflare_sdk.cluster.cluster._is_openshift_cluster", return_value=True ) mock_api_client = mocker.MagicMock(spec=client.ApiClient) @@ -2305,7 +2198,7 @@ def test_list_queue_rayclusters(mocker, capsys): ] mocker.patch("kubernetes.client.ApisApi", return_value=mock_api) - assert is_openshift_cluster() == True + assert _is_openshift_cluster() == True mocker.patch( "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", side_effect=get_obj_none, @@ -2344,7 +2237,7 @@ def test_cluster_status(mocker): mocker.patch("kubernetes.client.ApisApi.get_api_versions") mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") mocker.patch( - "codeflare_sdk.utils.generate_yaml.local_queue_exists", + "codeflare_sdk.utils.build_ray_cluster.local_queue_exists", return_value="true", ) fake_aw = AppWrapper("test", AppWrapperStatus.FAILED) @@ -2436,7 +2329,7 @@ def test_wait_ready(mocker, capsys): mocker.patch("codeflare_sdk.cluster.cluster._app_wrapper_status", return_value=None) mocker.patch("codeflare_sdk.cluster.cluster._ray_cluster_status", 
return_value=None) mocker.patch( - "codeflare_sdk.utils.generate_yaml.local_queue_exists", + "codeflare_sdk.utils.build_ray_cluster.local_queue_exists", return_value="true", ) mocker.patch.object( @@ -2664,11 +2557,11 @@ def test_cluster_throw_for_no_raycluster(mocker: MockerFixture): return_value="opendatahub", ) mocker.patch( - "codeflare_sdk.utils.generate_yaml.get_default_kueue_name", + "codeflare_sdk.utils.build_ray_cluster.get_default_local_queue", return_value="default", ) mocker.patch( - "codeflare_sdk.utils.generate_yaml.local_queue_exists", + "codeflare_sdk.utils.build_ray_cluster.local_queue_exists", return_value="true", ) diff --git a/tests/unit_test_support.py b/tests/unit_test_support.py index dd8c2fceb..61a3180f2 100644 --- a/tests/unit_test_support.py +++ b/tests/unit_test_support.py @@ -15,7 +15,6 @@ def createClusterConfig(): worker_memory_limits=6, worker_extended_resource_requests={"nvidia.com/gpu": 7}, appwrapper=True, - machine_types=["cpu.small", "gpu.large"], image_pull_secrets=["unit-test-pull-secret"], write_to_file=True, )