Skip to content

Commit

Permalink
simplify function calls and add option for custom resources
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin <[email protected]>
  • Loading branch information
KPostOffice authored and openshift-merge-bot[bot] committed Jul 9, 2024
1 parent 2a85469 commit 5ce0b2c
Show file tree
Hide file tree
Showing 19 changed files with 289 additions and 205 deletions.
96 changes: 42 additions & 54 deletions src/codeflare_sdk/cluster/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from ..utils import pretty_print
from ..utils.generate_yaml import (
generate_appwrapper,
head_worker_gpu_count_from_cluster,
)
from ..utils.kube_api_helpers import _kube_api_error_handling
from ..utils.generate_yaml import is_openshift_cluster
Expand Down Expand Up @@ -118,48 +119,7 @@ def create_app_wrapper(self):
f"Namespace {self.config.namespace} is of type {type(self.config.namespace)}. Check your Kubernetes Authentication."
)

# Before attempting to create the cluster AW, let's evaluate the ClusterConfig

name = self.config.name
namespace = self.config.namespace
head_cpus = self.config.head_cpus
head_memory = self.config.head_memory
num_head_gpus = self.config.num_head_gpus
worker_cpu_requests = self.config.worker_cpu_requests
worker_cpu_limits = self.config.worker_cpu_limits
worker_memory_requests = self.config.worker_memory_requests
worker_memory_limits = self.config.worker_memory_limits
num_worker_gpus = self.config.num_worker_gpus
workers = self.config.num_workers
template = self.config.template
image = self.config.image
appwrapper = self.config.appwrapper
env = self.config.envs
image_pull_secrets = self.config.image_pull_secrets
write_to_file = self.config.write_to_file
local_queue = self.config.local_queue
labels = self.config.labels
return generate_appwrapper(
name=name,
namespace=namespace,
head_cpus=head_cpus,
head_memory=head_memory,
num_head_gpus=num_head_gpus,
worker_cpu_requests=worker_cpu_requests,
worker_cpu_limits=worker_cpu_limits,
worker_memory_requests=worker_memory_requests,
worker_memory_limits=worker_memory_limits,
num_worker_gpus=num_worker_gpus,
workers=workers,
template=template,
image=image,
appwrapper=appwrapper,
env=env,
image_pull_secrets=image_pull_secrets,
write_to_file=write_to_file,
local_queue=local_queue,
labels=labels,
)
return generate_appwrapper(self)

# creates a new cluster with the provided or default spec
def up(self):
Expand Down Expand Up @@ -305,7 +265,7 @@ def status(

if print_to_console:
# overriding the number of gpus with requested
cluster.worker_gpu = self.config.num_worker_gpus
_, cluster.worker_gpu = head_worker_gpu_count_from_cluster(self)
pretty_print.print_cluster_status(cluster)
elif print_to_console:
if status == CodeFlareClusterStatus.UNKNOWN:
Expand Down Expand Up @@ -443,6 +403,29 @@ def job_logs(self, job_id: str) -> str:
"""
return self.job_client.get_job_logs(job_id)

@staticmethod
def _head_worker_extended_resources_from_rc_dict(rc: Dict) -> Tuple[dict, dict]:
head_extended_resources, worker_extended_resources = {}, {}
for resource in rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][
"containers"
][0]["resources"]["limits"].keys():
if resource in ["memory", "cpu"]:
continue
worker_extended_resources[resource] = rc["spec"]["workerGroupSpecs"][0][
"template"
]["spec"]["containers"][0]["resources"]["limits"][resource]

for resource in rc["spec"]["headGroupSpec"]["template"]["spec"]["containers"][
0
]["resources"]["limits"].keys():
if resource in ["memory", "cpu"]:
continue
head_extended_resources[resource] = rc["spec"]["headGroupSpec"]["template"][
"spec"
]["containers"][0]["resources"]["limits"][resource]

return head_extended_resources, worker_extended_resources

def from_k8_cluster_object(
rc,
appwrapper=True,
Expand All @@ -456,6 +439,11 @@ def from_k8_cluster_object(
else []
)

(
head_extended_resources,
worker_extended_resources,
) = Cluster._head_worker_extended_resources_from_rc_dict(rc)

cluster_config = ClusterConfiguration(
name=rc["metadata"]["name"],
namespace=rc["metadata"]["namespace"],
Expand All @@ -473,11 +461,8 @@ def from_k8_cluster_object(
worker_memory_limits=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][
"containers"
][0]["resources"]["limits"]["memory"],
num_worker_gpus=int(
rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][0][
"resources"
]["limits"]["nvidia.com/gpu"]
),
worker_extended_resource_requests=worker_extended_resources,
head_extended_resource_requests=head_extended_resources,
image=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][
0
]["image"],
Expand Down Expand Up @@ -858,6 +843,11 @@ def _map_to_ray_cluster(rc) -> Optional[RayCluster]:
protocol = "https"
dashboard_url = f"{protocol}://{ingress.spec.rules[0].host}"

(
head_extended_resources,
worker_extended_resources,
) = Cluster._head_worker_extended_resources_from_rc_dict(rc)

return RayCluster(
name=rc["metadata"]["name"],
status=status,
Expand All @@ -872,17 +862,15 @@ def _map_to_ray_cluster(rc) -> Optional[RayCluster]:
worker_cpu=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][
0
]["resources"]["limits"]["cpu"],
worker_gpu=0, # hard to detect currently how many gpus, can override it with what the user asked for
worker_extended_resources=worker_extended_resources,
namespace=rc["metadata"]["namespace"],
head_cpus=rc["spec"]["headGroupSpec"]["template"]["spec"]["containers"][0][
"resources"
]["limits"]["cpu"],
head_mem=rc["spec"]["headGroupSpec"]["template"]["spec"]["containers"][0][
"resources"
]["limits"]["memory"],
head_gpu=rc["spec"]["headGroupSpec"]["template"]["spec"]["containers"][0][
"resources"
]["limits"]["nvidia.com/gpu"],
head_extended_resources=head_extended_resources,
dashboard=dashboard_url,
)

Expand All @@ -907,12 +895,12 @@ def _copy_to_ray(cluster: Cluster) -> RayCluster:
worker_mem_min=cluster.config.worker_memory_requests,
worker_mem_max=cluster.config.worker_memory_limits,
worker_cpu=cluster.config.worker_cpu_requests,
worker_gpu=cluster.config.num_worker_gpus,
worker_extended_resources=cluster.config.worker_extended_resource_requests,
namespace=cluster.config.namespace,
dashboard=cluster.cluster_dashboard_uri(),
head_cpus=cluster.config.head_cpus,
head_mem=cluster.config.head_memory,
head_gpu=cluster.config.num_head_gpus,
head_extended_resources=cluster.config.head_extended_resource_requests,
)
if ray.status == CodeFlareClusterStatus.READY:
ray.status = RayClusterStatus.READY
Expand Down
109 changes: 98 additions & 11 deletions src/codeflare_sdk/cluster/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,51 @@

dir = pathlib.Path(__file__).parent.parent.resolve()

# https://docs.ray.io/en/latest/ray-core/scheduling/accelerators.html
DEFAULT_RESOURCE_MAPPING = {
"nvidia.com/gpu": "GPU",
"intel.com/gpu": "GPU",
"amd.com/gpu": "GPU",
"aws.amazon.com/neuroncore": "neuron_cores",
"google.com/tpu": "TPU",
"habana.ai/gaudi": "HPU",
"huawei.com/Ascend910": "NPU",
"huawei.com/Ascend310": "NPU",
}


@dataclass
class ClusterConfiguration:
"""
This dataclass is used to specify resource requirements and other details, and
is passed in as an argument when creating a Cluster object.
Attributes:
- name: The name of the cluster.
- namespace: The namespace in which the cluster should be created.
- head_info: A list of strings containing information about the head node.
- head_cpus: The number of CPUs to allocate to the head node.
- head_memory: The amount of memory to allocate to the head node.
- head_gpus: The number of GPUs to allocate to the head node. (Deprecated, use head_extended_resource_requests)
- head_extended_resource_requests: A dictionary of extended resource requests for the head node. ex: {"nvidia.com/gpu": 1}
- machine_types: A list of machine types to use for the cluster.
- min_cpus: The minimum number of CPUs to allocate to each worker.
- max_cpus: The maximum number of CPUs to allocate to each worker.
- num_workers: The number of workers to create.
- min_memory: The minimum amount of memory to allocate to each worker.
- max_memory: The maximum amount of memory to allocate to each worker.
- num_gpus: The number of GPUs to allocate to each worker. (Deprecated, use worker_extended_resource_requests)
- template: The path to the template file to use for the cluster.
- appwrapper: A boolean indicating whether to use an AppWrapper.
- envs: A dictionary of environment variables to set for the cluster.
- image: The image to use for the cluster.
- image_pull_secrets: A list of image pull secrets to use for the cluster.
- write_to_file: A boolean indicating whether to write the cluster configuration to a file.
- verify_tls: A boolean indicating whether to verify TLS when connecting to the cluster.
- labels: A dictionary of labels to apply to the cluster.
- worker_extended_resource_requests: A dictionary of extended resource requests for each worker. ex: {"nvidia.com/gpu": 1}
- extended_resource_mapping: A dictionary of custom resource mappings to map extended resource requests to RayCluster resource names
- overwrite_default_resource_mapping: A boolean indicating whether to overwrite the default resource mapping.
"""

name: str
Expand All @@ -39,7 +78,7 @@ class ClusterConfiguration:
head_cpus: typing.Union[int, str] = 2
head_memory: typing.Union[int, str] = 8
head_gpus: int = None # Deprecating
num_head_gpus: int = 0
head_extended_resource_requests: typing.Dict[str, int] = field(default_factory=dict)
machine_types: list = field(default_factory=list) # ["m4.xlarge", "g4dn.xlarge"]
worker_cpu_requests: typing.Union[int, str] = 1
worker_cpu_limits: typing.Union[int, str] = 1
Expand All @@ -50,7 +89,6 @@ class ClusterConfiguration:
worker_memory_limits: typing.Union[int, str] = 2
min_memory: typing.Union[int, str] = None # Deprecating
max_memory: typing.Union[int, str] = None # Deprecating
num_worker_gpus: int = 0
num_gpus: int = None # Deprecating
template: str = f"{dir}/templates/base-template.yaml"
appwrapper: bool = False
Expand All @@ -60,6 +98,11 @@ class ClusterConfiguration:
write_to_file: bool = False
verify_tls: bool = True
labels: dict = field(default_factory=dict)
worker_extended_resource_requests: typing.Dict[str, int] = field(
default_factory=dict
)
extended_resource_mapping: typing.Dict[str, str] = field(default_factory=dict)
overwrite_default_resource_mapping: bool = False

def __post_init__(self):
if not self.verify_tls:
Expand All @@ -70,8 +113,60 @@ def __post_init__(self):
self._memory_to_string()
self._str_mem_no_unit_add_GB()
self._memory_to_resource()
self._gpu_to_resource()
self._cpu_to_resource()
self._gpu_to_resource()
self._combine_extended_resource_mapping()
self._validate_extended_resource_requests(self.head_extended_resource_requests)
self._validate_extended_resource_requests(
self.worker_extended_resource_requests
)

def _combine_extended_resource_mapping(self):
if overwritten := set(self.extended_resource_mapping.keys()).intersection(
DEFAULT_RESOURCE_MAPPING.keys()
):
if self.overwrite_default_resource_mapping:
warnings.warn(
f"Overwriting default resource mapping for {overwritten}",
UserWarning,
)
else:
raise ValueError(
f"Resource mapping already exists for {overwritten}, set overwrite_default_resource_mapping to True to overwrite"
)
self.extended_resource_mapping = {
**DEFAULT_RESOURCE_MAPPING,
**self.extended_resource_mapping,
}

def _validate_extended_resource_requests(
self, extended_resources: typing.Dict[str, int]
):
for k in extended_resources.keys():
if k not in self.extended_resource_mapping.keys():
raise ValueError(
f"extended resource '{k}' not found in extended_resource_mapping, available resources are {list(self.extended_resource_mapping.keys())}, to add more supported resources use extended_resource_mapping. i.e. extended_resource_mapping = {{'{k}': 'FOO_BAR'}}"
)

def _gpu_to_resource(self):
if self.head_gpus:
warnings.warn(
f"head_gpus is being deprecated, replacing with head_extended_resource_requests['nvidia.com/gpu'] = {self.head_gpus}"
)
if "nvidia.com/gpu" in self.head_extended_resource_requests:
raise ValueError(
"nvidia.com/gpu already exists in head_extended_resource_requests"
)
self.head_extended_resource_requests["nvidia.com/gpu"] = self.head_gpus
if self.num_gpus:
warnings.warn(
f"num_gpus is being deprecated, replacing with worker_extended_resource_requests['nvidia.com/gpu'] = {self.num_gpus}"
)
if "nvidia.com/gpu" in self.worker_extended_resource_requests:
raise ValueError(
"nvidia.com/gpu already exists in worker_extended_resource_requests"
)
self.worker_extended_resource_requests["nvidia.com/gpu"] = self.num_gpus

def _str_mem_no_unit_add_GB(self):
if isinstance(self.head_memory, str) and self.head_memory.isdecimal():
Expand All @@ -95,14 +190,6 @@ def _memory_to_string(self):
if isinstance(self.worker_memory_limits, int):
self.worker_memory_limits = f"{self.worker_memory_limits}G"

def _gpu_to_resource(self):
if self.head_gpus:
warnings.warn("head_gpus is being deprecated, use num_head_gpus")
self.num_head_gpus = self.head_gpus
if self.num_gpus:
warnings.warn("num_gpus is being deprecated, use num_worker_gpus")
self.num_worker_gpus = self.num_gpus

def _cpu_to_resource(self):
if self.min_cpus:
warnings.warn("min_cpus is being deprecated, use worker_cpu_requests")
Expand Down
7 changes: 4 additions & 3 deletions src/codeflare_sdk/cluster/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
dataclasses to store information for Ray clusters and AppWrappers.
"""

from dataclasses import dataclass
from dataclasses import dataclass, field
from enum import Enum
import typing


class RayClusterStatus(Enum):
Expand Down Expand Up @@ -74,14 +75,14 @@ class RayCluster:
status: RayClusterStatus
head_cpus: int
head_mem: str
head_gpu: int
workers: int
worker_mem_min: str
worker_mem_max: str
worker_cpu: int
worker_gpu: int
namespace: str
dashboard: str
worker_extended_resources: typing.Dict[str, int] = field(default_factory=dict)
head_extended_resources: typing.Dict[str, int] = field(default_factory=dict)


@dataclass
Expand Down
4 changes: 0 additions & 4 deletions src/codeflare_sdk/templates/base-template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,9 @@ spec:
limits:
cpu: 2
memory: "8G"
nvidia.com/gpu: 0
requests:
cpu: 2
memory: "8G"
nvidia.com/gpu: 0
volumeMounts:
- mountPath: /etc/pki/tls/certs/odh-trusted-ca-bundle.crt
name: odh-trusted-ca-cert
Expand Down Expand Up @@ -163,11 +161,9 @@ spec:
limits:
cpu: "2"
memory: "12G"
nvidia.com/gpu: "1"
requests:
cpu: "2"
memory: "12G"
nvidia.com/gpu: "1"
volumeMounts:
- mountPath: /etc/pki/tls/certs/odh-trusted-ca-bundle.crt
name: odh-trusted-ca-cert
Expand Down
Loading

0 comments on commit 5ce0b2c

Please sign in to comment.