Skip to content

Commit

Permalink
e2e: Use Ray dashboard API to retrieve job logs
Browse files Browse the repository at this point in the history
  • Loading branch information
astefanutti committed Aug 3, 2023
1 parent ca3baf6 commit 8a36ca3
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 28 deletions.
9 changes: 9 additions & 0 deletions .github/actions/kind/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,12 @@ runs:
echo "KinD cluster:"
kubectl cluster-info
kubectl describe nodes
- name: Install Ingress controller
shell: bash
run: |
VERSION=controller-v1.6.4
echo "Deploying Ingress controller into KinD cluster"
curl https://raw.githubusercontent.com/kubernetes/ingress-nginx/"${VERSION}"/deploy/static/provider/kind/deploy.yaml | sed "s/--publish-status-address=localhost/--report-node-internal-ip-address\\n - --status-update-interval=10/g" | kubectl apply -f -
kubectl annotate ingressclass nginx "ingressclass.kubernetes.io/is-default-class=true"
kubectl -n ingress-nginx wait --timeout=300s --for=condition=Available deployments --all
103 changes: 102 additions & 1 deletion test/e2e/mnist_rayjob_mcad_raycluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,20 @@ package e2e

import (
"encoding/base64"
"net/url"
"testing"

. "github.com/onsi/gomega"
mcadv1beta1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
rayv1alpha1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1"

corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"

routev1 "github.com/openshift/api/route/v1"

. "github.com/project-codeflare/codeflare-operator/test/support"
)
Expand Down Expand Up @@ -252,8 +257,104 @@ func TestMNISTRayJobMCADRayCluster(t *testing.T) {
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)

var rayDashboardURL url.URL
if IsOpenShift(test) {
// Create a route to expose the Ray cluster API
route := &routev1.Route{
TypeMeta: metav1.TypeMeta{
APIVersion: routev1.GroupVersion.String(),
Kind: "Route",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace.Name,
Name: "ray-dashboard",
},
Spec: routev1.RouteSpec{
To: routev1.RouteTargetReference{
Name: "raycluster-head-svc",
},
Port: &routev1.RoutePort{
TargetPort: intstr.FromString("dashboard"),
},
},
}

_, err := test.Client().Route().RouteV1().Routes(namespace.Name).Create(test.Ctx(), route, metav1.CreateOptions{})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created Route %s/%s successfully", route.Namespace, route.Name)

test.T().Logf("Waiting for Route %s/%s to be admitted", route.Namespace, route.Name)
test.Eventually(Route(test, route.Namespace, route.Name), TestTimeoutMedium).
Should(WithTransform(ConditionStatus(routev1.RouteAdmitted), Equal(corev1.ConditionTrue)))

route = GetRoute(test, route.Namespace, route.Name)

rayDashboardURL = url.URL{
Scheme: "http",
Host: route.Status.Ingress[0].Host,
}
} else {
ingress := &networkingv1.Ingress{
TypeMeta: metav1.TypeMeta{
APIVersion: networkingv1.SchemeGroupVersion.String(),
Kind: "Ingress",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace.Name,
Name: "ray-dashboard",
Annotations: map[string]string{
"nginx.ingress.kubernetes.io/use-regex": "true",
"nginx.ingress.kubernetes.io/rewrite-target": "/$2",
},
},
Spec: networkingv1.IngressSpec{
Rules: []networkingv1.IngressRule{
{
IngressRuleValue: networkingv1.IngressRuleValue{
HTTP: &networkingv1.HTTPIngressRuleValue{
Paths: []networkingv1.HTTPIngressPath{
{
Path: "/ray-dashboard(/|$)(.*)",
PathType: Ptr(networkingv1.PathTypePrefix),
Backend: networkingv1.IngressBackend{
Service: &networkingv1.IngressServiceBackend{
Name: "raycluster-head-svc",
Port: networkingv1.ServiceBackendPort{
Name: "dashboard",
},
},
},
},
},
},
},
},
},
},
}

_, err := test.Client().Core().NetworkingV1().Ingresses(ingress.Namespace).Create(test.Ctx(), ingress, metav1.CreateOptions{})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created Ingress %s/%s successfully", ingress.Namespace, ingress.Name)

test.T().Logf("Waiting for Ingress %s/%s to be admitted", ingress.Namespace, ingress.Name)
test.Eventually(Ingress(test, ingress.Namespace, ingress.Name), TestTimeoutMedium).
Should(WithTransform(LoadBalancerIngresses, HaveLen(1)))

ingress = GetIngress(test, ingress.Namespace, ingress.Name)

rayDashboardURL = url.URL{
Scheme: "http",
Host: ingress.Status.LoadBalancer.Ingress[0].IP,
Path: "ray-dashboard",
}
}

test.T().Logf("Connecting to Ray cluster at: %s", rayDashboardURL.String())
rayClient := NewRayClusterClient(rayDashboardURL)

// Retrieving the job logs once it has completed or timed out
defer WriteRayJobLogs(test, rayJob.Namespace, rayJob.Name)
defer WriteRayJobAPILogs(test, rayClient, GetRayJobId(test, rayJob.Namespace, rayJob.Name))

test.T().Logf("Waiting for RayJob %s/%s to complete", rayJob.Namespace, rayJob.Name)
test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutLong).
Expand Down
41 changes: 41 additions & 0 deletions test/support/ingress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
Copyright 2023.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package support

import (
"github.com/onsi/gomega"

networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Ingress returns a getter for the Ingress called name in namespace.
//
// The returned function looks the Ingress up through the test's Kubernetes
// client, uses the supplied Gomega to assert that the lookup did not return
// an error, and hands back the retrieved object. Because it takes a Gomega,
// it can be evaluated repeatedly by a polling assertion.
func Ingress(t Test, namespace, name string) func(g gomega.Gomega) *networkingv1.Ingress {
	return func(g gomega.Gomega) *networkingv1.Ingress {
		ingresses := t.Client().Core().NetworkingV1().Ingresses(namespace)
		found, getErr := ingresses.Get(t.Ctx(), name, metav1.GetOptions{})
		g.Expect(getErr).NotTo(gomega.HaveOccurred())
		return found
	}
}

// GetIngress fetches the Ingress called name in namespace once, right away.
//
// It marks the calling function as a test helper and then runs the getter
// built by Ingress, passing the test itself as the Gomega that asserts the
// lookup returned no error.
func GetIngress(t Test, namespace, name string) *networkingv1.Ingress {
	t.T().Helper()
	fetch := Ingress(t, namespace, name)
	return fetch(t)
}

// LoadBalancerIngresses extracts the load-balancer ingress entries recorded
// in the status of the given Ingress. The ingress pointer is dereferenced
// unconditionally, so it must not be nil.
func LoadBalancerIngresses(ingress *networkingv1.Ingress) []networkingv1.IngressLoadBalancerIngress {
	lbStatus := ingress.Status.LoadBalancer
	return lbStatus.Ingress
}
26 changes: 3 additions & 23 deletions test/support/ray.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package support

import (
"encoding/json"

"github.com/onsi/gomega"
rayv1alpha1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1"

Expand All @@ -44,28 +42,10 @@ func RayJobStatus(job *rayv1alpha1.RayJob) rayv1alpha1.JobStatus {
return job.Status.JobStatus
}

func GetRayJobLogs(t Test, namespace, name string) []byte {
func GetRayJobId(t Test, namespace, name string) string {
t.T().Helper()

job := GetRayJob(t, namespace, name)

response := t.Client().Core().CoreV1().RESTClient().
Get().
AbsPath("/api/v1/namespaces", job.Namespace, "services", "http:"+job.Status.RayClusterName+"-head-svc:dashboard", "proxy", "api", "jobs", job.Status.JobId, "logs").
Do(t.Ctx())
t.Expect(response.Error()).NotTo(gomega.HaveOccurred())

body := map[string]string{}
bytes, _ := response.Raw()
t.Expect(json.Unmarshal(bytes, &body)).To(gomega.Succeed())
t.Expect(body).To(gomega.HaveKey("logs"))

return []byte(body["logs"])
}

func WriteRayJobLogs(t Test, namespace, name string) {
t.T().Logf("Retrieving RayJob %s/%s logs", namespace, name)
WriteToOutputDir(t, name, Log, GetRayJobLogs(t, namespace, name))
job := RayJob(t, namespace, name)(t)
return job.Status.JobId
}

func RayCluster(t Test, namespace, name string) func(g gomega.Gomega) *rayv1alpha1.RayCluster {
Expand Down
8 changes: 4 additions & 4 deletions test/support/ray_cluster_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package support
import (
"bytes"
"encoding/json"
"io/ioutil"
"io"
"net/http"
"net/url"
)
Expand Down Expand Up @@ -72,7 +72,7 @@ func (client *rayClusterClient) CreateJob(job *RayJobSetup) (response *RayJobRes
return
}

respData, err := ioutil.ReadAll(resp.Body)
respData, err := io.ReadAll(resp.Body)
if err != nil {
return
}
Expand All @@ -88,7 +88,7 @@ func (client *rayClusterClient) GetJobDetails(jobID string) (response *RayJobDet
return
}

respData, err := ioutil.ReadAll(resp.Body)
respData, err := io.ReadAll(resp.Body)
if err != nil {
return
}
Expand All @@ -104,7 +104,7 @@ func (client *rayClusterClient) GetJobLogs(jobID string) (logs string, err error
return
}

respData, err := ioutil.ReadAll(resp.Body)
respData, err := io.ReadAll(resp.Body)
if err != nil {
return
}
Expand Down
5 changes: 5 additions & 0 deletions test/support/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,8 @@ func Route(t Test, namespace, name string) func(g gomega.Gomega) *routev1.Route
return route
}
}

// GetRoute fetches the Route called name in namespace once, right away.
//
// It marks the calling function as a test helper and then runs the getter
// built by Route, passing the test itself as the Gomega it expects.
func GetRoute(t Test, namespace, name string) *routev1.Route {
	t.T().Helper()
	fetch := Route(t, namespace, name)
	return fetch(t)
}

0 comments on commit 8a36ca3

Please sign in to comment.