diff --git a/ray-operator/test/e2eautoscaler/create_nested_remote_function.py b/ray-operator/test/e2eautoscaler/create_nested_remote_function.py
new file mode 100644
index 0000000000..4c8a18b183
--- /dev/null
+++ b/ray-operator/test/e2eautoscaler/create_nested_remote_function.py
@@ -0,0 +1,31 @@
+import ray
+import time
+import argparse
+
+parser = argparse.ArgumentParser()
+parser.add_argument('--num-cpus', type=float, default=1)
+args = parser.parse_args()
+
+@ray.remote(num_cpus=args.num_cpus)
+def child_task1():
+    time.sleep(5)
+
+@ray.remote(num_cpus=args.num_cpus)
+def child_task2():
+    time.sleep(30)
+
+@ray.remote(num_cpus=args.num_cpus)
+def parent_task():
+    future_list = [child_task1.remote(),
+                   child_task2.remote()]
+
+    # `child_task1` is intended to be scheduled on the same node as the parent task.
+    # `child_task2` is intended to be scheduled on another node.
+    # After `child_task1` is finished, there will be no resource utilization on the node where the parent task is running.
+    # However, the node where the parent task is running should not be scaled down until `child_task2` is finished.
+    ray.get(future_list)
+
+
+
+ray.init(namespace="default_namespace")
+ray.get(parent_task.remote())
\ No newline at end of file
diff --git a/ray-operator/test/e2eautoscaler/raycluster_autoscaler_test.go b/ray-operator/test/e2eautoscaler/raycluster_autoscaler_test.go
index 639f121044..c8273ecc4c 100644
--- a/ray-operator/test/e2eautoscaler/raycluster_autoscaler_test.go
+++ b/ray-operator/test/e2eautoscaler/raycluster_autoscaler_test.go
@@ -2,6 +2,7 @@ package e2eautoscaler
 
 import (
 	"testing"
+	"time"
 
 	"github.com/onsi/gomega"
@@ -194,3 +195,71 @@ func TestRayClusterAutoscalerWithCustomResource(t *testing.T) {
 		Should(gomega.WithTransform(RayClusterDesiredWorkerReplicas, gomega.Equal(int32(0))))
 	})
 }
+
+func TestRayClusterAutoscalerNestedRemoteFunction(t *testing.T) {
+	test := With(t)
+	g := gomega.NewWithT(t)
+
+	const maxReplicas = 2
+	const idleTimeOutSec = 5
+
+	// Create a namespace
+	namespace := test.NewTestNamespace()
+	test.StreamKubeRayOperatorLogs()
+
+	// Script that runs a nested remote function (a parent task spawning two child tasks) to trigger autoscaling
+	scriptsAC := newConfigMap(namespace.Name, "scripts", files(test, "create_nested_remote_function.py"))
+	scripts, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), scriptsAC, TestApplyOptions)
+	g.Expect(err).NotTo(gomega.HaveOccurred())
+	test.T().Logf("Created ConfigMap %s/%s successfully", scripts.Namespace, scripts.Name)
+
+	rayClusterSpecAC := rayv1ac.RayClusterSpec().
+		WithEnableInTreeAutoscaling(true).
+		WithRayVersion(GetRayVersion()).
+		WithHeadGroupSpec(rayv1ac.HeadGroupSpec().
+			WithRayStartParams(map[string]string{"num-cpus": "0"}).
+			WithTemplate(headPodTemplateApplyConfiguration())).
+		WithWorkerGroupSpecs(rayv1ac.WorkerGroupSpec().
+			WithReplicas(0).
+			WithMinReplicas(0).
+			WithMaxReplicas(maxReplicas).
+			WithGroupName("small-group").
+			WithRayStartParams(map[string]string{"num-cpus": "1"}).
+			WithTemplate(workerPodTemplateApplyConfiguration())).
+		WithAutoscalerOptions(rayv1ac.AutoscalerOptions().
+			WithIdleTimeoutSeconds(idleTimeOutSec))
+	rayClusterAC := rayv1ac.RayCluster("ray-cluster", namespace.Name).
+		WithSpec(apply(rayClusterSpecAC, mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](scripts, "/home/ray/test_scripts")))
+
+	rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions)
+	g.Expect(err).NotTo(gomega.HaveOccurred())
+	test.T().Logf("Created RayCluster %s/%s successfully", rayCluster.Namespace, rayCluster.Name)
+
+	// Wait for RayCluster to become ready and verify the number of available worker replicas.
+	g.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium).
+		Should(gomega.WithTransform(RayClusterState, gomega.Equal(rayv1.Ready)))
+	g.Expect(GetRayCluster(test, rayCluster.Namespace, rayCluster.Name)).To(gomega.WithTransform(RayClusterDesiredWorkerReplicas, gomega.Equal(int32(0))))
+
+	headPod, err := GetHeadPod(test, rayCluster)
+	g.Expect(err).NotTo(gomega.HaveOccurred())
+	test.T().Logf("Found head pod %s/%s", headPod.Namespace, headPod.Name)
+
+	// Run the nested remote function script; the parent task and its two child tasks should scale the worker group up to maxReplicas.
+	ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "/home/ray/test_scripts/create_nested_remote_function.py"})
+
+	pods, err := GetWorkerPods(test, rayCluster)
+	g.Expect(err).NotTo(gomega.HaveOccurred())
+	g.Expect(pods).To(gomega.HaveLen(maxReplicas))
+
+	// Wait until the first child task has finished and the idle timeout has elapsed.
+	time.Sleep(20 * time.Second)
+
+	// The second child task is still running, so none of the worker pods should be deleted. (node 1: parent blocking, node 2: child 2 running)
+	pods, err = GetWorkerPods(test, rayCluster)
+	g.Expect(err).NotTo(gomega.HaveOccurred())
+	g.Expect(pods).To(gomega.HaveLen(maxReplicas))
+
+	// After the second child task finishes, all worker pods should be deleted due to the idle timeout.
+	g.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium).
+		Should(gomega.WithTransform(RayClusterDesiredWorkerReplicas, gomega.Equal(int32(0))))
+}
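Timing note for the Go test above: `child_task1` sleeps 5 seconds, `child_task2` sleeps 30 seconds, and `idleTimeOutSec` is 5, so the 20-second `time.Sleep` is meant to land after `child_task1` has finished and its idle timeout has expired, but before `child_task2` completes. That is the window in which the node hosting the blocked parent task must not be scaled down. The snippet below is a minimal local sketch of the same nested-task pattern, not part of the diff; the `num_cpus=3` value and the printed timing check are illustrative assumptions only.

import time

import ray

# Local sketch only: the e2e test runs this pattern on a real RayCluster with the autoscaler.
# num_cpus=3 is an assumed value so the parent and both children can run concurrently.
ray.init(num_cpus=3)

@ray.remote(num_cpus=1)
def child(seconds):
    time.sleep(seconds)
    return seconds

@ray.remote(num_cpus=1)
def parent():
    # The parent blocks in ray.get() until both children finish. In the e2e test,
    # the node hosting this blocked parent must not be scaled down while the
    # 30-second child is still running on another node.
    return ray.get([child.remote(5), child.remote(30)])

start = time.time()
ray.get(parent.remote())
print(f"parent finished after ~{time.time() - start:.0f}s (expected ~30s)")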