Skip to content

Commit

Permalink
Create kueue resources as part of test execution
Browse files Browse the repository at this point in the history
  • Loading branch information
ChughShilpa committed Sep 27, 2024
1 parent 59a1d23 commit 0e5a0b0
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 61 deletions.
17 changes: 12 additions & 5 deletions test/e2e/mnist_pytorch_appwrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,30 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

func TestMnistPyTorchAppWrapperCpu(t *testing.T) {
runMnistPyTorchAppWrapper(t, "cpu")
runMnistPyTorchAppWrapper(t, "cpu", 0)
}

func TestMnistPyTorchAppWrapperGpu(t *testing.T) {
runMnistPyTorchAppWrapper(t, "gpu")
runMnistPyTorchAppWrapper(t, "gpu", 1)
}

// Trains the MNIST dataset as a batch Job in an AppWrapper, and asserts successful completion of the training job.
func runMnistPyTorchAppWrapper(t *testing.T, accelerator string) {
func runMnistPyTorchAppWrapper(t *testing.T, accelerator string, numberOfGpus int) {
test := With(t)

// Create a namespace and localqueue in that namespace
// Create a namespace
namespace := test.NewTestNamespace()
localQueue := CreateKueueLocalQueue(test, namespace.Name, "e2e-cluster-queue")

// Create Kueue resources
resourceFlavor := CreateKueueResourceFlavor(test, v1beta1.ResourceFlavorSpec{})
defer test.Client().Kueue().KueueV1beta1().ResourceFlavors().Delete(test.Ctx(), resourceFlavor.Name, metav1.DeleteOptions{})
clusterQueue := createClusterQueue(test, resourceFlavor, numberOfGpus)
defer test.Client().Kueue().KueueV1beta1().ClusterQueues().Delete(test.Ctx(), clusterQueue.Name, metav1.DeleteOptions{})
localQueue := CreateKueueLocalQueue(test, namespace.Name, clusterQueue.Name, AsDefaultQueue)

// Test configuration
config := &corev1.ConfigMap{
Expand Down
79 changes: 62 additions & 17 deletions test/e2e/mnist_rayjob_raycluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

// Trains the MNIST dataset as a RayJob, executed by a Ray cluster
Expand All @@ -49,9 +50,15 @@ func TestMnistRayJobRayClusterGpu(t *testing.T) {
func runMnistRayJobRayCluster(t *testing.T, accelerator string, numberOfGpus int) {
test := With(t)

// Create a namespace and localqueue in that namespace
// Create a namespace
namespace := test.NewTestNamespace()
localQueue := CreateKueueLocalQueue(test, namespace.Name, "e2e-cluster-queue")

// Create Kueue resources
resourceFlavor := CreateKueueResourceFlavor(test, v1beta1.ResourceFlavorSpec{})
defer test.Client().Kueue().KueueV1beta1().ResourceFlavors().Delete(test.Ctx(), resourceFlavor.Name, metav1.DeleteOptions{})
clusterQueue := createClusterQueue(test, resourceFlavor, numberOfGpus)
defer test.Client().Kueue().KueueV1beta1().ClusterQueues().Delete(test.Ctx(), clusterQueue.Name, metav1.DeleteOptions{})
CreateKueueLocalQueue(test, namespace.Name, clusterQueue.Name, AsDefaultQueue)

// Create MNIST training script
mnist := constructMNISTConfigMap(test, namespace)
Expand All @@ -61,7 +68,6 @@ func runMnistRayJobRayCluster(t *testing.T, accelerator string, numberOfGpus int

// Create RayCluster and assign it to the localqueue
rayCluster := constructRayCluster(test, namespace, mnist, numberOfGpus)
AssignToLocalQueue(rayCluster, localQueue)
rayCluster, err = test.Client().Ray().RayV1().RayClusters(namespace.Name).Create(test.Ctx(), rayCluster, metav1.CreateOptions{})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created RayCluster %s/%s successfully", rayCluster.Namespace, rayCluster.Name)
Expand All @@ -78,8 +84,8 @@ func runMnistRayJobRayCluster(t *testing.T, accelerator string, numberOfGpus int

rayDashboardURL := getRayDashboardURL(test, rayCluster.Namespace, rayCluster.Name)

test.T().Logf("Connecting to Ray cluster at: %s", rayDashboardURL.String())
rayClient := NewRayClusterClient(rayDashboardURL)
test.T().Logf("Connecting to Ray cluster at: %s", rayDashboardURL)
rayClient := GetRayClusterClient(test, rayDashboardURL, test.Config().BearerToken)

test.T().Logf("Waiting for RayJob %s/%s to complete", rayJob.Namespace, rayJob.Name)
test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutLong).
Expand Down Expand Up @@ -111,9 +117,15 @@ func TestMnistRayJobRayClusterAppWrapperGpu(t *testing.T) {
func runMnistRayJobRayClusterAppWrapper(t *testing.T, accelerator string, numberOfGpus int) {
test := With(t)

// Create a namespace and localqueue in that namespace
// Create a namespace
namespace := test.NewTestNamespace()
localQueue := CreateKueueLocalQueue(test, namespace.Name, "e2e-cluster-queue")

// Create Kueue resources
resourceFlavor := CreateKueueResourceFlavor(test, v1beta1.ResourceFlavorSpec{})
defer test.Client().Kueue().KueueV1beta1().ResourceFlavors().Delete(test.Ctx(), resourceFlavor.Name, metav1.DeleteOptions{})
clusterQueue := createClusterQueue(test, resourceFlavor, numberOfGpus)
defer test.Client().Kueue().KueueV1beta1().ClusterQueues().Delete(test.Ctx(), clusterQueue.Name, metav1.DeleteOptions{})
localQueue := CreateKueueLocalQueue(test, namespace.Name, clusterQueue.Name, AsDefaultQueue)

// Create MNIST training script
mnist := constructMNISTConfigMap(test, namespace)
Expand Down Expand Up @@ -167,8 +179,8 @@ func runMnistRayJobRayClusterAppWrapper(t *testing.T, accelerator string, number

rayDashboardURL := getRayDashboardURL(test, rayCluster.Namespace, rayCluster.Name)

test.T().Logf("Connecting to Ray cluster at: %s", rayDashboardURL.String())
rayClient := NewRayClusterClient(rayDashboardURL)
test.T().Logf("Connecting to Ray cluster at: %s", rayDashboardURL)
rayClient := GetRayClusterClient(test, rayDashboardURL, test.Config().BearerToken)

test.T().Logf("Waiting for RayJob %s/%s to complete", rayJob.Namespace, rayJob.Name)
test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutLong).
Expand Down Expand Up @@ -374,7 +386,7 @@ func constructRayJob(_ Test, namespace *corev1.Namespace, rayCluster *rayv1.RayC
}
}

func getRayDashboardURL(test Test, namespace, rayClusterName string) url.URL {
func getRayDashboardURL(test Test, namespace, rayClusterName string) string {
dashboardName := "ray-dashboard-" + rayClusterName

if IsOpenShift(test) {
Expand All @@ -396,10 +408,10 @@ func getRayDashboardURL(test Test, namespace, rayClusterName string) url.URL {
return resp.StatusCode, nil
}, TestTimeoutShort).Should(Not(Equal(503)))

return url.URL{
Scheme: "https",
Host: hostname,
}
dashboardUrl, _ := url.Parse("https://" + hostname)
test.T().Logf("Ray-dashboard route : %s\n", dashboardUrl.String())

return dashboardUrl.String()
}

ingress := GetIngress(test, namespace, dashboardName)
Expand All @@ -408,8 +420,41 @@ func getRayDashboardURL(test Test, namespace, rayClusterName string) url.URL {
test.Eventually(Ingress(test, ingress.Namespace, ingress.Name), TestTimeoutShort).
Should(WithTransform(LoadBalancerIngresses, HaveLen(1)))

return url.URL{
Scheme: "http",
Host: ingress.Spec.Rules[0].Host,
hostname := ingress.Spec.Rules[0].Host
dashboardUrl, _ := url.Parse("http://" + hostname)
test.T().Logf("Ray-dashboard route : %s\n", dashboardUrl.String())

return dashboardUrl.String()
}

// Create ClusterQueue
func createClusterQueue(test Test, resourceFlavor *v1beta1.ResourceFlavor, numberOfGpus int) *v1beta1.ClusterQueue {
cqSpec := v1beta1.ClusterQueueSpec{
NamespaceSelector: &metav1.LabelSelector{},
ResourceGroups: []v1beta1.ResourceGroup{
{
CoveredResources: []corev1.ResourceName{corev1.ResourceName("cpu"), corev1.ResourceName("memory"), corev1.ResourceName("nvidia.com/gpu")},
Flavors: []v1beta1.FlavorQuotas{
{
Name: v1beta1.ResourceFlavorReference(resourceFlavor.Name),
Resources: []v1beta1.ResourceQuota{
{
Name: corev1.ResourceCPU,
NominalQuota: resource.MustParse("8"),
},
{
Name: corev1.ResourceMemory,
NominalQuota: resource.MustParse("12Gi"),
},
{
Name: corev1.ResourceName("nvidia.com/gpu"),
NominalQuota: resource.MustParse(fmt.Sprint(numberOfGpus)),
},
},
},
},
},
},
}
return CreateKueueClusterQueue(test, cqSpec)
}
27 changes: 0 additions & 27 deletions test/e2e/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -69,30 +69,3 @@ done
echo ""

sleep 5
echo Creating Kueue ResourceFlavor and ClusterQueue
cat <<EOF | kubectl apply -f -
apiVersion: kueue.x-k8s.io/v1beta1
kind: ResourceFlavor
metadata:
name: "default-flavor"
EOF

cat <<EOF | kubectl apply -f -
apiVersion: kueue.x-k8s.io/v1beta1
kind: ClusterQueue
metadata:
name: "e2e-cluster-queue"
spec:
namespaceSelector: {} # match all.
resourceGroups:
- coveredResources: ["cpu","memory", "nvidia.com/gpu"]
flavors:
- name: "default-flavor"
resources:
- name: "cpu"
nominalQuota: 4
- name: "memory"
nominalQuota: "20G"
- name: "nvidia.com/gpu"
nominalQuota: "1"
EOF
12 changes: 0 additions & 12 deletions test/e2e/support.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ import (

"github.com/onsi/gomega"
"github.com/project-codeflare/codeflare-common/support"

"sigs.k8s.io/controller-runtime/pkg/client"
kueuev1beta1 "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

//go:embed *.py *.txt *.sh
Expand All @@ -35,12 +32,3 @@ func ReadFile(t support.Test, fileName string) []byte {
t.Expect(err).NotTo(gomega.HaveOccurred())
return file
}

func AssignToLocalQueue(object client.Object, localqueue *kueuev1beta1.LocalQueue) {
labels := object.GetLabels()
if labels == nil {
labels = make(map[string]string)
}
labels["kueue.x-k8s.io/queue-name"] = localqueue.Name
object.SetLabels(labels)
}

0 comments on commit 0e5a0b0

Please sign in to comment.