diff --git a/docs/detailed-documentation/cluster/awload.html b/docs/detailed-documentation/cluster/awload.html
index 57b407e80..297ebc02c 100644
--- a/docs/detailed-documentation/cluster/awload.html
+++ b/docs/detailed-documentation/cluster/awload.html
@@ -93,7 +93,7 @@
codeflare_sdk.cluster.awload
diff --git a/docs/detailed-documentation/cluster/cluster.html b/docs/detailed-documentation/cluster/cluster.html
--- a/docs/detailed-documentation/cluster/cluster.html
+++ b/docs/detailed-documentation/cluster/cluster.html
codeflare_sdk.cluster.cluster
-def list_all_queued(namespace: str, print_to_console: bool = True, mcad: bool = False)
+def list_all_queued(namespace: str, print_to_console: bool = True, appwrapper: bool = False)
Returns (and prints by default) a list of all currently queued-up Ray Clusters
in a given namespace.
@@ -1136,15 +1092,15 @@
def list_all_queued(namespace: str, print_to_console: bool = True, mcad: bool = False):
+def list_all_queued(
+ namespace: str, print_to_console: bool = True, appwrapper: bool = False
+):
"""
Returns (and prints by default) a list of all currently queued-up Ray Clusters
in a given namespace.
"""
- if mcad:
- resources = _get_app_wrappers(
- namespace, filter=[AppWrapperStatus.RUNNING, AppWrapperStatus.PENDING]
- )
+ if appwrapper:
+ resources = _get_app_wrappers(namespace, filter=[AppWrapperStatus.SUSPENDED])
if print_to_console:
pretty_print.print_app_wrappers_status(resources)
else:
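
For reference, a minimal usage sketch of the updated call (the namespace is illustrative; with appwrapper=True the SDK now filters on the single v1beta2 SUSPENDED phase instead of the old MCAD RUNNING/PENDING pair):

from codeflare_sdk.cluster.cluster import list_all_queued

# Lists Ray Clusters whose AppWrappers are queued (suspended) in "default".
queued = list_all_queued("default", print_to_console=True, appwrapper=True)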
@@ -1229,26 +1185,6 @@ Classes
)
return self._job_submission_client
- def evaluate_dispatch_priority(self):
- priority_class = self.config.dispatch_priority
-
- try:
- config_check()
- api_instance = client.CustomObjectsApi(api_config_handler())
- priority_classes = api_instance.list_cluster_custom_object(
- group="scheduling.k8s.io",
- version="v1",
- plural="priorityclasses",
- )
- except Exception as e: # pragma: no cover
- return _kube_api_error_handling(e)
-
- for pc in priority_classes["items"]:
- if pc["metadata"]["name"] == priority_class:
- return pc["value"]
- print(f"Priority class {priority_class} is not available in the cluster")
- return None
-
def validate_image_config(self):
"""
Validates that the image configuration is not empty.
@@ -1278,18 +1214,6 @@ Classes
self.validate_image_config()
# Before attempting to create the cluster AW, let's evaluate the ClusterConfig
- if self.config.dispatch_priority:
- if not self.config.mcad:
- raise ValueError(
- "Invalid Cluster Configuration, cannot have dispatch priority without MCAD"
- )
- priority_val = self.evaluate_dispatch_priority()
- if priority_val == None:
- raise ValueError(
- "Invalid Cluster Configuration, AppWrapper not generated"
- )
- else:
- priority_val = None
name = self.config.name
namespace = self.config.namespace
@@ -1304,12 +1228,10 @@ Classes
workers = self.config.num_workers
template = self.config.template
image = self.config.image
- instascale = self.config.instascale
- mcad = self.config.mcad
+ appwrapper = self.config.appwrapper
instance_types = self.config.machine_types
env = self.config.envs
image_pull_secrets = self.config.image_pull_secrets
- dispatch_priority = self.config.dispatch_priority
write_to_file = self.config.write_to_file
verify_tls = self.config.verify_tls
local_queue = self.config.local_queue
@@ -1328,13 +1250,10 @@ Classes
workers=workers,
template=template,
image=image,
- instascale=instascale,
- mcad=mcad,
+ appwrapper=appwrapper,
instance_types=instance_types,
env=env,
image_pull_secrets=image_pull_secrets,
- dispatch_priority=dispatch_priority,
- priority_val=priority_val,
write_to_file=write_to_file,
verify_tls=verify_tls,
local_queue=local_queue,
@@ -1356,13 +1275,13 @@ Classes
try:
config_check()
api_instance = client.CustomObjectsApi(api_config_handler())
- if self.config.mcad:
+ if self.config.appwrapper:
if self.config.write_to_file:
with open(self.app_wrapper_yaml) as f:
aw = yaml.load(f, Loader=yaml.FullLoader)
api_instance.create_namespaced_custom_object(
group="workload.codeflare.dev",
- version="v1beta1",
+ version="v1beta2",
namespace=namespace,
plural="appwrappers",
body=aw,
@@ -1371,7 +1290,7 @@ Classes
aw = yaml.safe_load(self.app_wrapper_yaml)
api_instance.create_namespaced_custom_object(
group="workload.codeflare.dev",
- version="v1beta1",
+ version="v1beta2",
namespace=namespace,
plural="appwrappers",
body=aw,
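
As a hedged sketch (field values illustrative, not taken from the SDK), the body handed to create_namespaced_custom_object now follows the v1beta2 shape, with wrapped resources under spec.components[].template rather than the old v1beta1 resources.GenericItems[].generictemplate:

aw = {
    "apiVersion": "workload.codeflare.dev/v1beta2",
    "kind": "AppWrapper",
    "metadata": {"name": "raytest", "namespace": "default"},
    "spec": {
        "components": [
            # Each component wraps its resource under "template"
            # ("generictemplate" in the old v1beta1 schema).
            # The KubeRay apiVersion shown is illustrative.
            {"template": {"apiVersion": "ray.io/v1", "kind": "RayCluster"}}
        ]
    },
}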
@@ -1410,10 +1329,10 @@ Classes
try:
config_check()
api_instance = client.CustomObjectsApi(api_config_handler())
- if self.config.mcad:
+ if self.config.appwrapper:
api_instance.delete_namespaced_custom_object(
group="workload.codeflare.dev",
- version="v1beta1",
+ version="v1beta2",
namespace=namespace,
plural="appwrappers",
name=self.app_wrapper_name,
@@ -1432,30 +1351,28 @@ Classes
"""
ready = False
status = CodeFlareClusterStatus.UNKNOWN
- if self.config.mcad:
+ if self.config.appwrapper:
# check the app wrapper status
appwrapper = _app_wrapper_status(self.config.name, self.config.namespace)
if appwrapper:
if appwrapper.status in [
- AppWrapperStatus.RUNNING,
- AppWrapperStatus.COMPLETED,
- AppWrapperStatus.RUNNING_HOLD_COMPLETION,
+ AppWrapperStatus.RESUMING,
+ AppWrapperStatus.RESETTING,
]:
ready = False
status = CodeFlareClusterStatus.STARTING
elif appwrapper.status in [
AppWrapperStatus.FAILED,
- AppWrapperStatus.DELETED,
]:
ready = False
status = CodeFlareClusterStatus.FAILED # should deleted be separate
return status, ready # exit early, no need to check ray status
elif appwrapper.status in [
- AppWrapperStatus.PENDING,
- AppWrapperStatus.QUEUEING,
+ AppWrapperStatus.SUSPENDED,
+ AppWrapperStatus.SUSPENDING,
]:
ready = False
- if appwrapper.status == AppWrapperStatus.PENDING:
+ if appwrapper.status == AppWrapperStatus.SUSPENDED:
status = CodeFlareClusterStatus.QUEUED
else:
status = CodeFlareClusterStatus.QUEUEING
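
Condensed, the phase-to-status translation above can be sketched as a dict (STATUS_MAP is a name introduced here for illustration, not SDK code):

from codeflare_sdk.cluster.model import AppWrapperStatus, CodeFlareClusterStatus

# How non-running AppWrapper phases map onto CodeFlare cluster statuses.
STATUS_MAP = {
    AppWrapperStatus.RESUMING: CodeFlareClusterStatus.STARTING,
    AppWrapperStatus.RESETTING: CodeFlareClusterStatus.STARTING,
    AppWrapperStatus.FAILED: CodeFlareClusterStatus.FAILED,
    AppWrapperStatus.SUSPENDED: CodeFlareClusterStatus.QUEUED,
    AppWrapperStatus.SUSPENDING: CodeFlareClusterStatus.QUEUEING,
}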
@@ -1627,7 +1544,7 @@ Classes
def from_k8_cluster_object(
rc,
- mcad=True,
+ appwrapper=True,
write_to_file=False,
verify_tls=True,
):
@@ -1660,11 +1577,10 @@ Classes
"resources"
]["limits"]["nvidia.com/gpu"]
),
- instascale=True if machine_types else False,
image=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][
0
]["image"],
- mcad=mcad,
+ appwrapper=appwrapper,
write_to_file=write_to_file,
verify_tls=verify_tls,
local_queue=rc["metadata"]
@@ -1845,18 +1761,6 @@ Methods
self.validate_image_config()
# Before attempting to create the cluster AW, let's evaluate the ClusterConfig
- if self.config.dispatch_priority:
- if not self.config.mcad:
- raise ValueError(
- "Invalid Cluster Configuration, cannot have dispatch priority without MCAD"
- )
- priority_val = self.evaluate_dispatch_priority()
- if priority_val == None:
- raise ValueError(
- "Invalid Cluster Configuration, AppWrapper not generated"
- )
- else:
- priority_val = None
name = self.config.name
namespace = self.config.namespace
@@ -1871,12 +1775,10 @@ Methods
workers = self.config.num_workers
template = self.config.template
image = self.config.image
- instascale = self.config.instascale
- mcad = self.config.mcad
+ appwrapper = self.config.appwrapper
instance_types = self.config.machine_types
env = self.config.envs
image_pull_secrets = self.config.image_pull_secrets
- dispatch_priority = self.config.dispatch_priority
write_to_file = self.config.write_to_file
verify_tls = self.config.verify_tls
local_queue = self.config.local_queue
@@ -1895,13 +1797,10 @@ Methods
workers=workers,
template=template,
image=image,
- instascale=instascale,
- mcad=mcad,
+ appwrapper=appwrapper,
instance_types=instance_types,
env=env,
image_pull_secrets=image_pull_secrets,
- dispatch_priority=dispatch_priority,
- priority_val=priority_val,
write_to_file=write_to_file,
verify_tls=verify_tls,
local_queue=local_queue,
@@ -1945,10 +1844,10 @@ Methods
try:
config_check()
api_instance = client.CustomObjectsApi(api_config_handler())
- if self.config.mcad:
+ if self.config.appwrapper:
api_instance.delete_namespaced_custom_object(
group="workload.codeflare.dev",
- version="v1beta1",
+ version="v1beta2",
namespace=namespace,
plural="appwrappers",
name=self.app_wrapper_name,
@@ -1959,38 +1858,8 @@ Methods
return _kube_api_error_handling(e)
-
-def evaluate_dispatch_priority(self)
-
-
-
-
-
-Expand source code
-
-def evaluate_dispatch_priority(self):
- priority_class = self.config.dispatch_priority
-
- try:
- config_check()
- api_instance = client.CustomObjectsApi(api_config_handler())
- priority_classes = api_instance.list_cluster_custom_object(
- group="scheduling.k8s.io",
- version="v1",
- plural="priorityclasses",
- )
- except Exception as e: # pragma: no cover
- return _kube_api_error_handling(e)
-
- for pc in priority_classes["items"]:
- if pc["metadata"]["name"] == priority_class:
- return pc["value"]
- print(f"Priority class {priority_class} is not available in the cluster")
- return None
-
-
-def from_k8_cluster_object(rc, mcad=True, write_to_file=False, verify_tls=True)
+def from_k8_cluster_object(rc, appwrapper=True, write_to_file=False, verify_tls=True)
@@ -2000,7 +1869,7 @@ Methods
def from_k8_cluster_object(
rc,
- mcad=True,
+ appwrapper=True,
write_to_file=False,
verify_tls=True,
):
@@ -2033,11 +1902,10 @@ Methods
"resources"
]["limits"]["nvidia.com/gpu"]
),
- instascale=True if machine_types else False,
image=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][
0
]["image"],
- mcad=mcad,
+ appwrapper=appwrapper,
write_to_file=write_to_file,
verify_tls=verify_tls,
local_queue=rc["metadata"]
@@ -2154,30 +2022,28 @@ Methods
"""
ready = False
status = CodeFlareClusterStatus.UNKNOWN
- if self.config.mcad:
+ if self.config.appwrapper:
# check the app wrapper status
appwrapper = _app_wrapper_status(self.config.name, self.config.namespace)
if appwrapper:
if appwrapper.status in [
- AppWrapperStatus.RUNNING,
- AppWrapperStatus.COMPLETED,
- AppWrapperStatus.RUNNING_HOLD_COMPLETION,
+ AppWrapperStatus.RESUMING,
+ AppWrapperStatus.RESETTING,
]:
ready = False
status = CodeFlareClusterStatus.STARTING
elif appwrapper.status in [
AppWrapperStatus.FAILED,
- AppWrapperStatus.DELETED,
]:
ready = False
status = CodeFlareClusterStatus.FAILED # should deleted be separate
return status, ready # exit early, no need to check ray status
elif appwrapper.status in [
- AppWrapperStatus.PENDING,
- AppWrapperStatus.QUEUEING,
+ AppWrapperStatus.SUSPENDED,
+ AppWrapperStatus.SUSPENDING,
]:
ready = False
- if appwrapper.status == AppWrapperStatus.PENDING:
+ if appwrapper.status == AppWrapperStatus.SUSPENDED:
status = CodeFlareClusterStatus.QUEUED
else:
status = CodeFlareClusterStatus.QUEUEING
@@ -2244,13 +2110,13 @@ Methods
try:
config_check()
api_instance = client.CustomObjectsApi(api_config_handler())
- if self.config.mcad:
+ if self.config.appwrapper:
if self.config.write_to_file:
with open(self.app_wrapper_yaml) as f:
aw = yaml.load(f, Loader=yaml.FullLoader)
api_instance.create_namespaced_custom_object(
group="workload.codeflare.dev",
- version="v1beta1",
+ version="v1beta2",
namespace=namespace,
plural="appwrappers",
body=aw,
@@ -2259,7 +2125,7 @@ Methods
aw = yaml.safe_load(self.app_wrapper_yaml)
api_instance.create_namespaced_custom_object(
group="workload.codeflare.dev",
- version="v1beta1",
+ version="v1beta2",
namespace=namespace,
plural="appwrappers",
body=aw,
@@ -2371,7 +2237,6 @@ create_app_wrapper
details
down
-evaluate_dispatch_priority
from_k8_cluster_object
is_dashboard_ready
job_client
diff --git a/docs/detailed-documentation/cluster/config.html b/docs/detailed-documentation/cluster/config.html
index 1879afaf0..4ce9dcdd0 100644
--- a/docs/detailed-documentation/cluster/config.html
+++ b/docs/detailed-documentation/cluster/config.html
@@ -78,12 +78,10 @@ Module codeflare_sdk.cluster.config
max_memory: typing.Union[int, str] = 2
num_gpus: int = 0
template: str = f"{dir}/templates/base-template.yaml"
- instascale: bool = False
- mcad: bool = False
+ appwrapper: bool = False
envs: dict = field(default_factory=dict)
image: str = ""
image_pull_secrets: list = field(default_factory=list)
- dispatch_priority: str = None
write_to_file: bool = False
verify_tls: bool = True
labels: dict = field(default_factory=dict)
@@ -126,7 +124,7 @@ Classes
class ClusterConfiguration
-(name: str, namespace: str = None, head_info: list = <factory>, head_cpus: Union[int, str] = 2, head_memory: Union[int, str] = 8, head_gpus: int = 0, machine_types: list = <factory>, min_cpus: Union[int, str] = 1, max_cpus: Union[int, str] = 1, num_workers: int = 1, min_memory: Union[int, str] = 2, max_memory: Union[int, str] = 2, num_gpus: int = 0, template: str = '/home/runner/work/codeflare-sdk/codeflare-sdk/src/codeflare_sdk/templates/base-template.yaml', instascale: bool = False, mcad: bool = False, envs: dict = <factory>, image: str = '', image_pull_secrets: list = <factory>, dispatch_priority: str = None, write_to_file: bool = False, verify_tls: bool = True, labels: dict = <factory>, local_queue: str = None)
+(name: str, namespace: str = None, head_info: list = <factory>, head_cpus: Union[int, str] = 2, head_memory: Union[int, str] = 8, head_gpus: int = 0, machine_types: list = <factory>, min_cpus: Union[int, str] = 1, max_cpus: Union[int, str] = 1, num_workers: int = 1, min_memory: Union[int, str] = 2, max_memory: Union[int, str] = 2, num_gpus: int = 0, template: str = '/home/runner/work/codeflare-sdk/codeflare-sdk/src/codeflare_sdk/templates/base-template.yaml', appwrapper: bool = False, envs: dict = <factory>, image: str = '', image_pull_secrets: list = <factory>, write_to_file: bool = False, verify_tls: bool = True, labels: dict = <factory>, local_queue: str = None)
-
This dataclass is used to specify resource requirements and other details, and is passed in as an argument when creating a Cluster object.
@@ -155,12 +153,10 @@
Classes
max_memory: typing.Union[int, str] = 2
num_gpus: int = 0
template: str = f"{dir}/templates/base-template.yaml"
- instascale: bool = False
- mcad: bool = False
+ appwrapper: bool = False
envs: dict = field(default_factory=dict)
image: str = ""
image_pull_secrets: list = field(default_factory=list)
- dispatch_priority: str = None
write_to_file: bool = False
verify_tls: bool = True
labels: dict = field(default_factory=dict)
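
A short example of the slimmed-down configuration surface (name, namespace, and queue values are placeholders): instascale, mcad, and dispatch_priority are gone, and appwrapper is the single opt-in flag.

from codeflare_sdk.cluster.config import ClusterConfiguration

config = ClusterConfiguration(
    name="raytest",            # placeholder
    namespace="default",       # placeholder
    num_workers=2,
    appwrapper=True,           # wrap the RayCluster in a v1beta2 AppWrapper
    local_queue="user-queue",  # optional; falls back to the namespace's default LocalQueue
)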
@@ -193,7 +189,7 @@ Classes
Class variables
-var dispatch_priority : str
+var appwrapper : bool
-
@@ -225,10 +221,6 @@ Class variables
-
-var instascale : bool
--
-
-
var labels : dict
-
@@ -249,10 +241,6 @@
Class variables
-
- var mcad : bool
--
-
-
var min_cpus : Union[int, str]
-
@@ -310,7 +298,7 @@
Index
-
ClusterConfiguration
-dispatch_priority
+appwrapper
envs
head_cpus
head_gpus
@@ -318,13 +306,11 @@ head_memory
image
image_pull_secrets
-instascale
labels
local_queue
machine_types
max_cpus
max_memory
-mcad
min_cpus
min_memory
name
diff --git a/docs/detailed-documentation/cluster/model.html b/docs/detailed-documentation/cluster/model.html
index 3843f545e..d69357e18 100644
--- a/docs/detailed-documentation/cluster/model.html
+++ b/docs/detailed-documentation/cluster/model.html
@@ -70,16 +70,17 @@ Module codeflare_sdk.cluster.model
class AppWrapperStatus(Enum):
"""
- Defines the possible reportable states of an AppWrapper.
+ Defines the possible reportable phases of an AppWrapper.
"""
- QUEUEING = "queueing"
- PENDING = "pending"
+ SUSPENDED = "suspended"
+ RESUMING = "resuming"
RUNNING = "running"
+ RESETTING = "resetting"
+ SUSPENDING = "suspending"
+ SUCCEEDED = "succeeded"
FAILED = "failed"
- DELETED = "deleted"
- COMPLETED = "completed"
- RUNNING_HOLD_COMPLETION = "runningholdcompletion"
+ TERMINATING = "terminating"
class CodeFlareClusterStatus(Enum):
@@ -123,9 +124,7 @@ Module codeflare_sdk.cluster.model
"""
name: str
- status: AppWrapperStatus
- can_run: bool
- job_state: str
+ status: AppWrapperStatus
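
With can_run and job_state dropped, constructing the model now takes just a name and a phase (values illustrative):

from codeflare_sdk.cluster.model import AppWrapper, AppWrapperStatus

aw = AppWrapper(name="raytest", status=AppWrapperStatus.SUSPENDED)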
@@ -139,7 +138,7 @@ Classes
class AppWrapper
-(name: str, status: AppWrapperStatus, can_run: bool, job_state: str)
+(name: str, status: AppWrapperStatus)
-
For storing information about an AppWrapper.
@@ -153,20 +152,10 @@ Classes
"""
name: str
- status: AppWrapperStatus
- can_run: bool
- job_state: str
+ status: AppWrapperStatus
Class variables
-var can_run : bool
--
-
-
-var job_state : str
--
-
-
var name : str
-
@@ -182,23 +171,24 @@
Class variables
(value, names=None, *, module=None, qualname=None, type=None, start=1)
-
-
-Defines the possible reportable states of an AppWrapper.
+Defines the possible reportable phases of an AppWrapper.
Expand source code
class AppWrapperStatus(Enum):
"""
- Defines the possible reportable states of an AppWrapper.
+ Defines the possible reportable phases of an AppWrapper.
"""
- QUEUEING = "queueing"
- PENDING = "pending"
+ SUSPENDED = "suspended"
+ RESUMING = "resuming"
RUNNING = "running"
+ RESETTING = "resetting"
+ SUSPENDING = "suspending"
+ SUCCEEDED = "succeeded"
FAILED = "failed"
- DELETED = "deleted"
- COMPLETED = "completed"
- RUNNING_HOLD_COMPLETION = "runningholdcompletion"
+ TERMINATING = "terminating"
Ancestors
@@ -206,31 +196,35 @@ Ancestors
Class variables
-var COMPLETED
+var FAILED
-
-var DELETED
+var RESETTING
-
-var FAILED
+var RESUMING
-
-var PENDING
+var RUNNING
-
-var QUEUEING
+var SUCCEEDED
-
-var RUNNING
+var SUSPENDED
+-
+
+
+var SUSPENDING
-
-var RUNNING_HOLD_COMPLETION
+var TERMINATING
-
@@ -444,22 +438,21 @@ Index
-
AppWrapper
-
AppWrapperStatus
-
-
diff --git a/docs/detailed-documentation/utils/generate_yaml.html b/docs/detailed-documentation/utils/generate_yaml.html
index b2a3af84e..5ee410ce9 100644
--- a/docs/detailed-documentation/utils/generate_yaml.html
+++ b/docs/detailed-documentation/utils/generate_yaml.html
@@ -112,109 +112,11 @@
Module codeflare_sdk.utils.generate_yaml
metadata = yaml.get("metadata")
metadata["name"] = appwrapper_name
metadata["namespace"] = namespace
- lower_meta = item.get("generictemplate", {}).get("metadata")
- lower_meta["labels"]["workload.codeflare.dev/appwrapper"] = appwrapper_name
+ lower_meta = item.get("template", {}).get("metadata")
lower_meta["name"] = cluster_name
lower_meta["namespace"] = namespace
-def update_labels(yaml, instascale, instance_types):
- metadata = yaml.get("metadata")
- if instascale:
- if not len(instance_types) > 0:
- sys.exit(
- "If instascale is set to true, must provide at least one instance type"
- )
- type_str = ""
- for type in instance_types:
- type_str += type + "_"
- type_str = type_str[:-1]
- metadata["labels"]["orderedinstance"] = type_str
- else:
- metadata.pop("labels")
-
-
-def update_priority(yaml, item, dispatch_priority, priority_val):
- spec = yaml.get("spec")
- if dispatch_priority is not None:
- if priority_val:
- spec["priority"] = priority_val
- else:
- raise ValueError(
- "AW generation error: Priority value is None, while dispatch_priority is defined"
- )
- head = item.get("generictemplate").get("spec").get("headGroupSpec")
- worker = item.get("generictemplate").get("spec").get("workerGroupSpecs")[0]
- head["template"]["spec"]["priorityClassName"] = dispatch_priority
- worker["template"]["spec"]["priorityClassName"] = dispatch_priority
- else:
- spec.pop("priority")
-
-
-def update_custompodresources(
- item,
- min_cpu,
- max_cpu,
- min_memory,
- max_memory,
- gpu,
- workers,
- head_cpus,
- head_memory,
- head_gpus,
-):
- if "custompodresources" in item.keys():
- custompodresources = item.get("custompodresources")
- for i in range(len(custompodresources)):
- resource = custompodresources[i]
- if i == 0:
- # Leave head node resources as template default
- resource["requests"]["cpu"] = head_cpus
- resource["limits"]["cpu"] = head_cpus
- resource["requests"]["memory"] = head_memory
- resource["limits"]["memory"] = head_memory
- resource["requests"]["nvidia.com/gpu"] = head_gpus
- resource["limits"]["nvidia.com/gpu"] = head_gpus
-
- else:
- for k, v in resource.items():
- if k == "replicas" and i == 1:
- resource[k] = workers
- if k == "requests" or k == "limits":
- for spec, _ in v.items():
- if spec == "cpu":
- if k == "limits":
- resource[k][spec] = max_cpu
- else:
- resource[k][spec] = min_cpu
- if spec == "memory":
- if k == "limits":
- resource[k][spec] = max_memory
- else:
- resource[k][spec] = min_memory
- if spec == "nvidia.com/gpu":
- if i == 0:
- resource[k][spec] = 0
- else:
- resource[k][spec] = gpu
- else:
- sys.exit("Error: malformed template")
-
-
-def update_affinity(spec, appwrapper_name, instascale):
- if instascale:
- node_selector_terms = (
- spec.get("affinity")
- .get("nodeAffinity")
- .get("requiredDuringSchedulingIgnoredDuringExecution")
- .get("nodeSelectorTerms")
- )
- node_selector_terms[0]["matchExpressions"][0]["values"][0] = appwrapper_name
- node_selector_terms[0]["matchExpressions"][0]["key"] = appwrapper_name
- else:
- spec.pop("affinity")
-
-
def update_image(spec, image):
containers = spec.get("containers")
for container in containers:
@@ -263,18 +165,17 @@ Module codeflare_sdk.utils.generate_yaml
gpu,
workers,
image,
- instascale,
env,
image_pull_secrets,
head_cpus,
head_memory,
head_gpus,
):
- if "generictemplate" in item.keys():
- head = item.get("generictemplate").get("spec").get("headGroupSpec")
+ if "template" in item.keys():
+ head = item.get("template").get("spec").get("headGroupSpec")
head["rayStartParams"]["num-gpus"] = str(int(head_gpus))
- worker = item.get("generictemplate").get("spec").get("workerGroupSpecs")[0]
+ worker = item.get("template").get("spec").get("workerGroupSpecs")[0]
# Head counts as first worker
worker["replicas"] = workers
worker["minReplicas"] = workers
@@ -284,7 +185,6 @@ Module codeflare_sdk.utils.generate_yaml
for comp in [head, worker]:
spec = comp.get("template").get("spec")
- update_affinity(spec, appwrapper_name, instascale)
update_image_pull_secrets(spec, image_pull_secrets)
update_image(spec, image)
update_env(spec, env)
@@ -359,74 +259,52 @@ Module codeflare_sdk.utils.generate_yaml
return False
+def add_queue_label(item: dict, namespace: str, local_queue: Optional[str]):
+ lq_name = local_queue or get_default_kueue_name(namespace)
+ if not local_queue_exists(namespace, lq_name):
+ raise ValueError(
+ "local_queue provided does not exist or is not in this namespace. Please provide the correct local_queue name in Cluster Configuration"
+ )
+ if not "labels" in item["metadata"]:
+ item["metadata"]["labels"] = {}
+ item["metadata"]["labels"].update({"kueue.x-k8s.io/queue-name": lq_name})
+
+
+def augment_labels(item: dict, labels: dict):
+ if "template" in item:
+ if not "labels" in item["template"]["metadata"]:
+ item["template"]["metadata"]["labels"] = {}
+ item["template"]["metadata"]["labels"].update(labels)
+
+
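# Usage sketch for the two helpers above (values illustrative; assumes a Kueue
# LocalQueue named "user-queue" exists in namespace "default"):
example_item = {"template": {"metadata": {"name": "raytest", "namespace": "default"}}}
augment_labels(example_item, {"team": "ml"})  # copies user labels onto the wrapped resource
add_queue_label(example_item["template"], "default", "user-queue")  # attaches kueue.x-k8s.io/queue-name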
def write_components(
user_yaml: dict,
output_file_name: str,
- namespace: str,
- local_queue: Optional[str],
- labels: dict,
):
# Create the directory if it doesn't exist
directory_path = os.path.dirname(output_file_name)
if not os.path.exists(directory_path):
os.makedirs(directory_path)
- components = user_yaml.get("spec", "resources")["resources"].get("GenericItems")
+ components = user_yaml.get("spec", "resources").get("components")
open(output_file_name, "w").close()
- lq_name = local_queue or get_default_kueue_name(namespace)
- cluster_labels = labels
- if not local_queue_exists(namespace, lq_name):
- raise ValueError(
- "local_queue provided does not exist or is not in this namespace. Please provide the correct local_queue name in Cluster Configuration"
- )
with open(output_file_name, "a") as outfile:
for component in components:
- if "generictemplate" in component:
- if (
- "workload.codeflare.dev/appwrapper"
- in component["generictemplate"]["metadata"]["labels"]
- ):
- del component["generictemplate"]["metadata"]["labels"][
- "workload.codeflare.dev/appwrapper"
- ]
- labels = component["generictemplate"]["metadata"]["labels"]
- labels.update({"kueue.x-k8s.io/queue-name": lq_name})
- labels.update(cluster_labels)
+ if "template" in component:
outfile.write("---\n")
- yaml.dump(
- component["generictemplate"], outfile, default_flow_style=False
- )
+ yaml.dump(component["template"], outfile, default_flow_style=False)
print(f"Written to: {output_file_name}")
def load_components(
user_yaml: dict,
name: str,
- namespace: str,
- local_queue: Optional[str],
- labels: dict,
):
component_list = []
- components = user_yaml.get("spec", "resources")["resources"].get("GenericItems")
- lq_name = local_queue or get_default_kueue_name(namespace)
- cluster_labels = labels
- if not local_queue_exists(namespace, lq_name):
- raise ValueError(
- "local_queue provided does not exist or is not in this namespace. Please provide the correct local_queue name in Cluster Configuration"
- )
+ components = user_yaml.get("spec", "resources").get("components")
for component in components:
- if "generictemplate" in component:
- if (
- "workload.codeflare.dev/appwrapper"
- in component["generictemplate"]["metadata"]["labels"]
- ):
- del component["generictemplate"]["metadata"]["labels"][
- "workload.codeflare.dev/appwrapper"
- ]
- labels = component["generictemplate"]["metadata"]["labels"]
- labels.update({"kueue.x-k8s.io/queue-name": lq_name})
- labels.update(cluster_labels)
- component_list.append(component["generictemplate"])
+ if "template" in component:
+ component_list.append(component["template"])
resources = "---\n" + "---\n".join(
[yaml.dump(component) for component in component_list]
@@ -456,13 +334,10 @@ Module codeflare_sdk.utils.generate_yaml
workers: int,
template: str,
image: str,
- instascale: bool,
- mcad: bool,
+ appwrapper: bool,
instance_types: list,
env,
image_pull_secrets: list,
- dispatch_priority: str,
- priority_val: int,
write_to_file: bool,
verify_tls: bool,
local_queue: Optional[str],
@@ -471,7 +346,7 @@ Module codeflare_sdk.utils.generate_yaml
user_yaml = read_template(template)
appwrapper_name, cluster_name = gen_names(name)
resources = user_yaml.get("spec", "resources")
- item = resources["resources"].get("GenericItems")[0]
+ item = resources.get("components")[0]
update_names(
user_yaml,
item,
@@ -479,20 +354,6 @@ Module codeflare_sdk.utils.generate_yaml
cluster_name,
namespace,
)
- update_labels(user_yaml, instascale, instance_types)
- update_priority(user_yaml, item, dispatch_priority, priority_val)
- update_custompodresources(
- item,
- min_cpu,
- max_cpu,
- min_memory,
- max_memory,
- gpu,
- workers,
- head_cpus,
- head_memory,
- head_gpus,
- )
update_nodes(
item,
appwrapper_name,
@@ -503,7 +364,6 @@ Module codeflare_sdk.utils.generate_yaml
gpu,
workers,
image,
- instascale,
env,
image_pull_secrets,
head_cpus,
@@ -511,20 +371,27 @@ Module codeflare_sdk.utils.generate_yaml
head_gpus,
)
+ augment_labels(item, labels)
+
+ if appwrapper:
+ add_queue_label(user_yaml, namespace, local_queue)
+ else:
+ add_queue_label(item["template"], namespace, local_queue)
+
directory_path = os.path.expanduser("~/.codeflare/resources/")
outfile = os.path.join(directory_path, appwrapper_name + ".yaml")
if write_to_file:
- if mcad:
+ if appwrapper:
write_user_appwrapper(user_yaml, outfile)
else:
- write_components(user_yaml, outfile, namespace, local_queue, labels)
+ write_components(user_yaml, outfile)
return outfile
else:
- if mcad:
+ if appwrapper:
user_yaml = load_appwrapper(user_yaml, name)
else:
- user_yaml = load_components(user_yaml, name, namespace, local_queue, labels)
+ user_yaml = load_components(user_yaml, name)
return user_yaml
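
Summarising the rewritten function's four outcomes (a paraphrase, not SDK code):

# write_to_file=True,  appwrapper=True  -> AppWrapper YAML written under ~/.codeflare/resources/
# write_to_file=True,  appwrapper=False -> bare component manifests written to the same path
# write_to_file=False, appwrapper=True  -> AppWrapper dict returned in memory
# write_to_file=False, appwrapper=False -> "---"-separated component YAML string returned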
@@ -535,6 +402,42 @@ Module codeflare_sdk.utils.generate_yaml
Functions
+
+def add_queue_label(item: dict, namespace: str, local_queue: Optional[str])
+
+-
+
+
+
+Expand source code
+
+def add_queue_label(item: dict, namespace: str, local_queue: Optional[str]):
+ lq_name = local_queue or get_default_kueue_name(namespace)
+ if not local_queue_exists(namespace, lq_name):
+ raise ValueError(
+ "local_queue provided does not exist or is not in this namespace. Please provide the correct local_queue name in Cluster Configuration"
+ )
+ if not "labels" in item["metadata"]:
+ item["metadata"]["labels"] = {}
+ item["metadata"]["labels"].update({"kueue.x-k8s.io/queue-name": lq_name})
+
+
+
+def augment_labels(item: dict, labels: dict)
+
+-
+
+
+
+Expand source code
+
+def augment_labels(item: dict, labels: dict):
+ if "template" in item:
+ if not "labels" in item["template"]["metadata"]:
+ item["template"]["metadata"]["labels"] = {}
+ item["template"]["metadata"]["labels"].update(labels)
+
+
def del_from_list_by_name(l: list, target: List[str]) ‑> list
@@ -568,7 +471,7 @@ Functions
-def generate_appwrapper(name: str, namespace: str, head_cpus: int, head_memory: int, head_gpus: int, min_cpu: int, max_cpu: int, min_memory: int, max_memory: int, gpu: int, workers: int, template: str, image: str, instascale: bool, mcad: bool, instance_types: list, env, image_pull_secrets: list, dispatch_priority: str, priority_val: int, write_to_file: bool, verify_tls: bool, local_queue: Optional[str], labels)
+def generate_appwrapper(name: str, namespace: str, head_cpus: int, head_memory: int, head_gpus: int, min_cpu: int, max_cpu: int, min_memory: int, max_memory: int, gpu: int, workers: int, template: str, image: str, appwrapper: bool, instance_types: list, env, image_pull_secrets: list, write_to_file: bool, verify_tls: bool, local_queue: Optional[str], labels)
-
@@ -590,13 +493,10 @@
Functions
workers: int,
template: str,
image: str,
- instascale: bool,
- mcad: bool,
+ appwrapper: bool,
instance_types: list,
env,
image_pull_secrets: list,
- dispatch_priority: str,
- priority_val: int,
write_to_file: bool,
verify_tls: bool,
local_queue: Optional[str],
@@ -605,7 +505,7 @@ Functions
user_yaml = read_template(template)
appwrapper_name, cluster_name = gen_names(name)
resources = user_yaml.get("spec", "resources")
- item = resources["resources"].get("GenericItems")[0]
+ item = resources.get("components")[0]
update_names(
user_yaml,
item,
@@ -613,20 +513,6 @@ Functions
cluster_name,
namespace,
)
- update_labels(user_yaml, instascale, instance_types)
- update_priority(user_yaml, item, dispatch_priority, priority_val)
- update_custompodresources(
- item,
- min_cpu,
- max_cpu,
- min_memory,
- max_memory,
- gpu,
- workers,
- head_cpus,
- head_memory,
- head_gpus,
- )
update_nodes(
item,
appwrapper_name,
@@ -637,7 +523,6 @@ Functions
gpu,
workers,
image,
- instascale,
env,
image_pull_secrets,
head_cpus,
@@ -645,20 +530,27 @@ Functions
head_gpus,
)
+ augment_labels(item, labels)
+
+ if appwrapper:
+ add_queue_label(user_yaml, namespace, local_queue)
+ else:
+ add_queue_label(item["template"], namespace, local_queue)
+
directory_path = os.path.expanduser("~/.codeflare/resources/")
outfile = os.path.join(directory_path, appwrapper_name + ".yaml")
if write_to_file:
- if mcad:
+ if appwrapper:
write_user_appwrapper(user_yaml, outfile)
else:
- write_components(user_yaml, outfile, namespace, local_queue, labels)
+ write_components(user_yaml, outfile)
return outfile
else:
- if mcad:
+ if appwrapper:
user_yaml = load_appwrapper(user_yaml, name)
else:
- user_yaml = load_components(user_yaml, name, namespace, local_queue, labels)
+ user_yaml = load_components(user_yaml, name)
return user_yaml
@@ -757,7 +649,7 @@ Functions
-def load_components(user_yaml: dict, name: str, namespace: str, local_queue: Optional[str], labels: dict)
+def load_components(user_yaml: dict, name: str)
-
@@ -768,31 +660,12 @@
Functions
def load_components(
user_yaml: dict,
name: str,
- namespace: str,
- local_queue: Optional[str],
- labels: dict,
):
component_list = []
- components = user_yaml.get("spec", "resources")["resources"].get("GenericItems")
- lq_name = local_queue or get_default_kueue_name(namespace)
- cluster_labels = labels
- if not local_queue_exists(namespace, lq_name):
- raise ValueError(
- "local_queue provided does not exist or is not in this namespace. Please provide the correct local_queue name in Cluster Configuration"
- )
+ components = user_yaml.get("spec", "resources").get("components")
for component in components:
- if "generictemplate" in component:
- if (
- "workload.codeflare.dev/appwrapper"
- in component["generictemplate"]["metadata"]["labels"]
- ):
- del component["generictemplate"]["metadata"]["labels"][
- "workload.codeflare.dev/appwrapper"
- ]
- labels = component["generictemplate"]["metadata"]["labels"]
- labels.update({"kueue.x-k8s.io/queue-name": lq_name})
- labels.update(cluster_labels)
- component_list.append(component["generictemplate"])
+ if "template" in component:
+ component_list.append(component["template"])
resources = "---\n" + "---\n".join(
[yaml.dump(component) for component in component_list]
@@ -848,88 +721,6 @@ Functions
print(exc)
-
-def update_affinity(spec, appwrapper_name, instascale)
-
--
-
-
-
-Expand source code
-
-def update_affinity(spec, appwrapper_name, instascale):
- if instascale:
- node_selector_terms = (
- spec.get("affinity")
- .get("nodeAffinity")
- .get("requiredDuringSchedulingIgnoredDuringExecution")
- .get("nodeSelectorTerms")
- )
- node_selector_terms[0]["matchExpressions"][0]["values"][0] = appwrapper_name
- node_selector_terms[0]["matchExpressions"][0]["key"] = appwrapper_name
- else:
- spec.pop("affinity")
-
-
-
-def update_custompodresources(item, min_cpu, max_cpu, min_memory, max_memory, gpu, workers, head_cpus, head_memory, head_gpus)
-
--
-
-
-
-Expand source code
-
-def update_custompodresources(
- item,
- min_cpu,
- max_cpu,
- min_memory,
- max_memory,
- gpu,
- workers,
- head_cpus,
- head_memory,
- head_gpus,
-):
- if "custompodresources" in item.keys():
- custompodresources = item.get("custompodresources")
- for i in range(len(custompodresources)):
- resource = custompodresources[i]
- if i == 0:
- # Leave head node resources as template default
- resource["requests"]["cpu"] = head_cpus
- resource["limits"]["cpu"] = head_cpus
- resource["requests"]["memory"] = head_memory
- resource["limits"]["memory"] = head_memory
- resource["requests"]["nvidia.com/gpu"] = head_gpus
- resource["limits"]["nvidia.com/gpu"] = head_gpus
-
- else:
- for k, v in resource.items():
- if k == "replicas" and i == 1:
- resource[k] = workers
- if k == "requests" or k == "limits":
- for spec, _ in v.items():
- if spec == "cpu":
- if k == "limits":
- resource[k][spec] = max_cpu
- else:
- resource[k][spec] = min_cpu
- if spec == "memory":
- if k == "limits":
- resource[k][spec] = max_memory
- else:
- resource[k][spec] = min_memory
- if spec == "nvidia.com/gpu":
- if i == 0:
- resource[k][spec] = 0
- else:
- resource[k][spec] = gpu
- else:
- sys.exit("Error: malformed template")
-
-
def update_env(spec, env)
@@ -980,31 +771,6 @@ Functions
]
-
-def update_labels(yaml, instascale, instance_types)
-
--
-
-
-
-Expand source code
-
-def update_labels(yaml, instascale, instance_types):
- metadata = yaml.get("metadata")
- if instascale:
- if not len(instance_types) > 0:
- sys.exit(
- "If instascale is set to true, must provide at least one instance type"
- )
- type_str = ""
- for type in instance_types:
- type_str += type + "_"
- type_str = type_str[:-1]
- metadata["labels"]["orderedinstance"] = type_str
- else:
- metadata.pop("labels")
-
-
def update_names(yaml, item, appwrapper_name, cluster_name, namespace)
@@ -1018,14 +784,13 @@ Functions
metadata = yaml.get("metadata")
metadata["name"] = appwrapper_name
metadata["namespace"] = namespace
- lower_meta = item.get("generictemplate", {}).get("metadata")
- lower_meta["labels"]["workload.codeflare.dev/appwrapper"] = appwrapper_name
+ lower_meta = item.get("template", {}).get("metadata")
lower_meta["name"] = cluster_name
lower_meta["namespace"] = namespace
-def update_nodes(item, appwrapper_name, min_cpu, max_cpu, min_memory, max_memory, gpu, workers, image, instascale, env, image_pull_secrets, head_cpus, head_memory, head_gpus)
+def update_nodes(item, appwrapper_name, min_cpu, max_cpu, min_memory, max_memory, gpu, workers, image, env, image_pull_secrets, head_cpus, head_memory, head_gpus)
-
@@ -1043,18 +808,17 @@
Functions
gpu,
workers,
image,
- instascale,
env,
image_pull_secrets,
head_cpus,
head_memory,
head_gpus,
):
- if "generictemplate" in item.keys():
- head = item.get("generictemplate").get("spec").get("headGroupSpec")
+ if "template" in item.keys():
+ head = item.get("template").get("spec").get("headGroupSpec")
head["rayStartParams"]["num-gpus"] = str(int(head_gpus))
- worker = item.get("generictemplate").get("spec").get("workerGroupSpecs")[0]
+ worker = item.get("template").get("spec").get("workerGroupSpecs")[0]
# Head counts as first worker
worker["replicas"] = workers
worker["minReplicas"] = workers
@@ -1064,7 +828,6 @@ Functions
for comp in [head, worker]:
spec = comp.get("template").get("spec")
- update_affinity(spec, appwrapper_name, instascale)
update_image_pull_secrets(spec, image_pull_secrets)
update_image(spec, image)
update_env(spec, env)
@@ -1077,32 +840,6 @@ Functions
update_resources(spec, min_cpu, max_cpu, min_memory, max_memory, gpu)
-
-def update_priority(yaml, item, dispatch_priority, priority_val)
-
--
-
-
-
-Expand source code
-
-def update_priority(yaml, item, dispatch_priority, priority_val):
- spec = yaml.get("spec")
- if dispatch_priority is not None:
- if priority_val:
- spec["priority"] = priority_val
- else:
- raise ValueError(
- "AW generation error: Priority value is None, while dispatch_priority is defined"
- )
- head = item.get("generictemplate").get("spec").get("headGroupSpec")
- worker = item.get("generictemplate").get("spec").get("workerGroupSpecs")[0]
- head["template"]["spec"]["priorityClassName"] = dispatch_priority
- worker["template"]["spec"]["priorityClassName"] = dispatch_priority
- else:
- spec.pop("priority")
-
-
def update_resources(spec, min_cpu, max_cpu, min_memory, max_memory, gpu)
@@ -1128,7 +865,7 @@ Functions
-def write_components(user_yaml: dict, output_file_name: str, namespace: str, local_queue: Optional[str], labels: dict)
+def write_components(user_yaml: dict, output_file_name: str)
-
@@ -1139,40 +876,19 @@
Functions
def write_components(
user_yaml: dict,
output_file_name: str,
- namespace: str,
- local_queue: Optional[str],
- labels: dict,
):
# Create the directory if it doesn't exist
directory_path = os.path.dirname(output_file_name)
if not os.path.exists(directory_path):
os.makedirs(directory_path)
- components = user_yaml.get("spec", "resources")["resources"].get("GenericItems")
+ components = user_yaml.get("spec", "resources").get("components")
open(output_file_name, "w").close()
- lq_name = local_queue or get_default_kueue_name(namespace)
- cluster_labels = labels
- if not local_queue_exists(namespace, lq_name):
- raise ValueError(
- "local_queue provided does not exist or is not in this namespace. Please provide the correct local_queue name in Cluster Configuration"
- )
with open(output_file_name, "a") as outfile:
for component in components:
- if "generictemplate" in component:
- if (
- "workload.codeflare.dev/appwrapper"
- in component["generictemplate"]["metadata"]["labels"]
- ):
- del component["generictemplate"]["metadata"]["labels"][
- "workload.codeflare.dev/appwrapper"
- ]
- labels = component["generictemplate"]["metadata"]["labels"]
- labels.update({"kueue.x-k8s.io/queue-name": lq_name})
- labels.update(cluster_labels)
+ if "template" in component:
outfile.write("---\n")
- yaml.dump(
- component["generictemplate"], outfile, default_flow_style=False
- )
+ yaml.dump(component["template"], outfile, default_flow_style=False)
print(f"Written to: {output_file_name}")
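
With the Kueue wiring moved out, the simplified writer needs only a parsed template (paths illustrative):

import yaml

with open("base-template.yaml") as f:  # illustrative template path
    user_yaml = yaml.safe_load(f)

# Dumps each spec.components[].template as its own YAML document.
write_components(user_yaml, "/tmp/raytest.yaml")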
@@ -1215,6 +931,8 @@ Index
Functions
+add_queue_label
+augment_labels
del_from_list_by_name
gen_names
generate_appwrapper
@@ -1225,15 +943,11 @@ Index
load_components
local_queue_exists
read_template
-update_affinity
-update_custompodresources
update_env
update_image
update_image_pull_secrets
-update_labels
update_names
update_nodes
-update_priority
update_resources
write_components
write_user_appwrapper