Add test for autoscaler and its desired state (#2601)
* Add test for autoscaler and its desired state

Signed-off-by: hjiang <[email protected]>

* resolve linter issues

Signed-off-by: hjiang <[email protected]>

* simplify python list

Signed-off-by: hjiang <[email protected]>

* cleanup one wait group

Signed-off-by: hjiang <[email protected]>

* use scale down window

Signed-off-by: hjiang <[email protected]>

* prefer HaveLen

Signed-off-by: hjiang <[email protected]>

* fix replica count

Signed-off-by: hjiang <[email protected]>

---------

Signed-off-by: hjiang <[email protected]>
dentiny authored Dec 5, 2024
1 parent 6ca956b commit aeba37e
Showing 3 changed files with 83 additions and 5 deletions.
19 changes: 19 additions & 0 deletions ray-operator/test/e2eautoscaler/create_concurrent_tasks.py
@@ -0,0 +1,19 @@
"""This script create a number of tasks at roughly the same time, and wait for their completion."""

import ray
import time
import random

# The task number should be large enough that the autoscaler is triggered to scale up to the max replica count.
_TASK_NUM = 30
# The min task duration should be long enough to outlast the autoscaling stage of the test.
_TASK_MIN_DUR_SEC = 5
# The max task duration should be reasonable, to cap the overall test duration.
_TASK_MAX_DUR_SEC = 10

@ray.remote(num_cpus=1)
def f():
sleep_time_sec = random.randint(_TASK_MIN_DUR_SEC, _TASK_MAX_DUR_SEC)
time.sleep(sleep_time_sec)

ray.get([f.remote() for _ in range(_TASK_NUM)])
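
A rough sanity check of these constants against the Go test below (an illustration, not part of the commit): with maxReplica = 3 single-CPU workers, the 30 one-CPU tasks drain in at least ceil(30 / 3) = 10 waves of at least 5 s each, so resource demand persists for roughly 50 s or more, comfortably outlasting the scale-up phase the test observes.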
65 changes: 62 additions & 3 deletions ray-operator/test/e2eautoscaler/raycluster_autoscaler_test.go
@@ -20,7 +20,7 @@ func TestRayClusterAutoscaler(t *testing.T) {
test.StreamKubeRayOperatorLogs()

// Scripts for creating and terminating detached actors to trigger autoscaling
-	scriptsAC := newConfigMap(namespace.Name, "scripts", files(test, "create_detached_actor.py", "terminate_detached_actor.py"))
+	scriptsAC := newConfigMap(namespace.Name, files(test, "create_detached_actor.py", "terminate_detached_actor.py"))
scripts, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), scriptsAC, TestApplyOptions)
g.Expect(err).NotTo(gomega.HaveOccurred())
test.T().Logf("Created ConfigMap %s/%s successfully", scripts.Namespace, scripts.Name)
@@ -86,7 +86,7 @@ func TestRayClusterAutoscalerWithFakeGPU(t *testing.T) {
test.StreamKubeRayOperatorLogs()

// Scripts for creating and terminating detached actors to trigger autoscaling
-	scriptsAC := newConfigMap(namespace.Name, "scripts", files(test, "create_detached_actor.py", "terminate_detached_actor.py"))
+	scriptsAC := newConfigMap(namespace.Name, files(test, "create_detached_actor.py", "terminate_detached_actor.py"))
scripts, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), scriptsAC, TestApplyOptions)
g.Expect(err).NotTo(gomega.HaveOccurred())
test.T().Logf("Created ConfigMap %s/%s successfully", scripts.Namespace, scripts.Name)
@@ -145,7 +145,7 @@ func TestRayClusterAutoscalerWithCustomResource(t *testing.T) {
test.StreamKubeRayOperatorLogs()

// Scripts for creating and terminating detached actors to trigger autoscaling
-	scriptsAC := newConfigMap(namespace.Name, "scripts", files(test, "create_detached_actor.py", "terminate_detached_actor.py"))
+	scriptsAC := newConfigMap(namespace.Name, files(test, "create_detached_actor.py", "terminate_detached_actor.py"))
scripts, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), scriptsAC, TestApplyOptions)
g.Expect(err).NotTo(gomega.HaveOccurred())
test.T().Logf("Created ConfigMap %s/%s successfully", scripts.Namespace, scripts.Name)
@@ -194,3 +194,62 @@ func TestRayClusterAutoscalerWithCustomResource(t *testing.T) {
Should(gomega.WithTransform(RayClusterDesiredWorkerReplicas, gomega.Equal(int32(0))))
})
}

func TestRayClusterAutoscalerWithDesiredState(t *testing.T) {
test := With(t)
g := gomega.NewWithT(t)

const maxReplica = 3
// Set the scale down window to a value large enough that scale down is effectively disabled, avoiding test flakiness.
const scaleDownWaitSec = 3600

// Create a namespace
namespace := test.NewTestNamespace()
test.StreamKubeRayOperatorLogs()

// Script for creating a batch of concurrent tasks to trigger autoscaling
scriptsAC := newConfigMap(namespace.Name, files(test, "create_concurrent_tasks.py"))
scripts, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), scriptsAC, TestApplyOptions)
g.Expect(err).NotTo(gomega.HaveOccurred())
test.T().Logf("Created ConfigMap %s/%s successfully", scripts.Namespace, scripts.Name)

groupName := "custom-resource-group"
rayClusterSpecAC := rayv1ac.RayClusterSpec().
WithEnableInTreeAutoscaling(true).
WithRayVersion(GetRayVersion()).
WithHeadGroupSpec(rayv1ac.HeadGroupSpec().
WithRayStartParams(map[string]string{"num-cpus": "0"}).
WithTemplate(headPodTemplateApplyConfiguration())).
WithWorkerGroupSpecs(rayv1ac.WorkerGroupSpec().
WithReplicas(0).
WithMinReplicas(0).
WithMaxReplicas(maxReplica).
WithGroupName(groupName).
WithRayStartParams(map[string]string{"num-cpus": "1", "resources": `'{"CustomResource": 1}'`}).
WithTemplate(workerPodTemplateApplyConfiguration())).
WithAutoscalerOptions(rayv1ac.AutoscalerOptions().
WithIdleTimeoutSeconds(scaleDownWaitSec))
rayClusterAC := rayv1ac.RayCluster("ray-cluster", namespace.Name).
WithSpec(apply(rayClusterSpecAC, mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](scripts, "/home/ray/test_scripts")))

rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions)
g.Expect(err).NotTo(gomega.HaveOccurred())
test.T().Logf("Created RayCluster %s/%s successfully", rayCluster.Namespace, rayCluster.Name)

// Wait for RayCluster to become ready and verify the number of available worker replicas.
g.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium).
Should(gomega.WithTransform(RayClusterState, gomega.Equal(rayv1.Ready)))
g.Expect(GetRayCluster(test, rayCluster.Namespace, rayCluster.Name)).To(gomega.WithTransform(RayClusterDesiredWorkerReplicas, gomega.Equal(int32(0))))

headPod, err := GetHeadPod(test, rayCluster)
g.Expect(err).NotTo(gomega.HaveOccurred())
test.T().Logf("Found head pod %s/%s", headPod.Namespace, headPod.Name)

// Create a number of tasks and wait for their completion; workers in the "custom-resource-group" should be created to run them.
ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "/home/ray/test_scripts/create_concurrent_tasks.py"})

// Scale down has been disabled, so after the Ray script completes, the cluster is still expected to have maxReplica worker pods.
pods, err := GetWorkerPods(test, rayCluster)
g.Expect(err).NotTo(gomega.HaveOccurred())
g.Expect(pods).To(gomega.HaveLen(maxReplica))
}
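
Because idleTimeoutSeconds is set to 3600 seconds, no worker should be reclaimed while the test runs, which is what makes the one-shot HaveLen assertion above safe. For anyone adapting this pattern with a shorter idle timeout, a polling variant of the final check could look like the sketch below. This is hypothetical and not part of this commit; it assumes GetWorkerPods returns ([]corev1.Pod, error), as its usage above suggests, with corev1 being k8s.io/api/core/v1.

// Hypothetical polling variant (not part of this commit): retry until the
// expected number of worker pods is observed, rather than asserting once.
g.Eventually(func() ([]corev1.Pod, error) {
	// GetWorkerPods is the repo helper used above; gomega treats a non-nil
	// trailing error return as a failed poll and keeps retrying.
	return GetWorkerPods(test, rayCluster)
}, TestTimeoutMedium).Should(gomega.HaveLen(maxReplica))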
4 changes: 2 additions & 2 deletions ray-operator/test/e2eautoscaler/support.go
@@ -41,8 +41,8 @@ func options[T any](options ...option[T]) option[T] {
}
}

-func newConfigMap(namespace, name string, options ...option[corev1ac.ConfigMapApplyConfiguration]) *corev1ac.ConfigMapApplyConfiguration {
-	cmAC := corev1ac.ConfigMap(name, namespace).
+func newConfigMap(namespace string, options ...option[corev1ac.ConfigMapApplyConfiguration]) *corev1ac.ConfigMapApplyConfiguration {
+	cmAC := corev1ac.ConfigMap("scripts", namespace).
WithBinaryData(map[string][]byte{}).
WithImmutable(true)

