diff --git a/ray-operator/test/e2e/rayjob_cluster_selector_test.go b/ray-operator/test/e2e/rayjob_cluster_selector_test.go index 48c84016e4..b8c0e85ce0 100644 --- a/ray-operator/test/e2e/rayjob_cluster_selector_test.go +++ b/ray-operator/test/e2e/rayjob_cluster_selector_test.go @@ -13,6 +13,7 @@ import ( func TestRayJobWithClusterSelector(t *testing.T) { test := With(t) + g := NewWithT(t) // Create a namespace namespace := test.NewTestNamespace() @@ -21,7 +22,7 @@ func TestRayJobWithClusterSelector(t *testing.T) { // Job scripts jobsAC := newConfigMap(namespace.Name, "jobs", files(test, "counter.py", "fail.py")) jobs, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), jobsAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created ConfigMap %s/%s successfully", jobs.Namespace, jobs.Name) // RayCluster @@ -29,11 +30,11 @@ func TestRayJobWithClusterSelector(t *testing.T) { WithSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))) rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created RayCluster %s/%s successfully", rayCluster.Namespace, rayCluster.Name) test.T().Logf("Waiting for RayCluster %s/%s to become ready", rayCluster.Namespace, rayCluster.Name) - test.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). + g.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). Should(WithTransform(RayClusterState, Equal(rayv1.Ready))) test.T().Run("Successful RayJob", func(t *testing.T) { @@ -51,15 +52,15 @@ env_vars: WithSubmitterPodTemplate(jobSubmitterPodTemplateApplyConfiguration())) rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) test.T().Logf("Waiting for RayJob %s/%s to complete", rayJob.Namespace, rayJob.Name) - test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). Should(WithTransform(RayJobStatus, Satisfy(rayv1.IsJobTerminal))) // Assert the Ray job has completed successfully - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). To(WithTransform(RayJobStatus, Equal(rayv1.JobStatusSucceeded))) }) @@ -75,15 +76,15 @@ env_vars: WithSubmitterPodTemplate(jobSubmitterPodTemplateApplyConfiguration())) rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) test.T().Logf("Waiting for RayJob %s/%s to complete", rayJob.Namespace, rayJob.Name) - test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). Should(WithTransform(RayJobStatus, Satisfy(rayv1.IsJobTerminal))) // Assert the Ray job has failed - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). 
+ g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). To(WithTransform(RayJobStatus, Equal(rayv1.JobStatusFailed))) }) } diff --git a/ray-operator/test/e2e/rayjob_lightweight_test.go b/ray-operator/test/e2e/rayjob_lightweight_test.go index 5f8ee86a8f..c1818d4537 100644 --- a/ray-operator/test/e2e/rayjob_lightweight_test.go +++ b/ray-operator/test/e2e/rayjob_lightweight_test.go @@ -5,10 +5,11 @@ import ( . "github.com/onsi/gomega" - k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" corev1ac "k8s.io/client-go/applyconfigurations/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" rayv1ac "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1" . "github.com/ray-project/kuberay/ray-operator/test/support" @@ -16,6 +17,7 @@ import ( func TestRayJobLightWeightMode(t *testing.T) { test := With(t) + g := NewWithT(t) // Create a namespace namespace := test.NewTestNamespace() @@ -24,7 +26,7 @@ func TestRayJobLightWeightMode(t *testing.T) { // Job scripts jobsAC := newConfigMap(namespace.Name, "jobs", files(test, "counter.py", "fail.py", "stop.py")) jobs, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), jobsAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created ConfigMap %s/%s successfully", jobs.Namespace, jobs.Name) test.T().Run("Successful RayJob", func(_ *testing.T) { @@ -53,28 +55,29 @@ env_vars: mountConfigMap[corev1ac.PodTemplateSpecApplyConfiguration](jobs, "/home/ray/jobs")))))) rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) test.T().Logf("Waiting for RayJob %s/%s to complete", rayJob.Namespace, rayJob.Name) - test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). Should(WithTransform(RayJobStatus, Satisfy(rayv1.IsJobTerminal))) // Assert the RayJob has completed successfully - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). To(WithTransform(RayJobStatus, Equal(rayv1.JobStatusSucceeded))) // And the RayJob deployment status is updated accordingly - test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name)). + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name)). Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusComplete))) // Refresh the RayJob status - rayJob = GetRayJob(test, rayJob.Namespace, rayJob.Name) + rayJob, err = GetRayJob(test, rayJob.Namespace, rayJob.Name) + g.Expect(err).NotTo(HaveOccurred()) // TODO (kevin85421): We may need to use `Eventually` instead if the assertion is flaky. 
// Assert the RayCluster has been torn down - _, err = test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), rayJob.Status.RayClusterName, metav1.GetOptions{}) - test.Expect(err).To(MatchError(k8serrors.NewNotFound(rayv1.Resource("rayclusters"), rayJob.Status.RayClusterName))) + _, err = GetRayCluster(test, namespace.Name, rayJob.Status.RayClusterName) + g.Expect(k8serrors.IsNotFound(err)).To(BeTrue()) }) test.T().Run("Failing RayJob without cluster shutdown after finished", func(_ *testing.T) { @@ -86,25 +89,25 @@ env_vars: WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs")))) rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) test.T().Logf("Waiting for RayJob %s/%s to complete", rayJob.Namespace, rayJob.Name) - test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). Should(WithTransform(RayJobStatus, Satisfy(rayv1.IsJobTerminal))) // Assert the Ray job has failed - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). To(WithTransform(RayJobStatus, Equal(rayv1.JobStatusFailed))) // Assert that the RayJob deployment status and RayJob reason have been updated accordingly. - test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name)). + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name)). Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed))) - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). To(WithTransform(RayJobReason, Equal(rayv1.AppFailed))) // In the lightweight submission mode, the submitter Kubernetes Job should not be created. - test.Eventually(Jobs(test, namespace.Name)).Should(BeEmpty()) + g.Eventually(Jobs(test, namespace.Name)).Should(BeEmpty()) }) test.T().Run("Should transition to 'Complete' if the Ray job has stopped.", func(_ *testing.T) { @@ -117,24 +120,23 @@ env_vars: WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs")))) rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) test.T().Logf("Waiting for RayJob %s/%s to be 'Running'", rayJob.Namespace, rayJob.Name) - test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusRunning))) test.T().Logf("Waiting for RayJob %s/%s to be 'Complete'", rayJob.Namespace, rayJob.Name) - test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). 
Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusComplete))) // Refresh the RayJob status - rayJob = GetRayJob(test, rayJob.Namespace, rayJob.Name) - test.Expect(rayJob.Status.JobStatus).To(Equal(rayv1.JobStatusStopped)) + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)).To(WithTransform(RayJobStatus, Equal(rayv1.JobStatusStopped))) // Delete the RayJob err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Delete(test.Ctx(), rayJob.Name, metav1.DeleteOptions{}) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Deleted RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) }) } diff --git a/ray-operator/test/e2e/rayjob_retry_test.go b/ray-operator/test/e2e/rayjob_retry_test.go index 462edb5482..1578426f3b 100644 --- a/ray-operator/test/e2e/rayjob_retry_test.go +++ b/ray-operator/test/e2e/rayjob_retry_test.go @@ -6,6 +6,8 @@ import ( . "github.com/onsi/gomega" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" rayv1ac "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1" . "github.com/ray-project/kuberay/ray-operator/test/support" @@ -13,6 +15,7 @@ import ( func TestRayJobRetry(t *testing.T) { test := With(t) + g := NewWithT(t) // Create a namespace namespace := test.NewTestNamespace() @@ -21,7 +24,7 @@ func TestRayJobRetry(t *testing.T) { // Job scripts jobsAC := newConfigMap(namespace.Name, "jobs", files(test, "fail.py")) jobs, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), jobsAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created ConfigMap %s/%s successfully", jobs.Namespace, jobs.Name) test.T().Run("Failing RayJob without cluster shutdown after finished", func(_ *testing.T) { @@ -37,41 +40,44 @@ func TestRayJobRetry(t *testing.T) { WithSubmitterPodTemplate(jobSubmitterPodTemplateApplyConfiguration())) rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) test.T().Logf("Waiting for RayJob %s/%s to complete", rayJob.Namespace, rayJob.Name) // Assert that the RayJob deployment status and RayJob reason have been updated accordingly. - test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutLong). + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutLong). Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed))) - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). To(WithTransform(RayJobStatus, Equal(rayv1.JobStatusFailed))) - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). To(WithTransform(RayJobReason, Equal(rayv1.AppFailed))) - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). To(WithTransform(RayJobStatus, Equal(rayv1.JobStatusFailed))) // Check whether the controller respects the backoffLimit. - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). 
Should(WithTransform(RayJobFailed, Equal(int32(3)))) - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). Should(WithTransform(RayJobSucceeded, Equal(int32(0)))) // Refresh the RayJob status - rayJob = GetRayJob(test, rayJob.Namespace, rayJob.Name) + rayJob, err = GetRayJob(test, rayJob.Namespace, rayJob.Name) + g.Expect(err).NotTo(HaveOccurred()) // Delete the RayJob err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Delete(test.Ctx(), rayJob.Name, metav1.DeleteOptions{}) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Deleted RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) // Assert the RayCluster has been cascade deleted - test.Eventually(NotFound(RayClusterOrError(test, namespace.Name, rayJob.Status.RayClusterName))). - Should(BeTrue()) + g.Eventually(func() error { + _, err := GetRayCluster(test, namespace.Name, rayJob.Status.RayClusterName) + return err + }).Should(WithTransform(k8serrors.IsNotFound, BeTrue())) // Assert the submitter Job has been cascade deleted - test.Eventually(Jobs(test, namespace.Name)).Should(BeEmpty()) + g.Eventually(Jobs(test, namespace.Name)).Should(BeEmpty()) }) test.T().Run("Failing submitter K8s Job", func(_ *testing.T) { @@ -92,38 +98,41 @@ func TestRayJobRetry(t *testing.T) { rayJobAC.Spec.SubmitterPodTemplate.Spec.Containers[0].WithCommand("ray", "job", "submit", "--address", "http://do-not-exist:8265", "--", "echo 123") rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) test.T().Logf("Waiting for RayJob %s/%s to complete", rayJob.Namespace, rayJob.Name) // Ensure JobDeploymentStatus transit to Failed - test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed))) // Ensure JobStatus is empty - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). To(WithTransform(RayJobStatus, Equal(rayv1.JobStatusNew))) // Ensure Reason is SubmissionFailed - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). To(WithTransform(RayJobReason, Equal(rayv1.SubmissionFailed))) // Check whether the controller respects the backoffLimit. - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). Should(WithTransform(RayJobFailed, Equal(int32(3)))) - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). Should(WithTransform(RayJobSucceeded, Equal(int32(0)))) // Refresh the RayJob status - rayJob = GetRayJob(test, rayJob.Namespace, rayJob.Name) + rayJob, err = GetRayJob(test, rayJob.Namespace, rayJob.Name) + g.Expect(err).NotTo(HaveOccurred()) // Assert the RayCluster has been deleted because ShutdownAfterJobFinishes is true. - test.Eventually(NotFound(RayClusterOrError(test, namespace.Name, rayJob.Status.RayClusterName)), TestTimeoutMedium). 
- Should(BeTrue()) + g.Eventually(func() error { + _, err := GetRayCluster(test, namespace.Name, rayJob.Status.RayClusterName) + return err + }, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue())) // Asset submitter Job is not deleted yet - test.Eventually(Jobs(test, namespace.Name)).ShouldNot(BeEmpty()) + g.Eventually(Jobs(test, namespace.Name)).ShouldNot(BeEmpty()) // Delete the RayJob err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Delete(test.Ctx(), rayJob.Name, metav1.DeleteOptions{}) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Deleted RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) }) @@ -143,20 +152,20 @@ func TestRayJobRetry(t *testing.T) { WithSubmitterPodTemplate(jobSubmitterPodTemplateApplyConfiguration())) rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) // The RayJob will transition to `Failed` because it has passed `ActiveDeadlineSeconds`. test.T().Logf("Waiting for RayJob %s/%s to be 'Failed'", rayJob.Namespace, rayJob.Name) - test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutShort). + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutShort). Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed))) - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). To(WithTransform(RayJobReason, Equal(rayv1.DeadlineExceeded))) - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). Should(WithTransform(RayJobFailed, Equal(int32(1)))) - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). Should(WithTransform(RayJobSucceeded, Equal(int32(0)))) }) @@ -171,32 +180,32 @@ func TestRayJobRetry(t *testing.T) { WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs")))) rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) test.T().Logf("Waiting for RayJob %s/%s to complete", rayJob.Namespace, rayJob.Name) // Assert that the RayJob deployment status has been updated. - test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed))) // Assert the Ray job has failed. - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). To(WithTransform(RayJobStatus, Equal(rayv1.JobStatusFailed))) // Check the RayJob reason has been updated. - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). To(WithTransform(RayJobReason, Equal(rayv1.AppFailed))) // Check whether the controller respects the backoffLimit. - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). 
Should(WithTransform(RayJobFailed, Equal(int32(3)))) // 2 retries + 1 initial attempt = 3 failures - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). Should(WithTransform(RayJobSucceeded, Equal(int32(0)))) // Clean up err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Delete(test.Ctx(), rayJob.Name, metav1.DeleteOptions{}) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Deleted RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) }) } diff --git a/ray-operator/test/e2e/rayjob_suspend_test.go b/ray-operator/test/e2e/rayjob_suspend_test.go index 8e6a8f8d2a..b14573f4c5 100644 --- a/ray-operator/test/e2e/rayjob_suspend_test.go +++ b/ray-operator/test/e2e/rayjob_suspend_test.go @@ -5,9 +5,10 @@ import ( . "github.com/onsi/gomega" - k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" rayv1ac "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1" @@ -16,6 +17,7 @@ import ( func TestRayJobSuspend(t *testing.T) { test := With(t) + g := NewWithT(t) // Create a namespace namespace := test.NewTestNamespace() @@ -24,7 +26,7 @@ func TestRayJobSuspend(t *testing.T) { // Job scripts jobsAC := newConfigMap(namespace.Name, "jobs", files(test, "long_running.py", "counter.py")) jobs, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), jobsAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created ConfigMap %s/%s successfully", jobs.Namespace, jobs.Name) test.T().Run("Suspend the RayJob when its status is 'Running', and then resume it.", func(_ *testing.T) { @@ -38,29 +40,29 @@ func TestRayJobSuspend(t *testing.T) { WithSubmitterPodTemplate(jobSubmitterPodTemplateApplyConfiguration())) rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) test.T().Logf("Waiting for RayJob %s/%s to be 'Running'", rayJob.Namespace, rayJob.Name) - test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusRunning))) test.T().Logf("Suspend the RayJob %s/%s", rayJob.Namespace, rayJob.Name) rayJobAC.Spec.WithSuspend(true) rayJob, err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Waiting for RayJob %s/%s to be 'Suspended'", rayJob.Namespace, rayJob.Name) - test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusSuspended))) // TODO (kevin85421): We may need to use `Eventually` instead if the assertion is flaky. 
// Assert the RayCluster has been torn down - _, err = test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), rayJob.Status.RayClusterName, metav1.GetOptions{}) - test.Expect(err).To(MatchError(k8serrors.NewNotFound(rayv1.Resource("rayclusters"), rayJob.Status.RayClusterName))) + _, err = GetRayCluster(test, namespace.Name, rayJob.Status.RayClusterName) + g.Expect(k8serrors.IsNotFound(err)).To(BeTrue()) // Assert the submitter Job has been cascade deleted - test.Eventually(Jobs(test, namespace.Name)).Should(BeEmpty()) + g.Eventually(Jobs(test, namespace.Name)).Should(BeEmpty()) // TODO (kevin85421): Check whether the Pods associated with the RayCluster and the submitter Job have been deleted. // For Kubernetes Jobs, the default deletion behavior is "orphanDependents," which means the Pods will not be @@ -69,13 +71,13 @@ func TestRayJobSuspend(t *testing.T) { test.T().Logf("Resume the RayJob by updating `suspend` to false.") rayJobAC.Spec.WithSuspend(false) rayJob, err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) - test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). + g.Expect(err).NotTo(HaveOccurred()) + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusRunning))) // Delete the RayJob err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Delete(test.Ctx(), rayJob.Name, metav1.DeleteOptions{}) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Deleted RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) }) @@ -94,40 +96,43 @@ env_vars: WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs")))) rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) test.T().Logf("Waiting for RayJob %s/%s to be 'Suspended'", rayJob.Namespace, rayJob.Name) - test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusSuspended))) test.T().Logf("Resume the RayJob by updating `suspend` to false.") rayJobAC.Spec.WithSuspend(false) rayJob, err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Waiting for RayJob %s/%s to complete", rayJob.Namespace, rayJob.Name) - test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusComplete))) // Assert the RayJob has completed successfully - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). 
To(WithTransform(RayJobStatus, Equal(rayv1.JobStatusSucceeded))) // Refresh the RayJob status - rayJob = GetRayJob(test, rayJob.Namespace, rayJob.Name) + rayJob, err = GetRayJob(test, rayJob.Namespace, rayJob.Name) + g.Expect(err).NotTo(HaveOccurred()) // Delete the RayJob err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Delete(test.Ctx(), rayJob.Name, metav1.DeleteOptions{}) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Deleted RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) // Assert the RayCluster has been cascade deleted - test.Eventually(NotFound(RayClusterOrError(test, rayJob.Namespace, rayJob.Status.RayClusterName))). - Should(BeTrue()) + g.Eventually(func() error { + _, err := GetRayCluster(test, namespace.Name, rayJob.Status.RayClusterName) + return err + }).Should(WithTransform(k8serrors.IsNotFound, BeTrue())) // Assert the Pods has been cascade deleted - test.Eventually(Pods(test, namespace.Name, + g.Eventually(Pods(test, namespace.Name, LabelSelector(utils.RayClusterLabelKey+"="+rayJob.Status.RayClusterName))). Should(BeEmpty()) }) diff --git a/ray-operator/test/e2e/rayjob_test.go b/ray-operator/test/e2e/rayjob_test.go index 97f7d774cf..6692bf0c1c 100644 --- a/ray-operator/test/e2e/rayjob_test.go +++ b/ray-operator/test/e2e/rayjob_test.go @@ -5,9 +5,10 @@ import ( "time" . "github.com/onsi/gomega" - k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" rayv1ac "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1" . "github.com/ray-project/kuberay/ray-operator/test/support" @@ -15,6 +16,7 @@ import ( func TestRayJob(t *testing.T) { test := With(t) + g := NewWithT(t) // Create a namespace namespace := test.NewTestNamespace() @@ -23,7 +25,7 @@ func TestRayJob(t *testing.T) { // Job scripts jobsAC := newConfigMap(namespace.Name, "jobs", files(test, "counter.py", "fail.py", "stop.py", "long_running.py")) jobs, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), jobsAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created ConfigMap %s/%s successfully", jobs.Namespace, jobs.Name) test.T().Run("Successful RayJob", func(_ *testing.T) { @@ -40,31 +42,32 @@ env_vars: WithSubmitterPodTemplate(jobSubmitterPodTemplateApplyConfiguration())) rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) test.T().Logf("Waiting for RayJob %s/%s to complete", rayJob.Namespace, rayJob.Name) - test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). Should(WithTransform(RayJobStatus, Satisfy(rayv1.IsJobTerminal))) // Assert the RayJob has completed successfully - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). To(WithTransform(RayJobStatus, Equal(rayv1.JobStatusSucceeded))) // And the RayJob deployment status is updated accordingly - test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name)). + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name)). 
Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusComplete))) // Refresh the RayJob status - rayJob = GetRayJob(test, rayJob.Namespace, rayJob.Name) + rayJob, err = GetRayJob(test, rayJob.Namespace, rayJob.Name) + g.Expect(err).NotTo(HaveOccurred()) // TODO (kevin85421): We may need to use `Eventually` instead if the assertion is flaky. // Assert the RayCluster has been torn down - _, err = test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), rayJob.Status.RayClusterName, metav1.GetOptions{}) - test.Expect(err).To(MatchError(k8serrors.NewNotFound(rayv1.Resource("rayclusters"), rayJob.Status.RayClusterName))) + _, err = GetRayCluster(test, namespace.Name, rayJob.Status.RayClusterName) + g.Expect(k8serrors.IsNotFound(err)).To(BeTrue()) // Assert the submitter Job has not been deleted - test.Eventually(Jobs(test, namespace.Name)).ShouldNot(BeEmpty()) + g.Eventually(Jobs(test, namespace.Name)).ShouldNot(BeEmpty()) // TODO (kevin85421): Check whether the Pods associated with the RayCluster and the submitter Job have been deleted. // For Kubernetes Jobs, the default deletion behavior is "orphanDependents," which means the Pods will not be @@ -73,13 +76,13 @@ env_vars: test.T().Logf("Update `suspend` to true. However, since the RayJob is completed, the status should not be updated to `Suspended`.") rayJobAC.Spec.WithSuspend(true) rayJob, err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) - test.Consistently(RayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(err).NotTo(HaveOccurred()) + g.Consistently(RayJob(test, rayJob.Namespace, rayJob.Name)). Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusComplete))) // Delete the RayJob err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Delete(test.Ctx(), rayJob.Name, metav1.DeleteOptions{}) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Deleted RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) }) @@ -93,39 +96,42 @@ env_vars: WithSubmitterPodTemplate(jobSubmitterPodTemplateApplyConfiguration())) rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) test.T().Logf("Waiting for RayJob %s/%s to complete", rayJob.Namespace, rayJob.Name) - test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). Should(WithTransform(RayJobStatus, Satisfy(rayv1.IsJobTerminal))) // Assert the Ray job has failed - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). To(WithTransform(RayJobStatus, Equal(rayv1.JobStatusFailed))) // Assert that the RayJob deployment status and RayJob reason have been updated accordingly. - test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name)). + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name)). Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed))) - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). 
To(WithTransform(RayJobReason, Equal(rayv1.AppFailed))) // TODO (kevin85421): Ensure the RayCluster and Kubernetes Job are not deleted because `ShutdownAfterJobFinishes` is false. // Refresh the RayJob status - rayJob = GetRayJob(test, rayJob.Namespace, rayJob.Name) + rayJob, err = GetRayJob(test, rayJob.Namespace, rayJob.Name) + g.Expect(err).NotTo(HaveOccurred()) // Delete the RayJob err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Delete(test.Ctx(), rayJob.Name, metav1.DeleteOptions{}) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Deleted RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) // Assert the RayCluster has been cascade deleted - test.Eventually(NotFound(RayClusterOrError(test, namespace.Name, rayJob.Status.RayClusterName))). - Should(BeTrue()) + g.Eventually(func() error { + _, err := GetRayCluster(test, namespace.Name, rayJob.Status.RayClusterName) + return err + }).Should(WithTransform(k8serrors.IsNotFound, BeTrue())) // Assert the submitter Job has been cascade deleted - test.Eventually(Jobs(test, namespace.Name)).Should(BeEmpty()) + g.Eventually(Jobs(test, namespace.Name)).Should(BeEmpty()) }) test.T().Run("Failing submitter K8s Job", func(_ *testing.T) { @@ -143,29 +149,32 @@ env_vars: rayJobAC.Spec.SubmitterPodTemplate.Spec.Containers[0].WithCommand("ray", "job", "submit", "--address", "http://do-not-exist:8265", "--", "echo 123") rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) test.T().Logf("Waiting for RayJob %s/%s to complete", rayJob.Namespace, rayJob.Name) - test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed))) - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). To(WithTransform(RayJobStatus, Equal(rayv1.JobStatusNew))) - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). To(WithTransform(RayJobReason, Equal(rayv1.SubmissionFailed))) // Refresh the RayJob status - rayJob = GetRayJob(test, rayJob.Namespace, rayJob.Name) + rayJob, err = GetRayJob(test, rayJob.Namespace, rayJob.Name) + g.Expect(err).NotTo(HaveOccurred()) // Assert the RayCluster has been deleted because ShutdownAfterJobFinishes is true. - test.Eventually(NotFound(RayClusterOrError(test, namespace.Name, rayJob.Status.RayClusterName)), TestTimeoutMedium). 
- Should(BeTrue()) + g.Eventually(func() error { + _, err := GetRayCluster(test, namespace.Name, rayJob.Status.RayClusterName) + return err + }).Should(WithTransform(k8serrors.IsNotFound, BeTrue())) // Asset submitter Job is not deleted yet - test.Eventually(Jobs(test, namespace.Name)).ShouldNot(BeEmpty()) + g.Eventually(Jobs(test, namespace.Name)).ShouldNot(BeEmpty()) // Delete the RayJob err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Delete(test.Ctx(), rayJob.Name, metav1.DeleteOptions{}) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Deleted RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) }) @@ -179,24 +188,23 @@ env_vars: WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs")))) rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) test.T().Logf("Waiting for RayJob %s/%s to be 'Running'", rayJob.Namespace, rayJob.Name) - test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusRunning))) test.T().Logf("Waiting for RayJob %s/%s to be 'Complete'", rayJob.Namespace, rayJob.Name) - test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusComplete))) // Refresh the RayJob status - rayJob = GetRayJob(test, rayJob.Namespace, rayJob.Name) - test.Expect(rayJob.Status.JobStatus).To(Equal(rayv1.JobStatusStopped)) + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)).To(WithTransform(RayJobStatus, Equal(rayv1.JobStatusStopped))) // Delete the RayJob err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Delete(test.Ctx(), rayJob.Name, metav1.DeleteOptions{}) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Deleted RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) }) @@ -208,11 +216,11 @@ env_vars: WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs")))) rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) // `RuntimeEnvYAML` is not a valid YAML string, so the RayJob controller will not do anything with the CR. - test.Consistently(RayJob(test, rayJob.Namespace, rayJob.Name), 5*time.Second). + g.Consistently(RayJob(test, rayJob.Namespace, rayJob.Name), 5*time.Second). 
Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusNew))) }) @@ -227,14 +235,14 @@ env_vars: WithSubmitterPodTemplate(jobSubmitterPodTemplateApplyConfiguration())) rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) // The RayJob will transition to `Complete` because it has passed `ActiveDeadlineSeconds`. test.T().Logf("Waiting for RayJob %s/%s to be 'Complete'", rayJob.Namespace, rayJob.Name) - test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutShort). + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutShort). Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed))) - test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). To(WithTransform(RayJobReason, Equal(rayv1.DeadlineExceeded))) }) } diff --git a/ray-operator/test/e2e/rayservice_ha_test.go b/ray-operator/test/e2e/rayservice_ha_test.go index 03c184f2f5..f71ca7ea2a 100644 --- a/ray-operator/test/e2e/rayservice_ha_test.go +++ b/ray-operator/test/e2e/rayservice_ha_test.go @@ -15,6 +15,7 @@ import ( func TestRayService(t *testing.T) { test := With(t) + g := NewWithT(t) // Create a namespace namespace := test.NewTestNamespace() @@ -23,7 +24,7 @@ func TestRayService(t *testing.T) { // Scripts for creating and terminating detached actors to trigger autoscaling scriptsAC := newConfigMap(namespace.Name, "scripts", files(test, "locustfile.py", "locust_runner.py")) scripts, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), scriptsAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created ConfigMap %s/%s successfully", scripts.Namespace, scripts.Name) test.T().Run("Static RayService", func(_ *testing.T) { @@ -49,11 +50,11 @@ applications: WithRayClusterSpec(newRayClusterSpec())) rayService, err := test.Client().Ray().RayV1().RayServices(namespace.Name).Apply(test.Ctx(), rayServiceAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created RayService %s/%s successfully", rayService.Namespace, rayService.Name) test.T().Logf("Waiting for RayService %s/%s to running", rayService.Namespace, rayService.Name) - test.Eventually(RayService(test, rayService.Namespace, rayService.Name), TestTimeoutMedium). + g.Eventually(RayService(test, rayService.Namespace, rayService.Name), TestTimeoutMedium). Should(WithTransform(RayServiceStatus, Equal(rayv1.Running))) locustClusterAC := rayv1ac.RayCluster("locust-cluster", namespace.Name). @@ -63,16 +64,16 @@ applications: WithRayStartParams(map[string]string{"dashboard-host": "0.0.0.0"}). WithTemplate(apply(headPodTemplateApplyConfiguration(), mountConfigMap[corev1ac.PodTemplateSpecApplyConfiguration](scripts, "/home/ray/test_scripts"))))) locustCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), locustClusterAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created Locust RayCluster %s/%s successfully", locustCluster.Namespace, locustCluster.Name) // Wait for RayCluster to become ready and verify the number of available worker replicas. 
- test.Eventually(RayCluster(test, locustCluster.Namespace, locustCluster.Name), TestTimeoutMedium). + g.Eventually(RayCluster(test, locustCluster.Namespace, locustCluster.Name), TestTimeoutMedium). Should(WithTransform(RayClusterState, Equal(rayv1.Ready))) - locustCluster = GetRayCluster(test, locustCluster.Namespace, locustCluster.Name) - test.Expect(locustCluster.Status.DesiredWorkerReplicas).To(Equal(int32(0))) + g.Expect(GetRayCluster(test, locustCluster.Namespace, locustCluster.Name)).To(WithTransform(RayClusterDesiredWorkerReplicas, Equal(int32(0)))) - headPod := GetHeadPod(test, locustCluster) + headPod, err := GetHeadPod(test, locustCluster) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Found head pod %s/%s", headPod.Namespace, headPod.Name) // Install Locust in the head Pod diff --git a/ray-operator/test/e2e/support.go b/ray-operator/test/e2e/support.go index 40249c0348..d9e3544033 100644 --- a/ray-operator/test/e2e/support.go +++ b/ray-operator/test/e2e/support.go @@ -3,7 +3,7 @@ package e2e import ( "embed" - "github.com/onsi/gomega" + "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -19,7 +19,7 @@ var _files embed.FS func ReadFile(t Test, fileName string) []byte { t.T().Helper() file, err := _files.ReadFile(fileName) - t.Expect(err).NotTo(gomega.HaveOccurred()) + assert.NoError(t.T(), err) return file } diff --git a/ray-operator/test/e2eautoscaler/raycluster_autoscaler_test.go b/ray-operator/test/e2eautoscaler/raycluster_autoscaler_test.go index 1a18bf229e..02b0409dd4 100644 --- a/ray-operator/test/e2eautoscaler/raycluster_autoscaler_test.go +++ b/ray-operator/test/e2eautoscaler/raycluster_autoscaler_test.go @@ -13,6 +13,7 @@ import ( func TestRayClusterAutoscaler(t *testing.T) { test := With(t) + g := NewWithT(t) // Create a namespace namespace := test.NewTestNamespace() @@ -21,7 +22,7 @@ func TestRayClusterAutoscaler(t *testing.T) { // Scripts for creating and terminating detached actors to trigger autoscaling scriptsAC := newConfigMap(namespace.Name, "scripts", files(test, "create_detached_actor.py", "terminate_detached_actor.py")) scripts, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), scriptsAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created ConfigMap %s/%s successfully", scripts.Namespace, scripts.Name) test.T().Run("Create a RayCluster with autoscaling enabled", func(_ *testing.T) { @@ -42,42 +43,43 @@ func TestRayClusterAutoscaler(t *testing.T) { WithSpec(apply(rayClusterSpecAC, mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](scripts, "/home/ray/test_scripts"))) rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created RayCluster %s/%s successfully", rayCluster.Namespace, rayCluster.Name) // Wait for RayCluster to become ready and verify the number of available worker replicas. - test.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). + g.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). 
Should(WithTransform(RayClusterState, Equal(rayv1.Ready))) - rayCluster = GetRayCluster(test, rayCluster.Namespace, rayCluster.Name) - test.Expect(rayCluster.Status.DesiredWorkerReplicas).To(Equal(int32(0))) + g.Expect(GetRayCluster(test, rayCluster.Namespace, rayCluster.Name)).To(WithTransform(RayClusterDesiredWorkerReplicas, Equal(int32(0)))) - headPod := GetHeadPod(test, rayCluster) + headPod, err := GetHeadPod(test, rayCluster) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Found head pod %s/%s", headPod.Namespace, headPod.Name) // Create a detached actor, and a worker should be created. ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "/home/ray/test_scripts/create_detached_actor.py", "actor1"}) - test.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). + g.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). Should(WithTransform(RayClusterDesiredWorkerReplicas, Equal(int32(1)))) // Create a detached actor, and a worker should be created. ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "/home/ray/test_scripts/create_detached_actor.py", "actor2"}) - test.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). + g.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). Should(WithTransform(RayClusterDesiredWorkerReplicas, Equal(int32(2)))) // Terminate a detached actor, and a worker should be deleted. ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "/home/ray/test_scripts/terminate_detached_actor.py", "actor1"}) - test.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). + g.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). Should(WithTransform(RayClusterDesiredWorkerReplicas, Equal(int32(1)))) // Terminate a detached actor, and a worker should be deleted. ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "/home/ray/test_scripts/terminate_detached_actor.py", "actor2"}) - test.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). + g.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). 
Should(WithTransform(RayClusterDesiredWorkerReplicas, Equal(int32(0)))) }) } func TestRayClusterAutoscalerWithFakeGPU(t *testing.T) { test := With(t) + g := NewWithT(t) // Create a namespace namespace := test.NewTestNamespace() @@ -86,7 +88,7 @@ func TestRayClusterAutoscalerWithFakeGPU(t *testing.T) { // Scripts for creating and terminating detached actors to trigger autoscaling scriptsAC := newConfigMap(namespace.Name, "scripts", files(test, "create_detached_actor.py", "terminate_detached_actor.py")) scripts, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), scriptsAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created ConfigMap %s/%s successfully", scripts.Namespace, scripts.Name) test.T().Run("Create a RayCluster with autoscaling enabled", func(_ *testing.T) { @@ -107,35 +109,36 @@ func TestRayClusterAutoscalerWithFakeGPU(t *testing.T) { WithSpec(apply(rayClusterSpecAC, mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](scripts, "/home/ray/test_scripts"))) rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created RayCluster %s/%s successfully", rayCluster.Namespace, rayCluster.Name) // Wait for RayCluster to become ready and verify the number of available worker replicas. - test.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). + g.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). Should(WithTransform(RayClusterState, Equal(rayv1.Ready))) - rayCluster = GetRayCluster(test, rayCluster.Namespace, rayCluster.Name) - test.Expect(rayCluster.Status.DesiredWorkerReplicas).To(Equal(int32(0))) + g.Expect(GetRayCluster(test, rayCluster.Namespace, rayCluster.Name)).To(WithTransform(RayClusterDesiredWorkerReplicas, Equal(int32(0)))) - headPod := GetHeadPod(test, rayCluster) + headPod, err := GetHeadPod(test, rayCluster) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Found head pod %s/%s", headPod.Namespace, headPod.Name) // Create a detached gpu actor, and a worker in the "gpu-group" should be created. ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "/home/ray/test_scripts/create_detached_actor.py", "gpu_actor", "--num-gpus=1"}) - test.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). + g.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). Should(WithTransform(RayClusterDesiredWorkerReplicas, Equal(int32(1)))) // We don't use real GPU resources of Kubernetes here, therefore we can't test the RayClusterDesiredGPU. // We test the Pods count of the "gpu-group" instead. - test.Expect(GetGroupPods(test, rayCluster, "gpu-group")).To(HaveLen(1)) + g.Expect(GetGroupPods(test, rayCluster, "gpu-group")).To(HaveLen(1)) // Terminate the gpu detached actor, and the worker should be deleted. ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "/home/ray/test_scripts/terminate_detached_actor.py", "gpu_actor"}) - test.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). + g.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). 
Should(WithTransform(RayClusterDesiredWorkerReplicas, Equal(int32(0)))) }) } func TestRayClusterAutoscalerWithCustomResource(t *testing.T) { test := With(t) + g := NewWithT(t) // Create a namespace namespace := test.NewTestNamespace() @@ -144,7 +147,7 @@ func TestRayClusterAutoscalerWithCustomResource(t *testing.T) { // Scripts for creating and terminating detached actors to trigger autoscaling scriptsAC := newConfigMap(namespace.Name, "scripts", files(test, "create_detached_actor.py", "terminate_detached_actor.py")) scripts, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), scriptsAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created ConfigMap %s/%s successfully", scripts.Namespace, scripts.Name) test.T().Run("Create a RayCluster with autoscaling enabled", func(_ *testing.T) { @@ -167,27 +170,27 @@ func TestRayClusterAutoscalerWithCustomResource(t *testing.T) { WithSpec(apply(rayClusterSpecAC, mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](scripts, "/home/ray/test_scripts"))) rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions) - test.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created RayCluster %s/%s successfully", rayCluster.Namespace, rayCluster.Name) // Wait for RayCluster to become ready and verify the number of available worker replicas. - test.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). + g.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). Should(WithTransform(RayClusterState, Equal(rayv1.Ready))) - rayCluster = GetRayCluster(test, rayCluster.Namespace, rayCluster.Name) - test.Expect(rayCluster.Status.DesiredWorkerReplicas).To(Equal(int32(0))) + g.Expect(GetRayCluster(test, rayCluster.Namespace, rayCluster.Name)).To(WithTransform(RayClusterDesiredWorkerReplicas, Equal(int32(0)))) - headPod := GetHeadPod(test, rayCluster) + headPod, err := GetHeadPod(test, rayCluster) + g.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Found head pod %s/%s", headPod.Namespace, headPod.Name) // Create a detached custom resource actor, and a worker in the "custom-resource-group" should be created. ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "/home/ray/test_scripts/create_detached_actor.py", "custom_resource_actor", "--num-custom-resources=1"}) - test.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). + g.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). Should(WithTransform(RayClusterDesiredWorkerReplicas, Equal(int32(1)))) - test.Expect(GetGroupPods(test, rayCluster, groupName)).To(HaveLen(1)) + g.Expect(GetGroupPods(test, rayCluster, groupName)).To(HaveLen(1)) // Terminate the custom resource detached actor, and the worker should be deleted. ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "/home/ray/test_scripts/terminate_detached_actor.py", "custom_resource_actor"}) - test.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). + g.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). 
Should(WithTransform(RayClusterDesiredWorkerReplicas, Equal(int32(0)))) }) } diff --git a/ray-operator/test/e2eautoscaler/support.go b/ray-operator/test/e2eautoscaler/support.go index 48fae41b54..8763a06f4e 100644 --- a/ray-operator/test/e2eautoscaler/support.go +++ b/ray-operator/test/e2eautoscaler/support.go @@ -3,7 +3,7 @@ package e2eautoscaler import ( "embed" - "github.com/onsi/gomega" + "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -19,7 +19,7 @@ var _files embed.FS func ReadFile(t Test, fileName string) []byte { t.T().Helper() file, err := _files.ReadFile(fileName) - t.Expect(err).NotTo(gomega.HaveOccurred()) + assert.NoError(t.T(), err) return file } diff --git a/ray-operator/test/sampleyaml/raycluster_test.go b/ray-operator/test/sampleyaml/raycluster_test.go index ccdd754d32..f652501f6a 100644 --- a/ray-operator/test/sampleyaml/raycluster_test.go +++ b/ray-operator/test/sampleyaml/raycluster_test.go @@ -3,7 +3,7 @@ package sampleyaml import ( "testing" - "github.com/onsi/gomega" + . "github.com/onsi/gomega" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" . "github.com/ray-project/kuberay/ray-operator/test/support" @@ -60,18 +60,22 @@ func TestRayCluster(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { test := With(t) + g := NewWithT(t) + namespace := test.NewTestNamespace() test.StreamKubeRayOperatorLogs() rayClusterFromYaml := DeserializeRayClusterSampleYAML(test, tt.name) KubectlApplyYAML(test, tt.name, namespace.Name) - rayCluster := GetRayCluster(test, namespace.Name, rayClusterFromYaml.Name) - test.Expect(rayCluster).NotTo(gomega.BeNil()) + rayCluster, err := GetRayCluster(test, namespace.Name, rayClusterFromYaml.Name) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(rayCluster).NotTo(BeNil()) test.T().Logf("Waiting for RayCluster %s/%s to be ready", namespace.Name, rayCluster.Name) - test.Eventually(RayCluster(test, namespace.Name, rayCluster.Name), TestTimeoutMedium). - Should(gomega.WithTransform(RayClusterState, gomega.Equal(rayv1.Ready))) - rayCluster = GetRayCluster(test, namespace.Name, rayCluster.Name) + g.Eventually(RayCluster(test, namespace.Name, rayCluster.Name), TestTimeoutMedium). 
+ Should(WithTransform(RayClusterState, Equal(rayv1.Ready)))
+ rayCluster, err = GetRayCluster(test, namespace.Name, rayCluster.Name)
+ g.Expect(err).NotTo(HaveOccurred())

// Check if the RayCluster created correct number of pods
var desiredWorkerReplicas int32
@@ -80,17 +84,17 @@ func TestRayCluster(t *testing.T) {
desiredWorkerReplicas += *workerGroupSpec.Replicas
}
}
- test.Eventually(GetWorkerPods(test, rayCluster), TestTimeoutShort).Should(gomega.HaveLen(int(desiredWorkerReplicas)))
- test.Expect(rayCluster.Status.DesiredWorkerReplicas).To(gomega.Equal(desiredWorkerReplicas))
+ g.Eventually(WorkerPods(test, rayCluster), TestTimeoutShort).Should(HaveLen(int(desiredWorkerReplicas)))
+ g.Expect(rayCluster.Status.DesiredWorkerReplicas).To(Equal(desiredWorkerReplicas))

// Check if the head pod is ready
- test.Eventually(GetHeadPod(test, rayCluster), TestTimeoutShort).Should(gomega.WithTransform(IsPodRunningAndReady, gomega.BeTrue()))
+ g.Eventually(HeadPod(test, rayCluster), TestTimeoutShort).Should(WithTransform(IsPodRunningAndReady, BeTrue()))

// Check if all worker pods are ready
- test.Eventually(GetWorkerPods(test, rayCluster), TestTimeoutShort).Should(gomega.WithTransform(AllPodsRunningAndReady, gomega.BeTrue()))
+ g.Eventually(WorkerPods(test, rayCluster), TestTimeoutShort).Should(WithTransform(AllPodsRunningAndReady, BeTrue()))

// Check that all pods can submit jobs
- test.Eventually(SubmitJobsToAllPods(test, GetAllPods(test, rayCluster)), TestTimeoutShort).Should(gomega.Succeed())
+ g.Eventually(SubmitJobsToAllPods(test, rayCluster), TestTimeoutShort).Should(Succeed())
})
}
}
diff --git a/ray-operator/test/sampleyaml/support.go b/ray-operator/test/sampleyaml/support.go
index da878d0021..1b26546820 100644
--- a/ray-operator/test/sampleyaml/support.go
+++ b/ray-operator/test/sampleyaml/support.go
@@ -6,7 +6,8 @@ import (
"path/filepath"
"runtime"

- "github.com/onsi/gomega"
+ . "github.com/onsi/gomega"
+ "github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
@@ -19,8 +20,8 @@ func getSampleYAMLDir(t Test) string {
_, b, _, _ := runtime.Caller(0)
sampleYAMLDir := filepath.Join(filepath.Dir(b), "../../config/samples")
info, err := os.Stat(sampleYAMLDir)
- t.Expect(err).NotTo(gomega.HaveOccurred())
- t.Expect(info.IsDir()).To(gomega.BeTrue())
+ assert.NoError(t.T(), err)
+ assert.True(t.T(), info.IsDir())
return sampleYAMLDir
}

@@ -29,7 +30,7 @@ func readYAML(t Test, filename string) []byte {
sampleYAMLDir := getSampleYAMLDir(t)
yamlFile := filepath.Join(sampleYAMLDir, filename)
yamlFileContent, err := os.ReadFile(yamlFile)
- t.Expect(err).NotTo(gomega.HaveOccurred())
+ assert.NoError(t.T(), err)
return yamlFileContent
}

@@ -39,7 +40,7 @@ func DeserializeRayClusterSampleYAML(t Test, filename string) *rayv1.RayCluster
decoder := rayscheme.Codecs.UniversalDecoder()
rayCluster := &rayv1.RayCluster{}
_, _, err := decoder.Decode(yamlFileContent, nil, rayCluster)
- t.Expect(err).NotTo(gomega.HaveOccurred())
+ assert.NoError(t.T(), err)
return rayCluster
}

@@ -49,7 +50,7 @@ func KubectlApplyYAML(t Test, filename string, namespace string) {
sampleYAMLPath := filepath.Join(sampleYAMLDir, filename)
kubectlCmd := exec.CommandContext(t.Ctx(), "kubectl", "apply", "-f", sampleYAMLPath, "-n", namespace)
err := kubectlCmd.Run()
- t.Expect(err).NotTo(gomega.HaveOccurred())
+ assert.NoError(t.T(), err)
t.T().Logf("Successfully applied %s", filename)
}

@@ -74,8 +75,10 @@ func AllPodsRunningAndReady(pods []corev1.Pod) bool {
return true
}

-func SubmitJobsToAllPods(t Test, pods []corev1.Pod) func(gomega.Gomega) {
- return func(gomega.Gomega) {
+func SubmitJobsToAllPods(t Test, rayCluster *rayv1.RayCluster) func(Gomega) {
+ return func(g Gomega) {
+ pods, err := GetAllPods(t, rayCluster)
+ g.Expect(err).NotTo(HaveOccurred())
cmd := []string{
"python",
"-c",
diff --git a/ray-operator/test/support/batch.go b/ray-operator/test/support/batch.go
index cd86d0a965..acbef60882 100644
--- a/ray-operator/test/support/batch.go
+++ b/ray-operator/test/support/batch.go
@@ -22,8 +22,3 @@ func Job(t Test, namespace, name string) func(g gomega.Gomega) *batchv1.Job {
return job
}
}
-
-func GetJob(t Test, namespace, name string) *batchv1.Job {
- t.T().Helper()
- return Job(t, namespace, name)(t)
-}
diff --git a/ray-operator/test/support/core.go b/ray-operator/test/support/core.go
index b159a0710e..754a67a26b 100644
--- a/ray-operator/test/support/core.go
+++ b/ray-operator/test/support/core.go
@@ -4,6 +4,8 @@ import (
"io"
"os/exec"

+ "github.com/stretchr/testify/assert"
+
"github.com/onsi/gomega"

corev1 "k8s.io/api/core/v1"
@@ -15,7 +17,7 @@ func Pods(t Test, namespace string, options ...Option[*metav1.ListOptions]) func
listOptions := &metav1.ListOptions{}

for _, option := range options {
- t.Expect(option.applyTo(listOptions)).To(gomega.Succeed())
+ g.Expect(option.applyTo(listOptions)).To(gomega.Succeed())
}

pods, err := t.Client().Core().CoreV1().Pods(namespace).List(t.Ctx(), *listOptions)
@@ -28,7 +30,7 @@ func storeAllPodLogs(t Test, namespace *corev1.Namespace) {
t.T().Helper()

pods, err := t.Client().Core().CoreV1().Pods(namespace.Name).List(t.Ctx(), metav1.ListOptions{})
- t.Expect(err).NotTo(gomega.HaveOccurred())
+ assert.NoError(t.T(), err)

for _, pod := range pods.Items {
for _, container := range pod.Spec.Containers {
@@ -47,14 +49,14 @@ func storeContainerLog(t Test, namespace *corev1.Namespace, podName, containerNa
t.T().Logf("Error getting logs from container %s/%s/%s", namespace.Name, podName, containerName) return } - t.Expect(err).NotTo(gomega.HaveOccurred()) + assert.NoError(t.T(), err) defer func() { - t.Expect(stream.Close()).To(gomega.Succeed()) + assert.NoError(t.T(), stream.Close()) }() bytes, err := io.ReadAll(stream) - t.Expect(err).NotTo(gomega.HaveOccurred()) + assert.NoError(t.T(), err) containerLogFileName := "pod-" + podName + "-" + containerName WriteToOutputDir(t, containerLogFileName, Log, bytes) @@ -67,5 +69,5 @@ func ExecPodCmd(t Test, pod *corev1.Pod, containerName string, cmd []string) { t.T().Logf("Executing command: kubectl %s", kubectlCmd) output, err := exec.Command("kubectl", kubectlCmd...).CombinedOutput() t.T().Logf("Command output: %s", output) - t.Expect(err).NotTo(gomega.HaveOccurred()) + assert.NoError(t.T(), err) } diff --git a/ray-operator/test/support/events.go b/ray-operator/test/support/events.go index 24f3ab7898..c317fdb41e 100644 --- a/ray-operator/test/support/events.go +++ b/ray-operator/test/support/events.go @@ -4,7 +4,7 @@ import ( "bytes" "fmt" - "github.com/onsi/gomega" + "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" eventsv1 "k8s.io/api/events/v1" @@ -39,12 +39,12 @@ func storeEvents(t Test, namespace *corev1.Namespace) { t.T().Helper() events, err := t.Client().Core().EventsV1().Events(namespace.Name).List(t.Ctx(), metav1.ListOptions{}) - t.Expect(err).NotTo(gomega.HaveOccurred()) + assert.NoError(t.T(), err) - bytes, err := renderEventContent(eventKeys, mapEventsToKeys(events)) - t.Expect(err).NotTo(gomega.HaveOccurred()) + eventContent, err := renderEventContent(eventKeys, mapEventsToKeys(events)) + assert.NoError(t.T(), err) - WriteToOutputDir(t, eventLogFileName, Log, bytes) + WriteToOutputDir(t, eventLogFileName, Log, eventContent) } func mapEventsToKeys(eventList *eventsv1.EventList) []map[string]string { diff --git a/ray-operator/test/support/namespace.go b/ray-operator/test/support/namespace.go index 55c1f9b2f2..1e8741c6a7 100644 --- a/ray-operator/test/support/namespace.go +++ b/ray-operator/test/support/namespace.go @@ -1,7 +1,7 @@ package support import ( - "github.com/onsi/gomega" + "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -20,11 +20,11 @@ func createTestNamespace(t Test, options ...Option[*corev1.Namespace]) *corev1.N } for _, option := range options { - t.Expect(option.applyTo(namespace)).To(gomega.Succeed()) + assert.NoError(t.T(), option.applyTo(namespace)) } namespace, err := t.Client().Core().CoreV1().Namespaces().Create(t.Ctx(), namespace, metav1.CreateOptions{}) - t.Expect(err).NotTo(gomega.HaveOccurred()) + assert.NoError(t.T(), err) return namespace } @@ -35,5 +35,5 @@ func deleteTestNamespace(t Test, namespace *corev1.Namespace) { err := t.Client().Core().CoreV1().Namespaces().Delete(t.Ctx(), namespace.Name, metav1.DeleteOptions{ PropagationPolicy: &propagationPolicy, }) - t.Expect(err).NotTo(gomega.HaveOccurred()) + assert.NoError(t.T(), err) } diff --git a/ray-operator/test/support/ray.go b/ray-operator/test/support/ray.go index 03dffb515d..80ad29392b 100644 --- a/ray-operator/test/support/ray.go +++ b/ray-operator/test/support/ray.go @@ -1,7 +1,10 @@ package support import ( + "errors" + "github.com/onsi/gomega" + "github.com/stretchr/testify/assert" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" "github.com/ray-project/kuberay/ray-operator/controllers/ray/common" @@ -10,17 +13,14 @@ import ( metav1 
"k8s.io/apimachinery/pkg/apis/meta/v1" ) -func RayJob(t Test, namespace, name string) func(g gomega.Gomega) *rayv1.RayJob { - return func(g gomega.Gomega) *rayv1.RayJob { - job, err := t.Client().Ray().RayV1().RayJobs(namespace).Get(t.Ctx(), name, metav1.GetOptions{}) - g.Expect(err).NotTo(gomega.HaveOccurred()) - return job +func RayJob(t Test, namespace, name string) func() (*rayv1.RayJob, error) { + return func() (*rayv1.RayJob, error) { + return GetRayJob(t, namespace, name) } } -func GetRayJob(t Test, namespace, name string) *rayv1.RayJob { - t.T().Helper() - return RayJob(t, namespace, name)(t) +func GetRayJob(t Test, namespace, name string) (*rayv1.RayJob, error) { + return t.Client().Ray().RayV1().RayJobs(namespace).Get(t.Ctx(), name, metav1.GetOptions{}) } func RayJobStatus(job *rayv1.RayJob) rayv1.JobStatus { @@ -49,29 +49,14 @@ func RayJobSucceeded(job *rayv1.RayJob) int32 { return *job.Status.Succeeded } -func GetRayJobId(t Test, namespace, name string) string { - t.T().Helper() - job := RayJob(t, namespace, name)(t) - return job.Status.JobId -} - -func RayCluster(t Test, namespace, name string) func(g gomega.Gomega) *rayv1.RayCluster { - return func(g gomega.Gomega) *rayv1.RayCluster { - cluster, err := t.Client().Ray().RayV1().RayClusters(namespace).Get(t.Ctx(), name, metav1.GetOptions{}) - g.Expect(err).NotTo(gomega.HaveOccurred()) - return cluster +func RayCluster(t Test, namespace, name string) func() (*rayv1.RayCluster, error) { + return func() (*rayv1.RayCluster, error) { + return GetRayCluster(t, namespace, name) } } -func RayClusterOrError(t Test, namespace, name string) func(g gomega.Gomega) (*rayv1.RayCluster, error) { - return func(_ gomega.Gomega) (*rayv1.RayCluster, error) { - return t.Client().Ray().RayV1().RayClusters(namespace).Get(t.Ctx(), name, metav1.GetOptions{}) - } -} - -func GetRayCluster(t Test, namespace, name string) *rayv1.RayCluster { - t.T().Helper() - return RayCluster(t, namespace, name)(t) +func GetRayCluster(t Test, namespace, name string) (*rayv1.RayCluster, error) { + return t.Client().Ray().RayV1().RayClusters(namespace).Get(t.Ctx(), name, metav1.GetOptions{}) } func RayClusterState(cluster *rayv1.RayCluster) rayv1.ClusterState { @@ -82,35 +67,52 @@ func RayClusterDesiredWorkerReplicas(cluster *rayv1.RayCluster) int32 { return cluster.Status.DesiredWorkerReplicas } -func GetHeadPod(t Test, rayCluster *rayv1.RayCluster) *corev1.Pod { - t.T().Helper() +func HeadPod(t Test, rayCluster *rayv1.RayCluster) func() (*corev1.Pod, error) { + return func() (*corev1.Pod, error) { + return GetHeadPod(t, rayCluster) + } +} + +func GetHeadPod(t Test, rayCluster *rayv1.RayCluster) (*corev1.Pod, error) { pods, err := t.Client().Core().CoreV1().Pods(rayCluster.Namespace).List( t.Ctx(), common.RayClusterHeadPodsAssociationOptions(rayCluster).ToMetaV1ListOptions(), ) - t.Expect(err).NotTo(gomega.HaveOccurred()) - t.Expect(len(pods.Items)).To(gomega.Equal(1)) - return &pods.Items[0] + if err != nil { + return nil, err + } + if len(pods.Items) != 1 { + return nil, errors.New("number of head pods is not 1") + } + return &pods.Items[0], nil } -func GetWorkerPods(t Test, rayCluster *rayv1.RayCluster) []corev1.Pod { - t.T().Helper() +func WorkerPods(t Test, rayCluster *rayv1.RayCluster) func() ([]corev1.Pod, error) { + return func() ([]corev1.Pod, error) { + return GetWorkerPods(t, rayCluster) + } +} + +func GetWorkerPods(t Test, rayCluster *rayv1.RayCluster) ([]corev1.Pod, error) { pods, err := t.Client().Core().CoreV1().Pods(rayCluster.Namespace).List( t.Ctx(), 
common.RayClusterWorkerPodsAssociationOptions(rayCluster).ToMetaV1ListOptions(),
)
- t.Expect(err).NotTo(gomega.HaveOccurred())
- return pods.Items
+ if pods == nil {
+ return nil, err
+ }
+ return pods.Items, err
}

-func GetAllPods(t Test, rayCluster *rayv1.RayCluster) []corev1.Pod {
- t.T().Helper()
+func GetAllPods(t Test, rayCluster *rayv1.RayCluster) ([]corev1.Pod, error) {
pods, err := t.Client().Core().CoreV1().Pods(rayCluster.Namespace).List(
t.Ctx(),
common.RayClusterAllPodsAssociationOptions(rayCluster).ToMetaV1ListOptions(),
)
- t.Expect(err).NotTo(gomega.HaveOccurred())
- return pods.Items
+ if pods == nil {
+ return nil, err
+ }
+ return pods.Items, err
}

func GetGroupPods(t Test, rayCluster *rayv1.RayCluster, group string) []corev1.Pod {
@@ -119,7 +121,7 @@ func GetGroupPods(t Test, rayCluster *rayv1.RayCluster, group string) []corev1.P
t.Ctx(),
common.RayClusterGroupPodsAssociationOptions(rayCluster, group).ToMetaV1ListOptions(),
)
- t.Expect(err).NotTo(gomega.HaveOccurred())
+ assert.NoError(t.T(), err)
return pods.Items
}
diff --git a/ray-operator/test/support/support.go b/ray-operator/test/support/support.go
index b0715375ac..c8fc9afb92 100644
--- a/ray-operator/test/support/support.go
+++ b/ray-operator/test/support/support.go
@@ -7,13 +7,11 @@ import (
"github.com/onsi/gomega"
"github.com/onsi/gomega/format"

- k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var (
- TestApplyOptions = metav1.ApplyOptions{FieldManager: "kuberay-test", Force: true}
- TestCreateOptions = metav1.CreateOptions{FieldManager: "kuberay-test"}
+ TestApplyOptions = metav1.ApplyOptions{FieldManager: "kuberay-test", Force: true}

TestTimeoutShort = 1 * time.Minute
TestTimeoutMedium = 2 * time.Minute
@@ -51,10 +49,3 @@ func init() {
// Disable object truncation on test results
format.MaxLength = 0
}
-
-func NotFound[T any](fn func(g gomega.Gomega) (T, error)) func(g gomega.Gomega) bool {
- return func(g gomega.Gomega) bool {
- _, err := fn(g)
- return k8serrors.IsNotFound(err)
- }
-}
diff --git a/ray-operator/test/support/test.go b/ray-operator/test/support/test.go
index 23856970b7..a6181e07ef 100644
--- a/ray-operator/test/support/test.go
+++ b/ray-operator/test/support/test.go
@@ -9,7 +9,8 @@ import (
"testing"
"time"

- "github.com/onsi/gomega"
+ "github.com/stretchr/testify/assert"
+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

corev1 "k8s.io/api/core/v1"
@@ -21,8 +22,6 @@ type Test interface {
Client() Client
OutputDir() string

- gomega.Gomega
-
NewTestNamespace(...Option[*corev1.Namespace]) *corev1.Namespace
StreamKubeRayOperatorLogs()
}
@@ -49,14 +48,12 @@ func With(t *testing.T) Test {
}

return &T{
- WithT: gomega.NewWithT(t),
- t: t,
- ctx: ctx,
+ t: t,
+ ctx: ctx,
}
}

type T struct {
- *gomega.WithT
t *testing.T
//nolint:containedctx //nolint:nolintlint // TODO: The reason for this lint is unknown
ctx context.Context
@@ -132,7 +129,7 @@ func (t *T) StreamKubeRayOperatorLogs() {
pods, err := t.Client().Core().CoreV1().Pods("").List(ctx, metav1.ListOptions{
LabelSelector: "app.kubernetes.io/component=kuberay-operator",
})
- t.Expect(err).ShouldNot(gomega.HaveOccurred())
+ assert.NoError(t.T(), err)
now := metav1.NewTime(time.Now())
for _, pod := range pods.Items {
go func(pod corev1.Pod, ts *metav1.Time) {
diff --git a/ray-operator/test/support/utils.go b/ray-operator/test/support/utils.go
index ac39238d61..b24a02ff29 100644
--- a/ray-operator/test/support/utils.go
+++ b/ray-operator/test/support/utils.go
@@ -5,7 +5,7 @@ import (
"os"
"path"

- "github.com/onsi/gomega"
+ "github.com/stretchr/testify/assert"
)

func Ptr[T any](v T) *T {
@@ -20,6 +20,6 @@ const (
func WriteToOutputDir(t Test, fileName string, fileType OutputType, data []byte) {
t.T().Helper()
- t.Expect(os.WriteFile(path.Join(t.OutputDir(), fileName+"."+string(fileType)), data, fs.ModePerm)).
- To(gomega.Succeed())
+ err := os.WriteFile(path.Join(t.OutputDir(), fileName+"."+string(fileType)), data, fs.ModePerm)
+ assert.NoError(t.T(), err)
}