From aeba37e58813e0e415cb937fb0c977f2cfb981c4 Mon Sep 17 00:00:00 2001
From: dentiny
Date: Thu, 5 Dec 2024 00:26:42 -0800
Subject: [PATCH] Add test for autoscaler and its desired state (#2601)

* Add test for autoscaler and its desired state

Signed-off-by: hjiang

* resolve linter issues

Signed-off-by: hjiang

* simplify python list

Signed-off-by: hjiang

* cleanup one wait group

Signed-off-by: hjiang

* use scale down window

Signed-off-by: hjiang

* prefer HaveLen

Signed-off-by: hjiang

* fix replica count

Signed-off-by: hjiang

---------

Signed-off-by: hjiang
---
 .../e2eautoscaler/create_concurrent_tasks.py  | 19 ++++
 .../raycluster_autoscaler_test.go             | 65 ++++++++++++++++++-
 ray-operator/test/e2eautoscaler/support.go    |  4 +-
 3 files changed, 83 insertions(+), 5 deletions(-)
 create mode 100644 ray-operator/test/e2eautoscaler/create_concurrent_tasks.py

diff --git a/ray-operator/test/e2eautoscaler/create_concurrent_tasks.py b/ray-operator/test/e2eautoscaler/create_concurrent_tasks.py
new file mode 100644
index 0000000000..98b861c140
--- /dev/null
+++ b/ray-operator/test/e2eautoscaler/create_concurrent_tasks.py
@@ -0,0 +1,19 @@
+"""This script creates a number of tasks at roughly the same time and waits for their completion."""
+
+import ray
+import time
+import random
+
+# The task number should be large enough that the autoscaler is triggered to scale up to the max number of replicas.
+_TASK_NUM = 30
+# The min task duration should be long enough for the tasks to outlast the autoscaling stage of the test.
+_TASK_MIN_DUR_SEC = 5
+# The max task duration should be small enough to keep the overall test duration reasonable.
+_TASK_MAX_DUR_SEC = 10
+
+@ray.remote(num_cpus=1)
+def f():
+    sleep_time_sec = random.randint(_TASK_MIN_DUR_SEC, _TASK_MAX_DUR_SEC)
+    time.sleep(sleep_time_sec)
+
+ray.get([f.remote() for _ in range(_TASK_NUM)])
diff --git a/ray-operator/test/e2eautoscaler/raycluster_autoscaler_test.go b/ray-operator/test/e2eautoscaler/raycluster_autoscaler_test.go
index 639f121044..b1d9ea2dbb 100644
--- a/ray-operator/test/e2eautoscaler/raycluster_autoscaler_test.go
+++ b/ray-operator/test/e2eautoscaler/raycluster_autoscaler_test.go
@@ -20,7 +20,7 @@ func TestRayClusterAutoscaler(t *testing.T) {
 	test.StreamKubeRayOperatorLogs()
 
 	// Scripts for creating and terminating detached actors to trigger autoscaling
-	scriptsAC := newConfigMap(namespace.Name, "scripts", files(test, "create_detached_actor.py", "terminate_detached_actor.py"))
+	scriptsAC := newConfigMap(namespace.Name, files(test, "create_detached_actor.py", "terminate_detached_actor.py"))
 	scripts, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), scriptsAC, TestApplyOptions)
 	g.Expect(err).NotTo(gomega.HaveOccurred())
 	test.T().Logf("Created ConfigMap %s/%s successfully", scripts.Namespace, scripts.Name)
@@ -86,7 +86,7 @@ func TestRayClusterAutoscalerWithFakeGPU(t *testing.T) {
 	test.StreamKubeRayOperatorLogs()
 
 	// Scripts for creating and terminating detached actors to trigger autoscaling
-	scriptsAC := newConfigMap(namespace.Name, "scripts", files(test, "create_detached_actor.py", "terminate_detached_actor.py"))
+	scriptsAC := newConfigMap(namespace.Name, files(test, "create_detached_actor.py", "terminate_detached_actor.py"))
 	scripts, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), scriptsAC, TestApplyOptions)
 	g.Expect(err).NotTo(gomega.HaveOccurred())
 	test.T().Logf("Created ConfigMap %s/%s successfully", scripts.Namespace, scripts.Name)
@@ -145,7 +145,7 @@ func TestRayClusterAutoscalerWithCustomResource(t *testing.T) {
 	test.StreamKubeRayOperatorLogs()
 
 	// Scripts for creating and terminating detached actors to trigger autoscaling
-	scriptsAC := newConfigMap(namespace.Name, "scripts", files(test, "create_detached_actor.py", "terminate_detached_actor.py"))
+	scriptsAC := newConfigMap(namespace.Name, files(test, "create_detached_actor.py", "terminate_detached_actor.py"))
 	scripts, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), scriptsAC, TestApplyOptions)
 	g.Expect(err).NotTo(gomega.HaveOccurred())
 	test.T().Logf("Created ConfigMap %s/%s successfully", scripts.Namespace, scripts.Name)
@@ -194,3 +194,62 @@ func TestRayClusterAutoscalerWithCustomResource(t *testing.T) {
 		Should(gomega.WithTransform(RayClusterDesiredWorkerReplicas, gomega.Equal(int32(0))))
 	})
 }
+
+func TestRayClusterAutoscalerWithDesiredState(t *testing.T) {
+	test := With(t)
+	g := gomega.NewWithT(t)
+
+	const maxReplica = 3
+	// Set the scale-down window to a large enough value that scale down is effectively disabled, to avoid test flakiness.
+	const scaleDownWaitSec = 3600
+
+	// Create a namespace
+	namespace := test.NewTestNamespace()
+	test.StreamKubeRayOperatorLogs()
+
+	// Script that creates a burst of concurrent tasks to trigger autoscaling
+	scriptsAC := newConfigMap(namespace.Name, files(test, "create_concurrent_tasks.py"))
+	scripts, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), scriptsAC, TestApplyOptions)
+	g.Expect(err).NotTo(gomega.HaveOccurred())
+	test.T().Logf("Created ConfigMap %s/%s successfully", scripts.Namespace, scripts.Name)
+
+	groupName := "custom-resource-group"
+	rayClusterSpecAC := rayv1ac.RayClusterSpec().
+		WithEnableInTreeAutoscaling(true).
+		WithRayVersion(GetRayVersion()).
+		WithHeadGroupSpec(rayv1ac.HeadGroupSpec().
+			WithRayStartParams(map[string]string{"num-cpus": "0"}).
+			WithTemplate(headPodTemplateApplyConfiguration())).
+		WithWorkerGroupSpecs(rayv1ac.WorkerGroupSpec().
+			WithReplicas(0).
+			WithMinReplicas(0).
+			WithMaxReplicas(maxReplica).
+			WithGroupName(groupName).
+			WithRayStartParams(map[string]string{"num-cpus": "1", "resources": `'{"CustomResource": 1}'`}).
+			WithTemplate(workerPodTemplateApplyConfiguration())).
+		WithAutoscalerOptions(rayv1ac.AutoscalerOptions().
+			WithIdleTimeoutSeconds(scaleDownWaitSec))
+	rayClusterAC := rayv1ac.RayCluster("ray-cluster", namespace.Name).
+		WithSpec(apply(rayClusterSpecAC, mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](scripts, "/home/ray/test_scripts")))
+
+	rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions)
+	g.Expect(err).NotTo(gomega.HaveOccurred())
+	test.T().Logf("Created RayCluster %s/%s successfully", rayCluster.Namespace, rayCluster.Name)
+
+	// Wait for RayCluster to become ready and verify the number of available worker replicas.
+	g.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium).
+		Should(gomega.WithTransform(RayClusterState, gomega.Equal(rayv1.Ready)))
+	g.Expect(GetRayCluster(test, rayCluster.Namespace, rayCluster.Name)).To(gomega.WithTransform(RayClusterDesiredWorkerReplicas, gomega.Equal(int32(0))))
+
+	headPod, err := GetHeadPod(test, rayCluster)
+	g.Expect(err).NotTo(gomega.HaveOccurred())
+	test.T().Logf("Found head pod %s/%s", headPod.Namespace, headPod.Name)
+
+	// Create a number of concurrent tasks and wait for their completion; workers in the "custom-resource-group" should be created, up to the max replica count.
+	ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "/home/ray/test_scripts/create_concurrent_tasks.py"})
+
+	// Scale down is disabled, so after the Ray script completes the cluster is still expected to have maxReplica worker pods.
+	pods, err := GetWorkerPods(test, rayCluster)
+	g.Expect(err).NotTo(gomega.HaveOccurred())
+	g.Expect(pods).To(gomega.HaveLen(maxReplica))
+}
diff --git a/ray-operator/test/e2eautoscaler/support.go b/ray-operator/test/e2eautoscaler/support.go
index 8763a06f4e..7c16600a28 100644
--- a/ray-operator/test/e2eautoscaler/support.go
+++ b/ray-operator/test/e2eautoscaler/support.go
@@ -41,8 +41,8 @@ func options[T any](options ...option[T]) option[T] {
 	}
 }
 
-func newConfigMap(namespace, name string, options ...option[corev1ac.ConfigMapApplyConfiguration]) *corev1ac.ConfigMapApplyConfiguration {
-	cmAC := corev1ac.ConfigMap(name, namespace).
+func newConfigMap(namespace string, options ...option[corev1ac.ConfigMapApplyConfiguration]) *corev1ac.ConfigMapApplyConfiguration {
+	cmAC := corev1ac.ConfigMap("scripts", namespace).
 		WithBinaryData(map[string][]byte{}).
 		WithImmutable(true)
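
For reviewers, a minimal sketch of the scale-up arithmetic the new test relies on, using only values that appear in the patch; the helper name `expected_worker_count` is illustrative and not part of the test suite:

```python
# Values taken from create_concurrent_tasks.py and the worker group spec in the Go test.
TASK_NUM = 30          # concurrent tasks, each declared with @ray.remote(num_cpus=1)
CPUS_PER_WORKER = 1    # worker rayStartParams: "num-cpus": "1"
MAX_REPLICAS = 3       # WithMaxReplicas(maxReplica)

def expected_worker_count(task_num: int, cpus_per_worker: int, max_replicas: int) -> int:
    """Workers needed to run all tasks at once, capped by the group's max replicas."""
    needed = -(-task_num // cpus_per_worker)  # ceiling division
    return min(needed, max_replicas)

# 30 one-CPU tasks far exceed what 3 one-CPU workers can hold, so the autoscaler
# should drive the worker group to its cap, which the test asserts via HaveLen(maxReplica).
assert expected_worker_count(TASK_NUM, CPUS_PER_WORKER, MAX_REPLICAS) == 3
```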