diff --git a/Makefile b/Makefile index 8f9af5740..a293ce5b8 100644 --- a/Makefile +++ b/Makefile @@ -18,7 +18,7 @@ APPWRAPPER_REPO ?= github.com/project-codeflare/appwrapper APPWRAPPER_CRD ?= ${APPWRAPPER_REPO}/config/crd?ref=${APPWRAPPER_VERSION} # KUEUE_VERSION defines the default version of Kueue (used for testing) -KUEUE_VERSION ?= v0.7.0 +KUEUE_VERSION ?= v0.7.1 USE_RHOAI ?= true # KUBERAY_VERSION defines the default version of the KubeRay operator (used for testing) diff --git a/go.mod b/go.mod index cb20dbb48..c086b6a8f 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/ray-project/kuberay/ray-operator v1.1.1 go.uber.org/zap v1.27.0 golang.org/x/exp v0.0.0-20230905200255-921286631fa9 + gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.30.1 k8s.io/apiextensions-apiserver v0.29.2 k8s.io/apimachinery v0.30.1 @@ -99,7 +100,6 @@ require ( google.golang.org/appengine v1.6.8 // indirect google.golang.org/protobuf v1.33.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiserver v0.29.5 // indirect k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect diff --git a/main.go b/main.go index 9c0d0437d..2c498da2e 100644 --- a/main.go +++ b/main.go @@ -151,7 +151,21 @@ func main() { }, AppWrapper: &config.AppWrapperConfiguration{ Enabled: ptr.To(false), - Config: awconfig.NewAppWrapperConfig(), + Config: &awconfig.AppWrapperConfig{ + EnableKueueIntegrations: true, + DisableChildAdmissionCtrl: true, + UserRBACAdmissionCheck: true, + FaultTolerance: &awconfig.FaultToleranceConfig{ + AdmissionGracePeriod: 15 * time.Minute, + WarmupGracePeriod: 15 * time.Minute, + FailureGracePeriod: 15 * time.Minute, + RetryPausePeriod: 90 * time.Second, + RetryLimit: 30, + ForcefulDeletionGracePeriod: 15 * time.Minute, + GracePeriodMaximum: 24 * time.Hour, + SuccessTTL: 7 * 24 * time.Hour, + }, + }, }, } diff --git a/test/e2e/mnist_pytorch_appwrapper_test.go b/test/e2e/mnist_pytorch_appwrapper_test.go index 94239f57c..c0b87cb34 100644 --- a/test/e2e/mnist_pytorch_appwrapper_test.go +++ b/test/e2e/mnist_pytorch_appwrapper_test.go @@ -44,6 +44,7 @@ func runMnistPyTorchAppWrapper(t *testing.T, accelerator string) { // Create a namespace and localqueue in that namespace namespace := test.NewTestNamespace() + localQueue := CreateKueueLocalQueue(test, namespace.Name, "e2e-cluster-queue") // Test configuration @@ -83,6 +84,12 @@ func runMnistPyTorchAppWrapper(t *testing.T, accelerator string) { Parallelism: Ptr(int32(1)), Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ + Tolerations: []corev1.Toleration{ + { + Key: "nvidia.com/gpu", + Operator: corev1.TolerationOpExists, + }, + }, Containers: []corev1.Container{ { Name: "job", diff --git a/test/e2e/mnist_rayjob_raycluster_test.go b/test/e2e/mnist_rayjob_raycluster_test.go index 0f2490c21..77bf866c9 100644 --- a/test/e2e/mnist_rayjob_raycluster_test.go +++ b/test/e2e/mnist_rayjob_raycluster_test.go @@ -23,10 +23,12 @@ import ( "net/url" "testing" + "github.com/onsi/gomega" . "github.com/onsi/gomega" mcadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2" . "github.com/project-codeflare/codeflare-common/support" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" + "gopkg.in/yaml.v2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -111,6 +113,12 @@ func runMnistRayJobRayClusterAppWrapper(t *testing.T, accelerator string, number // Create a namespace and localqueue in that namespace namespace := test.NewTestNamespace() + test.T().Cleanup(func() { + storeLocalQueue(test, namespace) + storeClusterQueue(test) + storeWorkloads(test) + storeAppWrapper(test, namespace) + }) localQueue := CreateKueueLocalQueue(test, namespace.Name, "e2e-cluster-queue") // Create MNIST training script @@ -206,8 +214,8 @@ func constructRayCluster(_ Test, namespace *corev1.Namespace, mnist *corev1.Conf Kind: "RayCluster", }, ObjectMeta: metav1.ObjectMeta{ - Name: "raycluster", - Namespace: namespace.Name, + GenerateName: "raycluster", + Namespace: namespace.Name, }, Spec: rayv1.RayClusterSpec{ RayVersion: GetRayVersion(), @@ -266,6 +274,12 @@ func constructRayCluster(_ Test, namespace *corev1.Namespace, mnist *corev1.Conf RayStartParams: map[string]string{}, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ + Tolerations: []corev1.Toleration{ + { + Key: "nvidia.com/gpu", + Operator: corev1.TolerationOpExists, + }, + }, Containers: []corev1.Container{ { Name: "ray-worker", @@ -401,3 +415,74 @@ func getRayDashboardURL(test Test, namespace, rayClusterName string) url.URL { Host: ingress.Spec.Rules[0].Host, } } + +func storeLocalQueue(t Test, namespace *corev1.Namespace) { + t.T().Helper() + + lqs, err := t.Client().Kueue().KueueV1beta1().LocalQueues(namespace.Name).List(t.Ctx(), metav1.ListOptions{}) + t.Expect(err).NotTo(gomega.HaveOccurred()) + + for _, lq := range lqs.Items { + var resource map[string]interface{} + resource, err = runtime.DefaultUnstructuredConverter.ToUnstructured(&lq) + t.Expect(err).NotTo(gomega.HaveOccurred()) + lqByte, err := yaml.Marshal(resource) + t.Expect(err).NotTo(gomega.HaveOccurred()) + + WriteToOutputDir(t, lq.Name, Log, lqByte) + } +} + +func storeClusterQueue(t Test) { + t.T().Helper() + + cqs, err := t.Client().Kueue().KueueV1beta1().ClusterQueues().List(t.Ctx(), metav1.ListOptions{}) + t.Expect(err).NotTo(gomega.HaveOccurred()) + + for _, cq := range cqs.Items { + var resource map[string]interface{} + resource, err = runtime.DefaultUnstructuredConverter.ToUnstructured(&cq) + t.Expect(err).NotTo(gomega.HaveOccurred()) + lqByte, err := yaml.Marshal(resource) + t.Expect(err).NotTo(gomega.HaveOccurred()) + + WriteToOutputDir(t, cq.Name, Log, lqByte) + } +} + +func storeWorkloads(t Test) { + t.T().Helper() + + namespaces, err := t.Client().Core().CoreV1().Namespaces().List(t.Ctx(), metav1.ListOptions{}) + t.Expect(err).NotTo(gomega.HaveOccurred()) + + for _, namespace := range namespaces.Items { + workloads := GetKueueWorkloads(t, namespace.Name) + + for _, workload := range workloads { + var resource map[string]interface{} + resource, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&workload) + t.Expect(err).NotTo(gomega.HaveOccurred()) + workloadByte, err := yaml.Marshal(resource) + t.Expect(err).NotTo(gomega.HaveOccurred()) + + WriteToOutputDir(t, workload.Name, Log, workloadByte) + } + } +} + +func storeAppWrapper(t Test, namespace *corev1.Namespace) { + t.T().Helper() + + aws := AppWrappers(t, namespace)(t) + + for _, aw := range aws { + var resource map[string]interface{} + resource, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&aw) + t.Expect(err).NotTo(gomega.HaveOccurred()) + lqByte, err := yaml.Marshal(resource) + t.Expect(err).NotTo(gomega.HaveOccurred()) + + WriteToOutputDir(t, aw.Name, Log, lqByte) + } +}