Skip to content

Commit

Permalink
Create Ray Cluster SDK upgrade scenarios
Browse files Browse the repository at this point in the history
  • Loading branch information
Srihari1192 authored and astefanutti committed Jan 19, 2024
1 parent db25cd3 commit 65ed743
Show file tree
Hide file tree
Showing 7 changed files with 548 additions and 42 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.20

require (
github.com/onsi/gomega v1.27.10
github.com/project-codeflare/codeflare-common v0.0.0-20231110155354-042fb171fcdb
github.com/project-codeflare/codeflare-common v0.0.0-20231129165224-988ba1da9069
github.com/project-codeflare/multi-cluster-app-dispatcher v1.37.0
github.com/ray-project/kuberay/ray-operator v1.0.0
k8s.io/api v0.26.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/project-codeflare/codeflare-common v0.0.0-20231110155354-042fb171fcdb h1:L2Gdr2SlvshDKZY2KK6507AwzQ1NSfRbMQuz5dOsYNM=
github.com/project-codeflare/codeflare-common v0.0.0-20231110155354-042fb171fcdb/go.mod h1:zdi2GCYJX+QyxFWyCLMoTme3NMz/aucWDJWMqKfigxk=
github.com/project-codeflare/codeflare-common v0.0.0-20231129165224-988ba1da9069 h1:81+ma1mchF/LtAGsf+poAt50kJ/fLYjoTAcZOxci1Yc=
github.com/project-codeflare/codeflare-common v0.0.0-20231129165224-988ba1da9069/go.mod h1:zdi2GCYJX+QyxFWyCLMoTme3NMz/aucWDJWMqKfigxk=
github.com/project-codeflare/multi-cluster-app-dispatcher v1.37.0 h1:oyhdLdc4BgA4zcH1zlRrSrYpzuVxV5QLDbyIXrwnQqs=
github.com/project-codeflare/multi-cluster-app-dispatcher v1.37.0/go.mod h1:Yge6GRNpO9YIDfeL+XOcCE9xbmfCTD5C1h5dlW87mxQ=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
Expand Down
40 changes: 3 additions & 37 deletions tests/e2e/mnist_raycluster_sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ limitations under the License.
package e2e

import (
"strings"
"testing"
"time"

. "github.com/onsi/gomega"
. "github.com/project-codeflare/codeflare-common/support"
Expand Down Expand Up @@ -137,7 +135,7 @@ func TestMNISTRayClusterSDK(t *testing.T) {
Command: []string{
"/bin/sh", "-c",
"while [ ! -f /codeflare-sdk/pyproject.toml ]; do sleep 1; done; " +
"cp /test/* . && chmod +x install-codeflare-sdk.sh && ./install-codeflare-sdk.sh && python mnist_raycluster_sdk.py " + namespace.Name,
"cp /test/* . && chmod +x install-codeflare-sdk.sh && ./install-codeflare-sdk.sh && python mnist_raycluster_sdk.py " + namespace.Name,
},
VolumeMounts: []corev1.VolumeMount{
{
Expand Down Expand Up @@ -194,40 +192,8 @@ func TestMNISTRayClusterSDK(t *testing.T) {
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created Job %s/%s successfully", job.Namespace, job.Name)

go func() {
// Checking if pod is found and running
podName := ""
foundPod := false
for !foundPod {
pods, _ := test.Client().Core().CoreV1().Pods(namespace.Name).List(test.Ctx(), metav1.ListOptions{
LabelSelector: "job-name=sdk",
})
for _, pod := range pods.Items {
if strings.HasPrefix(pod.Name, "sdk-") && pod.Status.Phase == corev1.PodRunning {
podName = pod.Name
foundPod = true
test.T().Logf("Pod is running!")
break
}
}
if !foundPod {
test.T().Logf("Waiting for pod to start...")
time.Sleep(5 * time.Second)
}
}

// Get rest config
restConfig, err := GetRestConfig(test); if err != nil {
test.T().Errorf("Error getting rest config: %v", err)
}

// Copy codeflare-sdk to the pod
srcDir := "../.././"
dstDir := "/codeflare-sdk"
if err := CopyToPod(test, namespace.Name, podName, restConfig, srcDir, dstDir); err != nil {
test.T().Errorf("Error copying codeflare-sdk to pod: %v", err)
}
}()
// Setup the codeflare-sdk inside the pod associated to the created job
SetupCodeflareSDKInsidePod(test, namespace, job.Name)

test.T().Logf("Waiting for Job %s/%s to complete", job.Namespace, job.Name)
test.Eventually(Job(test, job.Namespace, job.Name), TestTimeoutLong).Should(
Expand Down
46 changes: 46 additions & 0 deletions tests/e2e/mnist_rayjob.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import sys

from time import sleep

from torchx.specs.api import AppState, is_terminal

from codeflare_sdk.cluster.cluster import get_cluster
from codeflare_sdk.job.jobs import DDPJobDefinition

# Namespace that holds the "mnist" Ray cluster, given as the first CLI argument.
namespace = sys.argv[1]

cluster = get_cluster("mnist", namespace)

cluster.details()

jobdef = DDPJobDefinition(
    name="mnist",
    script="mnist.py",
    scheduler_args={"requirements": "requirements.txt"},
)
job = jobdef.submit(cluster)

# Poll the job status every 5 seconds until it reaches a terminal state.
# `elapsed` only accumulates the sleep time, so the timeout is approximate.
elapsed = 0
timeout = 900
while True:
    status = job.status()
    if is_terminal(status.state):
        break
    print(status)
    if timeout and elapsed >= timeout:
        raise TimeoutError(f"job has timed out after waiting {timeout}s")
    sleep(5)
    elapsed += 5

print(f"Job has completed: {status.state}")

print(job.logs())

cluster.down()

# Use sys.exit rather than the builtin exit(): the latter is injected by the
# site module for interactive use and is not guaranteed to exist in scripts.
sys.exit(0 if status.state == AppState.SUCCEEDED else 1)
52 changes: 52 additions & 0 deletions tests/e2e/start_ray_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import sys
import os

from time import sleep

from codeflare_sdk.cluster.cluster import Cluster, ClusterConfiguration

# The target namespace comes from the command line; the Ray image and the
# cluster hostname are read from the environment and may be unset (None).
namespace = sys.argv[1]
ray_image = os.environ.get("RAY_IMAGE")
host = os.environ.get("CLUSTER_HOSTNAME")

# Only describe a dashboard ingress when a cluster hostname was provided.
if host is None:
    ingress_options = {}
else:
    dashboard_ingress = {
        "ingressName": "ray-dashboard",
        "port": 8265,
        "pathType": "Prefix",
        "path": "/",
        "host": host,
    }
    ingress_options = {"ingresses": [dashboard_ingress]}

# Configuration of the "mnist" cluster: one worker, no GPUs, no InstaScale.
config = ClusterConfiguration(
    name="mnist",
    namespace=namespace,
    num_workers=1,
    head_cpus="500m",
    head_memory=2,
    min_cpus="500m",
    max_cpus=1,
    min_memory=1,
    max_memory=2,
    num_gpus=0,
    instascale=False,
    image=ray_image,
    ingress_options=ingress_options,
)
cluster = Cluster(config)

# Bring the cluster up, reporting its status before and after it is ready.
cluster.up()
cluster.status()
cluster.wait_ready()
cluster.status()
cluster.details()
50 changes: 48 additions & 2 deletions tests/e2e/support.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@ import (
"embed"
"os"
"path/filepath"
"strings"
"time"

"github.com/onsi/gomega"
"github.com/project-codeflare/codeflare-common/support"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/cli-runtime/pkg/genericclioptions"
Expand All @@ -33,8 +38,6 @@ import (
"k8s.io/kubectl/pkg/cmd/cp"
"k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/scheme"

"github.com/project-codeflare/codeflare-common/support"
)

//go:embed *.py *.txt *.sh
Expand Down Expand Up @@ -109,3 +112,46 @@ func (r restClientGetter) ToDiscoveryClient() (discovery.CachedDiscoveryInterfac
func (r restClientGetter) ToRESTMapper() (meta.RESTMapper, error) {
return nil, nil
}

// SetupCodeflareSDKInsidePod copies the codeflare-sdk sources into the running
// pod that belongs to the Job named labelName in the given namespace.
//
// It resolves the pod name with GetPodName, obtains a REST config with
// GetRestConfig and copies the local "../.././" directory to "/codeflare-sdk"
// inside the pod. Failures are reported through test.T().Errorf; when the REST
// config cannot be obtained the copy is skipped, since it cannot succeed.
func SetupCodeflareSDKInsidePod(test support.Test, namespace *corev1.Namespace, labelName string) {
	// Get pod name
	podName := GetPodName(test, namespace, labelName)

	// Get rest config
	restConfig, err := GetRestConfig(test)
	if err != nil {
		test.T().Errorf("Error getting rest config: %v", err)
		// Do not attempt the copy with an unusable config.
		return
	}

	// Copy codeflare-sdk to the pod
	srcDir := "../.././"
	dstDir := "/codeflare-sdk"
	if err := CopyToPod(test, namespace.Name, podName, restConfig, srcDir, dstDir); err != nil {
		test.T().Errorf("Error copying codeflare-sdk to pod: %v", err)
	}
}

// GetPodName waits for a running pod of the Job named labelName in the given
// namespace and returns that pod's name.
//
// Pods are selected with the "job-name=<labelName>" label and must have a name
// starting with "<labelName>-" and be in the Running phase. The pod list is
// polled every 5 seconds. Errors from listing pods are logged and the list is
// retried. If the test context is done before a running pod is found, an error
// is reported on the test and the empty string is returned.
func GetPodName(test support.Test, namespace *corev1.Namespace, labelName string) string {
	for {
		pods, err := test.Client().Core().CoreV1().Pods(namespace.Name).List(test.Ctx(), metav1.ListOptions{
			LabelSelector: "job-name=" + labelName,
		})
		if err != nil {
			// Do not touch pods on error; log and retry on the next tick.
			test.T().Logf("Error listing pods with label job-name=%s: %v", labelName, err)
		} else {
			for i := range pods.Items {
				pod := &pods.Items[i]
				if strings.HasPrefix(pod.Name, labelName+"-") && pod.Status.Phase == corev1.PodRunning {
					test.T().Logf("Pod is running!")
					return pod.Name
				}
			}
		}

		test.T().Logf("Waiting for pod to start...")
		select {
		case <-test.Ctx().Done():
			// Stop polling once the test context is cancelled or expired
			// instead of looping forever.
			test.T().Errorf("Stopped waiting for a running pod with label job-name=%s: %v", labelName, test.Ctx().Err())
			return ""
		case <-time.After(5 * time.Second):
		}
	}
}
Loading

0 comments on commit 65ed743

Please sign in to comment.