Skip to content

Commit

Permalink
test: Add MNIST training in RayCluster with CodeFlare SDK
Browse files Browse the repository at this point in the history
  • Loading branch information
astefanutti committed Jun 22, 2023
1 parent 8237d22 commit 01718c4
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 42 deletions.
37 changes: 6 additions & 31 deletions test/e2e/mnist_pytorch_mcad_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@ func TestMNISTPyTorchMCAD(t *testing.T) {
namespace := test.NewTestNamespace()

// MNIST training script
mnist, err := scripts.ReadFile("mnist.py")
test.Expect(err).NotTo(HaveOccurred())

mnistScript := &corev1.ConfigMap{
mnist := &corev1.ConfigMap{
TypeMeta: metav1.TypeMeta{
APIVersion: corev1.SchemeGroupVersion.String(),
Kind: "ConfigMap",
Expand All @@ -52,13 +49,13 @@ func TestMNISTPyTorchMCAD(t *testing.T) {
Namespace: namespace.Name,
},
BinaryData: map[string][]byte{
"mnist.py": mnist,
"mnist.py": ReadFile(test, "mnist.py"),
},
Immutable: Ptr(true),
}
mnistScript, err = test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Create(test.Ctx(), mnistScript, metav1.CreateOptions{})
mnist, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Create(test.Ctx(), mnist, metav1.CreateOptions{})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created ConfigMap %s/%s successfully", mnistScript.Namespace, mnistScript.Name)
test.T().Logf("Created ConfigMap %s/%s successfully", mnist.Namespace, mnist.Name)

// pip requirements
requirements := &corev1.ConfigMap{
Expand Down Expand Up @@ -121,7 +118,7 @@ torchvision==0.12.0
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: mnistScript.Name,
Name: mnist.Name,
},
},
},
Expand Down Expand Up @@ -182,7 +179,7 @@ torchvision==0.12.0
test.Eventually(AppWrapper(test, namespace, aw.Name), TestTimeoutMedium).
Should(WithTransform(AppWrapperState, Equal(mcadv1beta1.AppWrapperStateActive)))

defer troubleshooting(test, job)
defer JobTroubleshooting(test, job)

test.T().Logf("Waiting for Job %s/%s to complete successfully", job.Namespace, job.Name)
test.Eventually(Job(test, job.Namespace, job.Name), TestTimeoutLong).
Expand All @@ -201,25 +198,3 @@ torchvision==0.12.0
test.T().Logf("Printing Job %s/%s logs", job.Namespace, job.Name)
test.T().Log(GetPodLogs(test, &pods[0], corev1.PodLogOptions{}))
}

func troubleshooting(test Test, job *batchv1.Job) {
if !test.T().Failed() {
return
}
job = GetJob(test, job.Namespace, job.Name)

test.T().Errorf("Job %s/%s hasn't completed in time: %s", job.Namespace, job.Name, job)

pods := GetPods(test, job.Namespace, metav1.ListOptions{
LabelSelector: labels.FormatLabels(job.Spec.Selector.MatchLabels)},
)

if len(pods) == 0 {
test.T().Errorf("Job %s/%s has no pods scheduled", job.Namespace, job.Name)
} else {
for i, pod := range pods {
test.T().Logf("Printing Pod %s/%s logs", pod.Namespace, pod.Name)
test.T().Log(GetPodLogs(test, &pods[i], corev1.PodLogOptions{}))
}
}
}
145 changes: 145 additions & 0 deletions test/e2e/mnist_raycluster_sdk_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
Copyright 2023.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
"testing"

. "github.com/onsi/gomega"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

. "github.com/project-codeflare/codeflare-operator/test/support"
)

func TestMNISTRayClusterSDK(t *testing.T) {
test := With(t)
test.T().Parallel()

test.T().Skip("Requires https://github.com/project-codeflare/codeflare-sdk/pull/146")

// Create a namespace
namespace := test.NewTestNamespace()

// SDK script
sdk := &corev1.ConfigMap{
TypeMeta: metav1.TypeMeta{
APIVersion: corev1.SchemeGroupVersion.String(),
Kind: "ConfigMap",
},
ObjectMeta: metav1.ObjectMeta{
Name: "sdk",
Namespace: namespace.Name,
},
BinaryData: map[string][]byte{
"sdk.py": ReadFile(test, "sdk.py"),
},
Immutable: Ptr(true),
}
sdk, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Create(test.Ctx(), sdk, metav1.CreateOptions{})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created ConfigMap %s/%s successfully", sdk.Namespace, sdk.Name)

// pip requirements
requirements := &corev1.ConfigMap{
TypeMeta: metav1.TypeMeta{
APIVersion: corev1.SchemeGroupVersion.String(),
Kind: "ConfigMap",
},
ObjectMeta: metav1.ObjectMeta{
Name: "requirements",
Namespace: namespace.Name,
},
BinaryData: map[string][]byte{
"requirements.txt": ReadFile(test, "requirements.txt"),
},
Immutable: Ptr(true),
}
requirements, err = test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Create(test.Ctx(), requirements, metav1.CreateOptions{})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created ConfigMap %s/%s successfully", requirements.Namespace, requirements.Name)

job := &batchv1.Job{
TypeMeta: metav1.TypeMeta{
APIVersion: batchv1.SchemeGroupVersion.String(),
Kind: "Job",
},
ObjectMeta: metav1.ObjectMeta{
Name: "sdk",
Namespace: namespace.Name,
},
Spec: batchv1.JobSpec{
Completions: Ptr(int32(1)),
Parallelism: Ptr(int32(1)),
BackoffLimit: Ptr(int32(0)),
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "sdk",
Image: "quay.io/opendatahub/notebooks:jupyter-minimal-ubi8-python-3.8-4c8f26e",
Command: []string{"/bin/sh", "-c", "pip install -r /test/runtime/requirements.txt && python /test/job/sdk.py"},
VolumeMounts: []corev1.VolumeMount{
{
Name: "sdk",
MountPath: "/test/job",
},
{
Name: "requirements",
MountPath: "/test/runtime",
},
},
},
},
Volumes: []corev1.Volume{
{
Name: "sdk",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: sdk.Name,
},
},
},
},
{
Name: "requirements",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: requirements.Name,
},
},
},
},
},
RestartPolicy: corev1.RestartPolicyNever,
},
},
},
}
job, err = test.Client().Core().BatchV1().Jobs(namespace.Name).Create(test.Ctx(), job, metav1.CreateOptions{})
test.Expect(err).NotTo(HaveOccurred())

defer JobTroubleshooting(test, job)

test.T().Logf("Waiting for Job %s/%s to complete successfully", job.Namespace, job.Name)
test.Eventually(Job(test, job.Namespace, job.Name), TestTimeoutMedium).
Should(WithTransform(ConditionStatus(batchv1.JobComplete), Equal(corev1.ConditionTrue)))
}
13 changes: 5 additions & 8 deletions test/e2e/mnist_rayjob_mcad_raycluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@ func TestMNISTRayJobMCADRayCluster(t *testing.T) {
namespace := test.NewTestNamespace()

// MNIST training script
mnist, err := scripts.ReadFile("mnist.py")
test.Expect(err).NotTo(HaveOccurred())

mnistScript := &corev1.ConfigMap{
mnist := &corev1.ConfigMap{
TypeMeta: metav1.TypeMeta{
APIVersion: corev1.SchemeGroupVersion.String(),
Kind: "ConfigMap",
Expand All @@ -52,13 +49,13 @@ func TestMNISTRayJobMCADRayCluster(t *testing.T) {
Namespace: namespace.Name,
},
BinaryData: map[string][]byte{
"mnist.py": mnist,
"mnist.py": ReadFile(test, "mnist.py"),
},
Immutable: Ptr(true),
}
mnistScript, err = test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Create(test.Ctx(), mnistScript, metav1.CreateOptions{})
mnist, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Create(test.Ctx(), mnist, metav1.CreateOptions{})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created ConfigMap %s/%s successfully", mnistScript.Namespace, mnistScript.Name)
test.T().Logf("Created ConfigMap %s/%s successfully", mnist.Namespace, mnist.Name)

// RayCluster
rayCluster := &rayv1alpha1.RayCluster{
Expand Down Expand Up @@ -127,7 +124,7 @@ func TestMNISTRayJobMCADRayCluster(t *testing.T) {
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: mnistScript.Name,
Name: mnist.Name,
},
},
},
Expand Down
1 change: 1 addition & 0 deletions test/e2e/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
codeflare-sdk==0.4.4
39 changes: 39 additions & 0 deletions test/e2e/sdk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from codeflare_sdk.cluster.cluster import Cluster, ClusterConfiguration
# from codeflare_sdk.cluster.auth import TokenAuthentication
from codeflare_sdk.job.jobs import DDPJobDefinition

cluster = Cluster(ClusterConfiguration(
name='mnist',
# namespace='default',
min_worker=1,
max_worker=1,
min_cpus=0.2,
max_cpus=1,
min_memory=0.5,
max_memory=1,
gpu=0,
instascale=False,
))

cluster.up()

cluster.status()

cluster.wait_ready()

cluster.status()

cluster.details()

jobdef = DDPJobDefinition(
name="mnist",
script="/test/job/mnist.py",
scheduler_args={"requirements": "/test/runtime/requirements.txt"}
)
job = jobdef.submit(cluster)

job.status()

print(job.logs())

cluster.down()
18 changes: 15 additions & 3 deletions test/e2e/support.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,19 @@ limitations under the License.

package e2e

import "embed"
import (
"embed"

//go:embed *.py
var scripts embed.FS
"github.com/onsi/gomega"

"github.com/project-codeflare/codeflare-operator/test/support"
)

//go:embed *.py *.txt
var files embed.FS

func ReadFile(t support.Test, fileName string) []byte {
file, err := files.ReadFile(fileName)
t.Expect(err).NotTo(gomega.HaveOccurred())
return file
}
24 changes: 24 additions & 0 deletions test/support/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"github.com/onsi/gomega"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
)

func Job(t Test, namespace, name string) func(g gomega.Gomega) *batchv1.Job {
Expand All @@ -35,3 +37,25 @@ func GetJob(t Test, namespace, name string) *batchv1.Job {
t.T().Helper()
return Job(t, namespace, name)(t)
}

func JobTroubleshooting(test Test, job *batchv1.Job) {
if !test.T().Failed() {
return
}
job = GetJob(test, job.Namespace, job.Name)

test.T().Errorf("Job %s/%s hasn't completed in time: %s", job.Namespace, job.Name, job)

pods := GetPods(test, job.Namespace, metav1.ListOptions{
LabelSelector: labels.FormatLabels(job.Spec.Selector.MatchLabels)},
)

if len(pods) == 0 {
test.T().Errorf("Job %s/%s has no pods scheduled", job.Namespace, job.Name)
} else {
for i, pod := range pods {
test.T().Logf("Printing Pod %s/%s logs", pod.Namespace, pod.Name)
test.T().Log(GetPodLogs(test, &pods[i], corev1.PodLogOptions{}))
}
}
}

0 comments on commit 01718c4

Please sign in to comment.