Skip to content

Commit

Permalink
Migrate from MCAD to AppWrapper v1beta2 (#521)
Browse files Browse the repository at this point in the history
* rename mcad to appwrapper

* remove dispatch_priority (not supported by v1beta2 AppWrapper)

* remove instascale

* remove priority/affinity from template -- not compatible with Kueue

* make mocked objects easier to maintain by removing unnecessary metadata

* port appwrapper status to v1beta2 names

* prune mocked appwrappers

* eliminate dependency on workload.codeflare.dev/appwrapper label

* Finish converting AppWrappers to v1beta2

* fix incomplete rebase

* rebase: remove instascale from new testcase

* add e2e test for appwrapper containing a raycluster

* Also must add local_queue label to appwrappers

* user labels should also be added to ray cluster wrapped in appwrapper

* fix more incorrect test cases that were assuming that appwrappers don't get a localqueue

* sdk_user must have rbacs to create appwrappers for e2e test to succeed

* elide AppWrappers from top-level documentation
  • Loading branch information
dgrove-oss committed Jun 5, 2024
1 parent 56b4478 commit c0f7d7f
Show file tree
Hide file tree
Showing 17 changed files with 1,166 additions and 2,112 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/e2e_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ jobs:
kubectl create clusterrolebinding sdk-user-namespace-creator --clusterrole=namespace-creator --user=sdk-user
kubectl create clusterrole raycluster-creator --verb=get,list,create,delete,patch --resource=rayclusters
kubectl create clusterrolebinding sdk-user-raycluster-creator --clusterrole=raycluster-creator --user=sdk-user
kubectl create clusterrole appwrapper-creator --verb=get,list,create,delete,patch --resource=appwrappers
kubectl create clusterrolebinding sdk-user-appwrapper-creator --clusterrole=appwrapper-creator --user=sdk-user
kubectl create clusterrole resourceflavor-creator --verb=get,list,create,delete --resource=resourceflavors
kubectl create clusterrolebinding sdk-user-resourceflavor-creator --clusterrole=resourceflavor-creator --user=sdk-user
kubectl create clusterrole clusterqueue-creator --verb=get,list,create,delete,patch --resource=clusterqueues
Expand Down
10 changes: 3 additions & 7 deletions docs/cluster-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,13 @@ cluster = Cluster(ClusterConfiguration(
max_cpus=1, # Default 1
min_memory=2, # Default 2
max_memory=2, # Default 2
mcad=True, # Default True
num_gpus=0, # Default 0
image="quay.io/project-codeflare/ray:latest-py39-cu118", # Mandatory Field
machine_types=["m5.xlarge", "g4dn.xlarge"],
labels={"exampleLabel": "example", "secondLabel": "example"},
))
```

Upon creating a cluster configuration with `mcad=True` an appwrapper will be created featuring the Ray Cluster and any Routes, Ingresses or Secrets that are needed to be created along side it.<br>
From there a user can call `cluster.up()` and `cluster.down()` to create and remove the appwrapper thus creating and removing the Ray Cluster.

In cases where `mcad=False` a yaml file will be created with the individual Ray Cluster, Route/Ingress and Secret included.<br>
The Ray Cluster and service will be created by KubeRay directly and the other components will be individually created.

The `labels={"exampleLabel": "example"}` parameter can be used to apply additional labels to the RayCluster resource.

After creating their`cluster`, a user can call `cluster.up()` and `cluster.down()` to respectively create or remove the Ray Cluster.
4 changes: 2 additions & 2 deletions src/codeflare_sdk/cluster/awload.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def submit(self) -> None:
api_instance = client.CustomObjectsApi(api_config_handler())
api_instance.create_namespaced_custom_object(
group="workload.codeflare.dev",
version="v1beta1",
version="v1beta2",
namespace=self.namespace,
plural="appwrappers",
body=self.awyaml,
Expand All @@ -87,7 +87,7 @@ def remove(self) -> None:
api_instance = client.CustomObjectsApi(api_config_handler())
api_instance.delete_namespaced_custom_object(
group="workload.codeflare.dev",
version="v1beta1",
version="v1beta2",
namespace=self.namespace,
plural="appwrappers",
name=self.name,
Expand Down
100 changes: 28 additions & 72 deletions src/codeflare_sdk/cluster/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,26 +103,6 @@ def job_client(self):
)
return self._job_submission_client

def evaluate_dispatch_priority(self):
priority_class = self.config.dispatch_priority

try:
config_check()
api_instance = client.CustomObjectsApi(api_config_handler())
priority_classes = api_instance.list_cluster_custom_object(
group="scheduling.k8s.io",
version="v1",
plural="priorityclasses",
)
except Exception as e: # pragma: no cover
return _kube_api_error_handling(e)

for pc in priority_classes["items"]:
if pc["metadata"]["name"] == priority_class:
return pc["value"]
print(f"Priority class {priority_class} is not available in the cluster")
return None

def validate_image_config(self):
"""
Validates that the image configuration is not empty.
Expand Down Expand Up @@ -152,18 +132,6 @@ def create_app_wrapper(self):
self.validate_image_config()

# Before attempting to create the cluster AW, let's evaluate the ClusterConfig
if self.config.dispatch_priority:
if not self.config.mcad:
raise ValueError(
"Invalid Cluster Configuration, cannot have dispatch priority without MCAD"
)
priority_val = self.evaluate_dispatch_priority()
if priority_val == None:
raise ValueError(
"Invalid Cluster Configuration, AppWrapper not generated"
)
else:
priority_val = None

name = self.config.name
namespace = self.config.namespace
Expand All @@ -178,12 +146,10 @@ def create_app_wrapper(self):
workers = self.config.num_workers
template = self.config.template
image = self.config.image
instascale = self.config.instascale
mcad = self.config.mcad
appwrapper = self.config.appwrapper
instance_types = self.config.machine_types
env = self.config.envs
image_pull_secrets = self.config.image_pull_secrets
dispatch_priority = self.config.dispatch_priority
write_to_file = self.config.write_to_file
verify_tls = self.config.verify_tls
local_queue = self.config.local_queue
Expand All @@ -202,13 +168,10 @@ def create_app_wrapper(self):
workers=workers,
template=template,
image=image,
instascale=instascale,
mcad=mcad,
appwrapper=appwrapper,
instance_types=instance_types,
env=env,
image_pull_secrets=image_pull_secrets,
dispatch_priority=dispatch_priority,
priority_val=priority_val,
write_to_file=write_to_file,
verify_tls=verify_tls,
local_queue=local_queue,
Expand All @@ -230,13 +193,13 @@ def up(self):
try:
config_check()
api_instance = client.CustomObjectsApi(api_config_handler())
if self.config.mcad:
if self.config.appwrapper:
if self.config.write_to_file:
with open(self.app_wrapper_yaml) as f:
aw = yaml.load(f, Loader=yaml.FullLoader)
api_instance.create_namespaced_custom_object(
group="workload.codeflare.dev",
version="v1beta1",
version="v1beta2",
namespace=namespace,
plural="appwrappers",
body=aw,
Expand All @@ -245,7 +208,7 @@ def up(self):
aw = yaml.safe_load(self.app_wrapper_yaml)
api_instance.create_namespaced_custom_object(
group="workload.codeflare.dev",
version="v1beta1",
version="v1beta2",
namespace=namespace,
plural="appwrappers",
body=aw,
Expand Down Expand Up @@ -284,10 +247,10 @@ def down(self):
try:
config_check()
api_instance = client.CustomObjectsApi(api_config_handler())
if self.config.mcad:
if self.config.appwrapper:
api_instance.delete_namespaced_custom_object(
group="workload.codeflare.dev",
version="v1beta1",
version="v1beta2",
namespace=namespace,
plural="appwrappers",
name=self.app_wrapper_name,
Expand All @@ -306,30 +269,28 @@ def status(
"""
ready = False
status = CodeFlareClusterStatus.UNKNOWN
if self.config.mcad:
if self.config.appwrapper:
# check the app wrapper status
appwrapper = _app_wrapper_status(self.config.name, self.config.namespace)
if appwrapper:
if appwrapper.status in [
AppWrapperStatus.RUNNING,
AppWrapperStatus.COMPLETED,
AppWrapperStatus.RUNNING_HOLD_COMPLETION,
AppWrapperStatus.RESUMING,
AppWrapperStatus.RESETTING,
]:
ready = False
status = CodeFlareClusterStatus.STARTING
elif appwrapper.status in [
AppWrapperStatus.FAILED,
AppWrapperStatus.DELETED,
]:
ready = False
status = CodeFlareClusterStatus.FAILED # should deleted be separate
return status, ready # exit early, no need to check ray status
elif appwrapper.status in [
AppWrapperStatus.PENDING,
AppWrapperStatus.QUEUEING,
AppWrapperStatus.SUSPENDED,
AppWrapperStatus.SUSPENDING,
]:
ready = False
if appwrapper.status == AppWrapperStatus.PENDING:
if appwrapper.status == AppWrapperStatus.SUSPENDED:
status = CodeFlareClusterStatus.QUEUED
else:
status = CodeFlareClusterStatus.QUEUEING
Expand Down Expand Up @@ -501,7 +462,7 @@ def job_logs(self, job_id: str) -> str:

def from_k8_cluster_object(
rc,
mcad=True,
appwrapper=True,
write_to_file=False,
verify_tls=True,
):
Expand Down Expand Up @@ -534,11 +495,10 @@ def from_k8_cluster_object(
"resources"
]["limits"]["nvidia.com/gpu"]
),
instascale=True if machine_types else False,
image=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][
0
]["image"],
mcad=mcad,
appwrapper=appwrapper,
write_to_file=write_to_file,
verify_tls=verify_tls,
local_queue=rc["metadata"]
Expand Down Expand Up @@ -597,15 +557,15 @@ def list_all_clusters(namespace: str, print_to_console: bool = True):
return clusters


def list_all_queued(namespace: str, print_to_console: bool = True, mcad: bool = False):
def list_all_queued(
namespace: str, print_to_console: bool = True, appwrapper: bool = False
):
"""
Returns (and prints by default) a list of all currently queued-up Ray Clusters
in a given namespace.
"""
if mcad:
resources = _get_app_wrappers(
namespace, filter=[AppWrapperStatus.RUNNING, AppWrapperStatus.PENDING]
)
if appwrapper:
resources = _get_app_wrappers(namespace, filter=[AppWrapperStatus.SUSPENDED])
if print_to_console:
pretty_print.print_app_wrappers_status(resources)
else:
Expand Down Expand Up @@ -675,10 +635,10 @@ def get_cluster(

for rc in rcs["items"]:
if rc["metadata"]["name"] == cluster_name:
mcad = _check_aw_exists(cluster_name, namespace)
appwrapper = _check_aw_exists(cluster_name, namespace)
return Cluster.from_k8_cluster_object(
rc,
mcad=mcad,
appwrapper=appwrapper,
write_to_file=write_to_file,
verify_tls=verify_tls,
)
Expand Down Expand Up @@ -721,7 +681,7 @@ def _check_aw_exists(name: str, namespace: str) -> bool:
api_instance = client.CustomObjectsApi(api_config_handler())
aws = api_instance.list_namespaced_custom_object(
group="workload.codeflare.dev",
version="v1beta1",
version="v1beta2",
namespace=namespace,
plural="appwrappers",
)
Expand Down Expand Up @@ -781,7 +741,7 @@ def _app_wrapper_status(name, namespace="default") -> Optional[AppWrapper]:
api_instance = client.CustomObjectsApi(api_config_handler())
aws = api_instance.list_namespaced_custom_object(
group="workload.codeflare.dev",
version="v1beta1",
version="v1beta2",
namespace=namespace,
plural="appwrappers",
)
Expand Down Expand Up @@ -851,7 +811,7 @@ def _get_app_wrappers(
api_instance = client.CustomObjectsApi(api_config_handler())
aws = api_instance.list_namespaced_custom_object(
group="workload.codeflare.dev",
version="v1beta1",
version="v1beta2",
namespace=namespace,
plural="appwrappers",
)
Expand Down Expand Up @@ -945,18 +905,14 @@ def _map_to_ray_cluster(rc) -> Optional[RayCluster]:


def _map_to_app_wrapper(aw) -> AppWrapper:
if "status" in aw and "canrun" in aw["status"]:
if "status" in aw:
return AppWrapper(
name=aw["metadata"]["name"],
status=AppWrapperStatus(aw["status"]["state"].lower()),
can_run=aw["status"]["canrun"],
job_state=aw["status"]["queuejobstate"],
status=AppWrapperStatus(aw["status"]["phase"].lower()),
)
return AppWrapper(
name=aw["metadata"]["name"],
status=AppWrapperStatus("queueing"),
can_run=False,
job_state="Still adding to queue",
status=AppWrapperStatus("suspended"),
)


Expand Down
4 changes: 1 addition & 3 deletions src/codeflare_sdk/cluster/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,10 @@ class ClusterConfiguration:
max_memory: typing.Union[int, str] = 2
num_gpus: int = 0
template: str = f"{dir}/templates/base-template.yaml"
instascale: bool = False
mcad: bool = False
appwrapper: bool = False
envs: dict = field(default_factory=dict)
image: str = ""
image_pull_secrets: list = field(default_factory=list)
dispatch_priority: str = None
write_to_file: bool = False
verify_tls: bool = True
labels: dict = field(default_factory=dict)
Expand Down
15 changes: 7 additions & 8 deletions src/codeflare_sdk/cluster/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,17 @@ class RayClusterStatus(Enum):

class AppWrapperStatus(Enum):
"""
Defines the possible reportable states of an AppWrapper.
Defines the possible reportable phases of an AppWrapper.
"""

QUEUEING = "queueing"
PENDING = "pending"
SUSPENDED = "suspended"
RESUMING = "resuming"
RUNNING = "running"
RESETTING = "resetting"
SUSPENDING = "suspending"
SUCCEEDED = "succeeded"
FAILED = "failed"
DELETED = "deleted"
COMPLETED = "completed"
RUNNING_HOLD_COMPLETION = "runningholdcompletion"
TERMINATING = "terminating"


class CodeFlareClusterStatus(Enum):
Expand Down Expand Up @@ -91,5 +92,3 @@ class AppWrapper:

name: str
status: AppWrapperStatus
can_run: bool
job_state: str
Loading

0 comments on commit c0f7d7f

Please sign in to comment.