diff --git a/test/e2e/framework/helpers/k8s/k8s_utils.go b/test/e2e/framework/helpers/k8s/k8s_utils.go
index 742a580bb..2890be6aa 100644
--- a/test/e2e/framework/helpers/k8s/k8s_utils.go
+++ b/test/e2e/framework/helpers/k8s/k8s_utils.go
@@ -816,6 +816,30 @@ func (k *KubeCtl) WaitForPodBySelectorRunning(namespace string, selector string,
 	return nil
 }
 
+// WaitForPodBySelectorSucceeded waits for every pod in 'namespace' matching 'selector' to enter the
+// succeeded state. The pod list is read once, up front, so pods created afterwards are not waited on.
+// Pods are waited on one after another and 'timeout' is passed to each wait unchanged, so the total
+// wait can be as long as len(pods) * timeout.
+// Returns an error if listing fails, if no pods match, or if waiting on any listed pod fails.
+func (k *KubeCtl) WaitForPodBySelectorSucceeded(namespace string, selector string, timeout time.Duration) error {
+	podList, err := k.ListPods(namespace, selector)
+	if err != nil {
+		return fmt.Errorf("listing pods in %s with selector %s: %w", namespace, selector, err)
+	}
+	if len(podList.Items) == 0 {
+		return fmt.Errorf("no pods in %s with selector %s", namespace, selector)
+	}
+
+	// Index into Items rather than ranging by value, so no per-iteration copy of the element is made.
+	for i := range podList.Items {
+		podName := podList.Items[i].Name
+		if err := k.WaitForPodSucceeded(namespace, podName, timeout); err != nil {
+			return fmt.Errorf("waiting for pod %s in %s to succeed: %w", podName, namespace, err)
+		}
+	}
+	return nil
+}
+
 // Wait up to timeout seconds for a pod in 'namespace' with given 'selector' to exist
 func (k *KubeCtl) WaitForPodBySelector(namespace string, selector string, timeout time.Duration) error {
 	return wait.PollUntilContextTimeout(context.TODO(), time.Millisecond*100, timeout, false, k.isPodSelectorInNs(selector, namespace).WithContext())
diff --git a/test/e2e/spark_jobs_scheduling/spark_jobs_scheduling_test.go b/test/e2e/spark_jobs_scheduling/spark_jobs_scheduling_test.go
index 375e9f9af..a28e14dba 100644
--- a/test/e2e/spark_jobs_scheduling/spark_jobs_scheduling_test.go
+++ b/test/e2e/spark_jobs_scheduling/spark_jobs_scheduling_test.go
@@ -144,8 +144,15 @@ var _ = Describe("", func() {
 
-		// Verify that all the spark jobs are scheduled and are in running state.
+		// Verify that the driver pod of every spark job is created and runs to completion.
 		for _, id := range appIds {
-			By(fmt.Sprintf("Verify if app: %s is in running state", id))
-			err = restClient.WaitForAppStateTransition("default", "root."+sparkNS, id, yunikorn.States().Application.Running, 360)
+			// Both waits below target the same driver pod, so build its label selector once.
+			driverSelector := fmt.Sprintf("spark-app-selector=%s, spark-role=driver", id)
+
+			By(fmt.Sprintf("Verify driver pod for application %s has been created.", id))
+			err = kClient.WaitForPodBySelector(sparkNS, driverSelector, 180*time.Second)
+			Ω(err).NotTo(HaveOccurred())
+
+			By(fmt.Sprintf("Verify driver pod for application %s was completed.", id))
+			err = kClient.WaitForPodBySelectorSucceeded(sparkNS, driverSelector, 360*time.Second)
 			Ω(err).NotTo(HaveOccurred())
 		}
 	})