Commit 72e9933
[Feature][autoscaler v2] Set RAY_NODE_TYPE_NAME when starting ray node (
kevin85421 authored Mar 7, 2024
1 parent e6722b0
Showing 3 changed files with 19 additions and 32 deletions.
11 changes: 11 additions & 0 deletions ray-operator/controllers/ray/common/pod.go
@@ -582,6 +582,17 @@ func setContainerEnvVars(pod *corev1.Pod, rayNodeType rayv1.RayNodeType, rayStar
 	}
 	container.Env = append(container.Env, rayCloudInstanceID)
 
+	// RAY_NODE_TYPE_NAME is used by Ray Autoscaler V2 (alpha). See https://github.com/ray-project/kuberay/issues/1965 for more details.
+	nodeGroupNameEnv := corev1.EnvVar{
+		Name: utils.RAY_NODE_TYPE_NAME,
+		ValueFrom: &corev1.EnvVarSource{
+			FieldRef: &corev1.ObjectFieldSelector{
+				FieldPath: fmt.Sprintf("metadata.labels['%s']", utils.RayNodeGroupLabelKey),
+			},
+		},
+	}
+	container.Env = append(container.Env, nodeGroupNameEnv)
+
 	// utils.KUBERAY_GEN_RAY_START_CMD stores the `ray start` command generated by KubeRay.
 	// See https://github.com/ray-project/kuberay/issues/1560 for more details.
 	generatedRayStartCmdEnv := corev1.EnvVar{Name: utils.KUBERAY_GEN_RAY_START_CMD, Value: rayStartCmd}
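Note: the added EnvVar relies on the Kubernetes Downward API; the kubelet resolves the fieldRef when the container starts, so the Ray process sees the Pod's "ray.io/group" label value in its environment. A minimal standalone sketch (the group name "small-group" is a hypothetical example, and utils.RayNodeGroupLabelKey is written out as its value "ray.io/group"):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

func main() {
	// Mirrors the EnvVar built in setContainerEnvVars above.
	env := corev1.EnvVar{
		Name: "RAY_NODE_TYPE_NAME",
		ValueFrom: &corev1.EnvVarSource{
			FieldRef: &corev1.ObjectFieldSelector{
				FieldPath: "metadata.labels['ray.io/group']",
			},
		},
	}
	// For a worker Pod labeled ray.io/group=small-group, the kubelet injects
	// RAY_NODE_TYPE_NAME=small-group at container start.
	fmt.Printf("%s is resolved from %s\n", env.Name, env.ValueFrom.FieldRef.FieldPath)
}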
34 changes: 4 additions & 30 deletions ray-operator/controllers/ray/common/pod_test.go
@@ -31,37 +31,17 @@ var instance = rayv1.RayCluster{
 	},
 	Spec: rayv1.RayClusterSpec{
 		HeadGroupSpec: rayv1.HeadGroupSpec{
-			RayStartParams: map[string]string{
-				"port":                "6379",
-				"object-manager-port": "12345",
-				"node-manager-port":   "12346",
-				"object-store-memory": "100000000",
-				"num-cpus":            "1",
-				"include-dashboard":   "true",
-				"log-color":           "true",
-			},
+			RayStartParams: map[string]string{},
 			Template: corev1.PodTemplateSpec{
 				ObjectMeta: metav1.ObjectMeta{
 					Namespace: "default",
-					Labels: map[string]string{
-						"ray.io/cluster": "raycluster-sample",
-						"ray.io/group":   "headgroup",
-					},
 				},
 				Spec: corev1.PodSpec{
 					Containers: []corev1.Container{
 						{
 							Name:  "ray-head",
 							Image: "repo/image:custom",
 							Env: []corev1.EnvVar{
-								{
-									Name: "MY_POD_IP",
-									ValueFrom: &corev1.EnvVarSource{
-										FieldRef: &corev1.ObjectFieldSelector{
-											FieldPath: "status.podIP",
-										},
-									},
-								},
 								{
 									Name:  "TEST_ENV_NAME",
 									Value: "TEST_ENV_VALUE",
@@ -109,14 +89,6 @@ var instance = rayv1.RayCluster{
 								},
 							},
 							Env: []corev1.EnvVar{
-								{
-									Name: "MY_POD_IP",
-									ValueFrom: &corev1.EnvVarSource{
-										FieldRef: &corev1.ObjectFieldSelector{
-											FieldPath: "status.podIP",
-										},
-									},
-								},
 								{
 									Name:  "TEST_ENV_NAME",
 									Value: "TEST_ENV_VALUE",
@@ -343,6 +315,7 @@ func TestBuildPod(t *testing.T) {
 	checkContainerEnv(t, rayContainer, utils.RAY_USAGE_STATS_KUBERAY_IN_USE, "1")
 	checkContainerEnv(t, rayContainer, utils.RAY_CLUSTER_NAME, fmt.Sprintf("metadata.labels['%s']", utils.RayClusterLabelKey))
 	checkContainerEnv(t, rayContainer, utils.RAY_DASHBOARD_ENABLE_K8S_DISK_USAGE, "1")
+	checkContainerEnv(t, rayContainer, utils.RAY_NODE_TYPE_NAME, fmt.Sprintf("metadata.labels['%s']", utils.RayNodeGroupLabelKey))
 	headRayStartCommandEnv := getEnvVar(rayContainer, utils.KUBERAY_GEN_RAY_START_CMD)
 	assert.True(t, strings.Contains(headRayStartCommandEnv.Value, "ray start"))

@@ -397,6 +370,7 @@ func TestBuildPod(t *testing.T) {
 	checkContainerEnv(t, rayContainer, utils.RAY_IP, "raycluster-sample-head-svc")
 	checkContainerEnv(t, rayContainer, utils.RAY_CLUSTER_NAME, fmt.Sprintf("metadata.labels['%s']", utils.RayClusterLabelKey))
 	checkContainerEnv(t, rayContainer, utils.RAY_DASHBOARD_ENABLE_K8S_DISK_USAGE, "1")
+	checkContainerEnv(t, rayContainer, utils.RAY_NODE_TYPE_NAME, fmt.Sprintf("metadata.labels['%s']", utils.RayNodeGroupLabelKey))
 	workerRayStartCommandEnv := getEnvVar(rayContainer, utils.KUBERAY_GEN_RAY_START_CMD)
 	assert.True(t, strings.Contains(workerRayStartCommandEnv.Value, "ray start"))
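Note: for Downward-API-based env vars, the assertions above pass the fieldRef path as the expected value. A plausible sketch of the checkContainerEnv helper (the real helper defined elsewhere in pod_test.go may differ; it assumes the file's existing testing, corev1, and testify/assert imports):

func checkContainerEnv(t *testing.T, container corev1.Container, envName string, expectedValue string) {
	for _, env := range container.Env {
		if env.Name != envName {
			continue
		}
		if env.ValueFrom != nil && env.ValueFrom.FieldRef != nil {
			// Downward API vars are compared by their fieldPath.
			assert.Equal(t, expectedValue, env.ValueFrom.FieldRef.FieldPath)
		} else {
			assert.Equal(t, expectedValue, env.Value)
		}
		return
	}
	t.Fatalf("env var %s not found in container %s", envName, container.Name)
}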

@@ -785,12 +759,12 @@ func TestHeadPodTemplate_WithServiceAccount(t *testing.T) {

 func TestValidateHeadRayStartParams_OK(t *testing.T) {
 	input := instance.Spec.HeadGroupSpec.DeepCopy()
+	input.RayStartParams = map[string]string{"include-dashboard": "true"}
 	isValid, err := ValidateHeadRayStartParams(context.Background(), *input)
 	assert.Equal(t, true, isValid)
 	assert.Nil(t, err)
 	command := convertParamMap(input.RayStartParams)
 	assert.True(t, strings.Contains(command, "--include-dashboard=true"))
-	assert.True(t, strings.Contains(command, "--log-color=true"))
 }
 
 func TestValidateHeadRayStartParams_ValidWithObjectStoreMemoryError(t *testing.T) {
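Note: convertParamMap renders the rayStartParams map as `ray start` CLI flags, which is what the strings.Contains assertions above inspect. A rough sketch, assuming every entry becomes a --key=value flag (the real helper in the common package may special-case flag-only params):

func convertParamMap(rayStartParams map[string]string) string {
	flags := make([]string, 0, len(rayStartParams))
	for k, v := range rayStartParams {
		flags = append(flags, fmt.Sprintf("--%s=%s", k, v))
	}
	return strings.Join(flags, " ")
}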
6 changes: 4 additions & 2 deletions ray-operator/controllers/ray/utils/constant.go
@@ -112,9 +112,11 @@ const (
 	RAY_DASHBOARD_ADDRESS = "RAY_DASHBOARD_ADDRESS"
 	RAY_JOB_SUBMISSION_ID = "RAY_JOB_SUBMISSION_ID"
 
-	// This environment variable is used by Ray Autoscaler V2. For the Autoscaler V2 alpha
-	// release, its value is the Pod name. This may change in the future.
+	// Environment variables for Ray Autoscaler V2.
+	// The value of RAY_CLOUD_INSTANCE_ID is the Pod name for Autoscaler V2 alpha. This may change in the future.
 	RAY_CLOUD_INSTANCE_ID = "RAY_CLOUD_INSTANCE_ID"
+	// The value of RAY_NODE_TYPE_NAME is the name of the node group (i.e., the value of the "ray.io/group" label).
+	RAY_NODE_TYPE_NAME = "RAY_NODE_TYPE_NAME"
 
 	// This KubeRay operator environment variable is used to determine if random Pod
 	// deletion should be enabled. Note that this only takes effect when autoscaling
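Note: together these two variables let a Ray node identify itself to Autoscaler V2 at runtime. A hypothetical view from inside a container (the Pod and group names are illustrative, not from this commit):

package main

import (
	"fmt"
	"os"
)

func main() {
	// Injected by KubeRay, e.g. for a worker in group "small-group":
	//   RAY_CLOUD_INSTANCE_ID=raycluster-sample-worker-small-group-abcde
	//   RAY_NODE_TYPE_NAME=small-group
	fmt.Println("instance:", os.Getenv("RAY_CLOUD_INSTANCE_ID"))
	fmt.Println("node type:", os.Getenv("RAY_NODE_TYPE_NAME"))
}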
