From 65ed743954fdccb861cbc42fe74e9b2389d54964 Mon Sep 17 00:00:00 2001 From: Srihari Date: Thu, 11 Jan 2024 19:56:15 +0530 Subject: [PATCH] Create a Ray Cluster SDK upgrade scenarios --- go.mod | 2 +- go.sum | 4 +- tests/e2e/mnist_raycluster_sdk_test.go | 40 +- tests/e2e/mnist_rayjob.py | 46 +++ tests/e2e/start_ray_cluster.py | 52 +++ tests/e2e/support.go | 50 ++- tests/upgrade/raycluster_sdk_upgrade_test.go | 396 +++++++++++++++++++ 7 files changed, 548 insertions(+), 42 deletions(-) create mode 100644 tests/e2e/mnist_rayjob.py create mode 100644 tests/e2e/start_ray_cluster.py create mode 100644 tests/upgrade/raycluster_sdk_upgrade_test.go diff --git a/go.mod b/go.mod index 08f9e6514..436ed65c6 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.20 require ( github.com/onsi/gomega v1.27.10 - github.com/project-codeflare/codeflare-common v0.0.0-20231110155354-042fb171fcdb + github.com/project-codeflare/codeflare-common v0.0.0-20231129165224-988ba1da9069 github.com/project-codeflare/multi-cluster-app-dispatcher v1.37.0 github.com/ray-project/kuberay/ray-operator v1.0.0 k8s.io/api v0.26.3 diff --git a/go.sum b/go.sum index d85b8eb71..0d6034b87 100644 --- a/go.sum +++ b/go.sum @@ -369,8 +369,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/project-codeflare/codeflare-common v0.0.0-20231110155354-042fb171fcdb h1:L2Gdr2SlvshDKZY2KK6507AwzQ1NSfRbMQuz5dOsYNM= -github.com/project-codeflare/codeflare-common v0.0.0-20231110155354-042fb171fcdb/go.mod h1:zdi2GCYJX+QyxFWyCLMoTme3NMz/aucWDJWMqKfigxk= +github.com/project-codeflare/codeflare-common v0.0.0-20231129165224-988ba1da9069 h1:81+ma1mchF/LtAGsf+poAt50kJ/fLYjoTAcZOxci1Yc= +github.com/project-codeflare/codeflare-common v0.0.0-20231129165224-988ba1da9069/go.mod h1:zdi2GCYJX+QyxFWyCLMoTme3NMz/aucWDJWMqKfigxk= github.com/project-codeflare/multi-cluster-app-dispatcher v1.37.0 h1:oyhdLdc4BgA4zcH1zlRrSrYpzuVxV5QLDbyIXrwnQqs= github.com/project-codeflare/multi-cluster-app-dispatcher v1.37.0/go.mod h1:Yge6GRNpO9YIDfeL+XOcCE9xbmfCTD5C1h5dlW87mxQ= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= diff --git a/tests/e2e/mnist_raycluster_sdk_test.go b/tests/e2e/mnist_raycluster_sdk_test.go index 9d4dc862f..963a48722 100644 --- a/tests/e2e/mnist_raycluster_sdk_test.go +++ b/tests/e2e/mnist_raycluster_sdk_test.go @@ -17,9 +17,7 @@ limitations under the License. package e2e import ( - "strings" "testing" - "time" . "github.com/onsi/gomega" . "github.com/project-codeflare/codeflare-common/support" @@ -137,7 +135,7 @@ func TestMNISTRayClusterSDK(t *testing.T) { Command: []string{ "/bin/sh", "-c", "while [ ! -f /codeflare-sdk/pyproject.toml ]; do sleep 1; done; " + - "cp /test/* . && chmod +x install-codeflare-sdk.sh && ./install-codeflare-sdk.sh && python mnist_raycluster_sdk.py " + namespace.Name, + "cp /test/* . && chmod +x install-codeflare-sdk.sh && ./install-codeflare-sdk.sh && python mnist_raycluster_sdk.py " + namespace.Name, }, VolumeMounts: []corev1.VolumeMount{ { @@ -194,40 +192,8 @@ func TestMNISTRayClusterSDK(t *testing.T) { test.Expect(err).NotTo(HaveOccurred()) test.T().Logf("Created Job %s/%s successfully", job.Namespace, job.Name) - go func() { - // Checking if pod is found and running - podName := "" - foundPod := false - for !foundPod { - pods, _ := test.Client().Core().CoreV1().Pods(namespace.Name).List(test.Ctx(), metav1.ListOptions{ - LabelSelector: "job-name=sdk", - }) - for _, pod := range pods.Items { - if strings.HasPrefix(pod.Name, "sdk-") && pod.Status.Phase == corev1.PodRunning { - podName = pod.Name - foundPod = true - test.T().Logf("Pod is running!") - break - } - } - if !foundPod { - test.T().Logf("Waiting for pod to start...") - time.Sleep(5 * time.Second) - } - } - - // Get rest config - restConfig, err := GetRestConfig(test); if err != nil { - test.T().Errorf("Error getting rest config: %v", err) - } - - // Copy codeflare-sdk to the pod - srcDir := "../.././" - dstDir := "/codeflare-sdk" - if err := CopyToPod(test, namespace.Name, podName, restConfig, srcDir, dstDir); err != nil { - test.T().Errorf("Error copying codeflare-sdk to pod: %v", err) - } - }() + // Setup the codeflare-sdk inside the pod associated to the created job + SetupCodeflareSDKInsidePod(test, namespace, job.Name) test.T().Logf("Waiting for Job %s/%s to complete", job.Namespace, job.Name) test.Eventually(Job(test, job.Namespace, job.Name), TestTimeoutLong).Should( diff --git a/tests/e2e/mnist_rayjob.py b/tests/e2e/mnist_rayjob.py new file mode 100644 index 000000000..8557a55c1 --- /dev/null +++ b/tests/e2e/mnist_rayjob.py @@ -0,0 +1,46 @@ +import sys + +from time import sleep + +from torchx.specs.api import AppState, is_terminal + +from codeflare_sdk.cluster.cluster import get_cluster +from codeflare_sdk.job.jobs import DDPJobDefinition + +namespace = sys.argv[1] + +cluster = get_cluster("mnist", namespace) + +cluster.details() + +jobdef = DDPJobDefinition( + name="mnist", + script="mnist.py", + scheduler_args={"requirements": "requirements.txt"}, +) +job = jobdef.submit(cluster) + +done = False +time = 0 +timeout = 900 +while not done: + status = job.status() + if is_terminal(status.state): + break + if not done: + print(status) + if timeout and time >= timeout: + raise TimeoutError(f"job has timed out after waiting {timeout}s") + sleep(5) + time += 5 + +print(f"Job has completed: {status.state}") + +print(job.logs()) + +cluster.down() + +if not status.state == AppState.SUCCEEDED: + exit(1) +else: + exit(0) diff --git a/tests/e2e/start_ray_cluster.py b/tests/e2e/start_ray_cluster.py new file mode 100644 index 000000000..774be8f04 --- /dev/null +++ b/tests/e2e/start_ray_cluster.py @@ -0,0 +1,52 @@ +import sys +import os + +from time import sleep + +from codeflare_sdk.cluster.cluster import Cluster, ClusterConfiguration + +namespace = sys.argv[1] +ray_image = os.getenv("RAY_IMAGE") +host = os.getenv("CLUSTER_HOSTNAME") + +ingress_options = {} +if host is not None: + ingress_options = { + "ingresses": [ + { + "ingressName": "ray-dashboard", + "port": 8265, + "pathType": "Prefix", + "path": "/", + "host": host, + }, + ] + } + +cluster = Cluster( + ClusterConfiguration( + name="mnist", + namespace=namespace, + num_workers=1, + head_cpus="500m", + head_memory=2, + min_cpus="500m", + max_cpus=1, + min_memory=1, + max_memory=2, + num_gpus=0, + instascale=False, + image=ray_image, + ingress_options=ingress_options, + ) +) + +cluster.up() + +cluster.status() + +cluster.wait_ready() + +cluster.status() + +cluster.details() diff --git a/tests/e2e/support.go b/tests/e2e/support.go index 9ef8e1769..c3940fe21 100644 --- a/tests/e2e/support.go +++ b/tests/e2e/support.go @@ -20,9 +20,14 @@ import ( "embed" "os" "path/filepath" + "strings" + "time" "github.com/onsi/gomega" + "github.com/project-codeflare/codeflare-common/support" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/cli-runtime/pkg/genericclioptions" @@ -33,8 +38,6 @@ import ( "k8s.io/kubectl/pkg/cmd/cp" "k8s.io/kubectl/pkg/cmd/util" "k8s.io/kubectl/pkg/scheme" - - "github.com/project-codeflare/codeflare-common/support" ) //go:embed *.py *.txt *.sh @@ -109,3 +112,46 @@ func (r restClientGetter) ToDiscoveryClient() (discovery.CachedDiscoveryInterfac func (r restClientGetter) ToRESTMapper() (meta.RESTMapper, error) { return nil, nil } + +func SetupCodeflareSDKInsidePod(test support.Test, namespace *corev1.Namespace, labelName string) { + + // Get pod name + podName := GetPodName(test, namespace, labelName) + + // Get rest config + restConfig, err := GetRestConfig(test) + if err != nil { + test.T().Errorf("Error getting rest config: %v", err) + } + + // Copy codeflare-sdk to the pod + srcDir := "../.././" + dstDir := "/codeflare-sdk" + if err := CopyToPod(test, namespace.Name, podName, restConfig, srcDir, dstDir); err != nil { + test.T().Errorf("Error copying codeflare-sdk to pod: %v", err) + } +} + +func GetPodName(test support.Test, namespace *corev1.Namespace, labelName string) string { + podName := "" + foundPod := false + for !foundPod { + pods, _ := test.Client().Core().CoreV1().Pods(namespace.Name).List(test.Ctx(), metav1.ListOptions{ + LabelSelector: "job-name=" + labelName, + }) + for _, pod := range pods.Items { + + if strings.HasPrefix(pod.Name, labelName+"-") && pod.Status.Phase == corev1.PodRunning { + podName = pod.Name + foundPod = true + test.T().Logf("Pod is running!") + break + } + } + if !foundPod { + test.T().Logf("Waiting for pod to start...") + time.Sleep(5 * time.Second) + } + } + return podName +} diff --git a/tests/upgrade/raycluster_sdk_upgrade_test.go b/tests/upgrade/raycluster_sdk_upgrade_test.go new file mode 100644 index 000000000..6b3c93cf0 --- /dev/null +++ b/tests/upgrade/raycluster_sdk_upgrade_test.go @@ -0,0 +1,396 @@ +/* +Copyright 2023. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package upgrade + +import ( + "fmt" + "testing" + + . "github.com/onsi/gomega" + . "github.com/project-codeflare/codeflare-common/support" + mcadv1beta1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1" + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" + + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + . "github.com/project-codeflare/codeflare-sdk/tests/e2e" +) + +var ( + nsName = "test-ns-rayclusterupgrade" +) + +// Creates a Ray cluster +func TestMNISTRayClusterUp(t *testing.T) { + + test := With(t) + + // Create a namespace + namespace := CreateTestNamespaceWithName(test, nsName) + test.T().Logf("Created namespace %s successfully", namespace.Name) + + // Delete namespace only if test failed + defer func() { + if t.Failed() { + DeleteTestNamespace(test, namespace) + } else { + StoreNamespaceLogs(test, namespace) + } + }() + + // Test configuration + config := CreateConfigMap(test, namespace.Name, map[string][]byte{ + // SDK script + "start_ray_cluster.py": ReadFile(test, "start_ray_cluster.py"), + // codeflare-sdk installation script + "install-codeflare-sdk.sh": ReadFile(test, "install-codeflare-sdk.sh"), + }) + + // Create RBAC, retrieve token for user with limited rights + policyRules := []rbacv1.PolicyRule{ + { + Verbs: []string{"get", "create", "delete", "list", "patch", "update"}, + APIGroups: []string{mcadv1beta1.GroupName}, + Resources: []string{"appwrappers"}, + }, + { + Verbs: []string{"get", "list"}, + APIGroups: []string{rayv1.GroupVersion.Group}, + Resources: []string{"rayclusters", "rayclusters/status"}, + }, + { + Verbs: []string{"get", "list"}, + APIGroups: []string{"route.openshift.io"}, + Resources: []string{"routes"}, + }, + { + Verbs: []string{"get", "list"}, + APIGroups: []string{"networking.k8s.io"}, + Resources: []string{"ingresses"}, + }, + } + + sa := CreateServiceAccount(test, namespace.Name) + role := CreateRole(test, namespace.Name, policyRules) + CreateRoleBinding(test, namespace.Name, sa, role) + + job := &batchv1.Job{ + TypeMeta: metav1.TypeMeta{ + APIVersion: batchv1.SchemeGroupVersion.String(), + Kind: "Job", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "sdk", + Namespace: namespace.Name, + }, + Spec: batchv1.JobSpec{ + Completions: Ptr(int32(1)), + Parallelism: Ptr(int32(1)), + BackoffLimit: Ptr(int32(0)), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Volumes: []corev1.Volume{ + { + Name: "test", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: config.Name, + }, + }, + }, + }, + { + Name: "codeflare-sdk", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + { + Name: "workdir", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + }, + Containers: []corev1.Container{ + { + Name: "test", + // FIXME: switch to base Python image once the dependency on OpenShift CLI is removed + // See https://github.com/project-codeflare/codeflare-sdk/pull/146 + Image: "quay.io/opendatahub/notebooks:jupyter-minimal-ubi8-python-3.8-4c8f26e", + Env: []corev1.EnvVar{ + {Name: "PYTHONUSERBASE", Value: "/workdir"}, + {Name: "RAY_IMAGE", Value: GetRayImage()}, + }, + Command: []string{ + "/bin/sh", "-c", + "while [ ! -f /codeflare-sdk/pyproject.toml ]; do sleep 1; done; " + + "cp /test/* . && chmod +x install-codeflare-sdk.sh && ./install-codeflare-sdk.sh && python start_ray_cluster.py " + namespace.Name, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "test", + MountPath: "/test", + }, + { + Name: "codeflare-sdk", + MountPath: "/codeflare-sdk", + }, + { + Name: "workdir", + MountPath: "/workdir", + }, + }, + WorkingDir: "/workdir", + SecurityContext: &corev1.SecurityContext{ + AllowPrivilegeEscalation: Ptr(false), + SeccompProfile: &corev1.SeccompProfile{ + Type: "RuntimeDefault", + }, + Capabilities: &corev1.Capabilities{ + Drop: []corev1.Capability{"ALL"}, + }, + RunAsNonRoot: Ptr(true), + }, + }, + }, + RestartPolicy: corev1.RestartPolicyNever, + ServiceAccountName: sa.Name, + }, + }, + }, + } + if GetClusterType(test) == KindCluster { + // Take first KinD node and redirect pod hostname requests there + node := GetNodes(test)[0] + hostname := GetClusterHostname(test) + IP := GetNodeInternalIP(test, node) + + test.T().Logf("Setting KinD cluster hostname '%s' to node IP '%s' for SDK pod", hostname, IP) + job.Spec.Template.Spec.HostAliases = []corev1.HostAlias{ + { + IP: IP, + Hostnames: []string{hostname}, + }, + } + + // Propagate hostname into Python code as env variable + hostnameEnvVar := corev1.EnvVar{Name: "CLUSTER_HOSTNAME", Value: hostname} + job.Spec.Template.Spec.Containers[0].Env = append(job.Spec.Template.Spec.Containers[0].Env, hostnameEnvVar) + fmt.Printf("CLUSTER_HOSTNAME environment variable value: %s\n", hostname) + test.T().Logf("CLUSTER_HOSTNAME environment variable value: %s", hostname) + } + + job, err := test.Client().Core().BatchV1().Jobs(namespace.Name).Create(test.Ctx(), job, metav1.CreateOptions{}) + test.Expect(err).NotTo(HaveOccurred()) + test.T().Logf("Created Job %s/%s successfully", job.Namespace, job.Name) + + // Setup the codeflare-sdk inside the pod associated to the created job + SetupCodeflareSDKInsidePod(test, namespace, job.Name) + + test.T().Logf("Waiting for Job %s/%s to complete", job.Namespace, job.Name) + test.Eventually(Job(test, job.Namespace, job.Name), TestTimeoutLong).Should( + Or( + WithTransform(ConditionStatus(batchv1.JobComplete), Equal(corev1.ConditionTrue)), + WithTransform(ConditionStatus(batchv1.JobFailed), Equal(corev1.ConditionTrue)), + )) + + // Assert the job has completed successfully + test.Expect(GetJob(test, job.Namespace, job.Name)). + To(WithTransform(ConditionStatus(batchv1.JobComplete), Equal(corev1.ConditionTrue))) +} + +// Submit a Job to the Ray cluster and trains the MNIST dataset using the CodeFlare SDK. +func TestMnistJobSubmit(t *testing.T) { + + test := With(t) + + namespace := GetNamespaceWithName(test, nsName) + + //delete the namespace after test complete + defer DeleteTestNamespace(test, namespace) + + // Test configuration + config := CreateConfigMap(test, namespace.Name, map[string][]byte{ + // SDK script + "mnist_rayjob.py": ReadFile(test, "mnist_rayjob.py"), + // pip requirements + "requirements.txt": ReadFile(test, "mnist_pip_requirements.txt"), + // MNIST training script + "mnist.py": ReadFile(test, "mnist.py"), + // codeflare-sdk installation script + "install-codeflare-sdk.sh": ReadFile(test, "install-codeflare-sdk.sh"), + }) + + // Create RBAC, retrieve token for user with limited rights + policyRules := []rbacv1.PolicyRule{ + { + Verbs: []string{"get", "create", "delete", "list", "patch", "update"}, + APIGroups: []string{mcadv1beta1.GroupName}, + Resources: []string{"appwrappers"}, + }, + { + Verbs: []string{"get", "list"}, + APIGroups: []string{rayv1.GroupVersion.Group}, + Resources: []string{"rayclusters", "rayclusters/status"}, + }, + { + Verbs: []string{"get", "list"}, + APIGroups: []string{"route.openshift.io"}, + Resources: []string{"routes"}, + }, + { + Verbs: []string{"get", "list"}, + APIGroups: []string{"networking.k8s.io"}, + Resources: []string{"ingresses"}, + }, + } + + serviceAccount := CreateServiceAccount(test, namespace.Name) + role := CreateRole(test, namespace.Name, policyRules) + CreateRoleBinding(test, namespace.Name, serviceAccount, role) + + job := &batchv1.Job{ + TypeMeta: metav1.TypeMeta{ + APIVersion: batchv1.SchemeGroupVersion.String(), + Kind: "Job", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "rayjob", + Namespace: namespace.Name, + }, + Spec: batchv1.JobSpec{ + Completions: Ptr(int32(1)), + Parallelism: Ptr(int32(1)), + BackoffLimit: Ptr(int32(0)), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Volumes: []corev1.Volume{ + { + Name: "test", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: config.Name, + }, + }, + }, + }, + { + Name: "codeflare-sdk", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + { + Name: "workdir", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + }, + Containers: []corev1.Container{ + { + Name: "test", + // FIXME: switch to base Python image once the dependency on OpenShift CLI is removed + // See https://github.com/project-codeflare/codeflare-sdk/pull/146 + Image: "quay.io/opendatahub/notebooks:jupyter-minimal-ubi8-python-3.8-4c8f26e", + Env: []corev1.EnvVar{ + {Name: "PYTHONUSERBASE", Value: "/workdir"}, + {Name: "RAY_IMAGE", Value: GetRayImage()}, + }, + Command: []string{ + "/bin/sh", "-c", + "while [ ! -f /codeflare-sdk/pyproject.toml ]; do sleep 1; done; " + + "cp /test/* . && chmod +x install-codeflare-sdk.sh && ./install-codeflare-sdk.sh && python mnist_rayjob.py " + namespace.Name, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "test", + MountPath: "/test", + }, + { + Name: "codeflare-sdk", + MountPath: "/codeflare-sdk", + }, + { + Name: "workdir", + MountPath: "/workdir", + }, + }, + WorkingDir: "/workdir", + SecurityContext: &corev1.SecurityContext{ + AllowPrivilegeEscalation: Ptr(false), + SeccompProfile: &corev1.SeccompProfile{ + Type: "RuntimeDefault", + }, + Capabilities: &corev1.Capabilities{ + Drop: []corev1.Capability{"ALL"}, + }, + RunAsNonRoot: Ptr(true), + }, + }, + }, + RestartPolicy: corev1.RestartPolicyNever, + ServiceAccountName: serviceAccount.Name, + }, + }, + }, + } + + if GetClusterType(test) == KindCluster { + // Take first KinD node and redirect pod hostname requests there + node := GetNodes(test)[0] + hostname := GetClusterHostname(test) + IP := GetNodeInternalIP(test, node) + + test.T().Logf("Setting KinD cluster hostname '%s' to node IP '%s' for SDK pod", hostname, IP) + job.Spec.Template.Spec.HostAliases = []corev1.HostAlias{ + { + IP: IP, + Hostnames: []string{hostname}, + }, + } + + // Propagate hostname into Python code as env variable + hostnameEnvVar := corev1.EnvVar{Name: "CLUSTER_HOSTNAME", Value: hostname} + job.Spec.Template.Spec.Containers[0].Env = append(job.Spec.Template.Spec.Containers[0].Env, hostnameEnvVar) + } + + job, err := test.Client().Core().BatchV1().Jobs(nsName).Create(test.Ctx(), job, metav1.CreateOptions{}) + test.Expect(err).NotTo(HaveOccurred()) + test.T().Logf("Created Job %s/%s successfully", job.Namespace, job.Name) + + // Setup the codeflare-sdk inside the pod associated to the created job + SetupCodeflareSDKInsidePod(test, namespace, job.Name) + + test.T().Logf("Waiting for Job %s/%s to complete", job.Namespace, job.Name) + test.Eventually(Job(test, job.Namespace, job.Name), TestTimeoutLong).Should( + Or( + WithTransform(ConditionStatus(batchv1.JobComplete), Equal(corev1.ConditionTrue)), + WithTransform(ConditionStatus(batchv1.JobFailed), Equal(corev1.ConditionTrue)), + )) + + // Assert the job has completed successfully + test.Expect(GetJob(test, job.Namespace, job.Name)). + To(WithTransform(ConditionStatus(batchv1.JobComplete), Equal(corev1.ConditionTrue))) + +}