Skip to content

Commit 558dc64

Browse files
committed
remove unrelated change and fix comment
Signed-off-by: You-Cheng Lin (Owen) <[email protected]>
1 parent a547e87 commit 558dc64

File tree

9 files changed: 88 additions and 38 deletions.

apiserver/pkg/server/ray_job_submission_service_server.go

Lines changed: 26 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,7 @@ type RayJobSubmissionServiceServer struct {
3232
api.UnimplementedRayJobSubmissionServiceServer
3333
options *RayJobSubmissionServiceServerOptions
3434
clusterServer *ClusterServer
35-
dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) utils.RayDashboardClientInterface
35+
dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) (utils.RayDashboardClientInterface, error)
3636
log logr.Logger
3737
}
3838

@@ -50,8 +50,10 @@ func (s *RayJobSubmissionServiceServer) SubmitRayJob(ctx context.Context, req *a
5050
if err != nil {
5151
return nil, err
5252
}
53-
rayDashboardClient := s.dashboardClientFunc(nil, *url)
54-
// TODO: support proxy subresources in kuberay-apiserver
53+
rayDashboardClient, err := s.dashboardClientFunc(nil, *url)
54+
if err != nil {
55+
return nil, err
56+
}
5557
request := &utils.RayJobRequest{Entrypoint: req.Jobsubmission.Entrypoint}
5658
if req.Jobsubmission.SubmissionId != "" {
5759
request.SubmissionId = req.Jobsubmission.SubmissionId
@@ -87,7 +89,7 @@ func (s *RayJobSubmissionServiceServer) SubmitRayJob(ctx context.Context, req *a
8789
}
8890
}
8991

90-
sid, err := rayDashboardClient.SubmitJobReq(ctx, request)
92+
sid, err := rayDashboardClient.SubmitJobReq(ctx, request, nil)
9193
if err != nil {
9294
return nil, err
9395
}
@@ -102,7 +104,10 @@ func (s *RayJobSubmissionServiceServer) GetJobDetails(ctx context.Context, req *
102104
if err != nil {
103105
return nil, err
104106
}
105-
rayDashboardClient := s.dashboardClientFunc(nil, *url)
107+
rayDashboardClient, err := s.dashboardClientFunc(nil, *url)
108+
if err != nil {
109+
return nil, err
110+
}
106111
nodeInfo, err := rayDashboardClient.GetJobInfo(ctx, req.Submissionid)
107112
if err != nil {
108113
return nil, err
@@ -121,7 +126,10 @@ func (s *RayJobSubmissionServiceServer) GetJobLog(ctx context.Context, req *api.
121126
if err != nil {
122127
return nil, err
123128
}
124-
rayDashboardClient := s.dashboardClientFunc(nil, *url)
129+
rayDashboardClient, err := s.dashboardClientFunc(nil, *url)
130+
if err != nil {
131+
return nil, err
132+
}
125133
jlog, err := rayDashboardClient.GetJobLog(ctx, req.Submissionid)
126134
if err != nil {
127135
return nil, err
@@ -140,7 +148,10 @@ func (s *RayJobSubmissionServiceServer) ListJobDetails(ctx context.Context, req
140148
if err != nil {
141149
return nil, err
142150
}
143-
rayDashboardClient := s.dashboardClientFunc(nil, *url)
151+
rayDashboardClient, err := s.dashboardClientFunc(nil, *url)
152+
if err != nil {
153+
return nil, err
154+
}
144155
nodesInfo, err := rayDashboardClient.ListJobs(ctx)
145156
if err != nil {
146157
return nil, err
@@ -160,7 +171,10 @@ func (s *RayJobSubmissionServiceServer) StopRayJob(ctx context.Context, req *api
160171
if err != nil {
161172
return nil, err
162173
}
163-
rayDashboardClient := s.dashboardClientFunc(nil, *url)
174+
rayDashboardClient, err := s.dashboardClientFunc(nil, *url)
175+
if err != nil {
176+
return nil, err
177+
}
164178
err = rayDashboardClient.StopJob(ctx, req.Submissionid)
165179
if err != nil {
166180
return nil, err
@@ -176,7 +190,10 @@ func (s *RayJobSubmissionServiceServer) DeleteRayJob(ctx context.Context, req *a
176190
if err != nil {
177191
return nil, err
178192
}
179-
rayDashboardClient := s.dashboardClientFunc(nil, *url)
193+
rayDashboardClient, err := s.dashboardClientFunc(nil, *url)
194+
if err != nil {
195+
return nil, err
196+
}
180197
err = rayDashboardClient.DeleteJob(ctx, req.Submissionid)
181198
if err != nil {
182199
return nil, err

ray-operator/apis/config/v1alpha1/configuration_types.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -76,7 +76,7 @@ type Configuration struct {
7676
EnableMetrics bool `json:"enableMetrics,omitempty"`
7777
}
7878

79-
func (config Configuration) GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) utils.RayDashboardClientInterface {
79+
func (config Configuration) GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (utils.RayDashboardClientInterface, error) {
8080
return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy)
8181
}
8282

ray-operator/controllers/ray/rayjob_controller.go

Lines changed: 11 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -42,7 +42,7 @@ type RayJobReconciler struct {
4242
Scheme *runtime.Scheme
4343
Recorder record.EventRecorder
4444

45-
dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) utils.RayDashboardClientInterface
45+
dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) (utils.RayDashboardClientInterface, error)
4646
options RayJobReconcilerOptions
4747
}
4848

@@ -115,7 +115,11 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
115115
logger.Error(err, "Failed to get RayCluster")
116116
}
117117

118-
rayDashboardClient := r.dashboardClientFunc(rayClusterInstance, rayJobInstance.Status.DashboardURL)
118+
rayDashboardClient, err := r.dashboardClientFunc(rayClusterInstance, rayJobInstance.Status.DashboardURL)
119+
if err != nil {
120+
logger.Error(err, "Failed to get dashboard client for RayJob")
121+
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
122+
}
119123
if err := rayDashboardClient.StopJob(ctx, rayJobInstance.Status.JobId); err != nil {
120124
logger.Error(err, "Failed to stop job for RayJob")
121125
}
@@ -257,7 +261,11 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
257261
}
258262

259263
// Check the current status of ray jobs
260-
rayDashboardClient := r.dashboardClientFunc(rayClusterInstance, rayJobInstance.Status.DashboardURL)
264+
rayDashboardClient, err := r.dashboardClientFunc(rayClusterInstance, rayJobInstance.Status.DashboardURL)
265+
if err != nil {
266+
logger.Error(err, "Failed to get dashboard client for RayJob")
267+
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
268+
}
261269

262270
jobInfo, err := rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId)
263271
if err != nil {

ray-operator/controllers/ray/rayservice_controller.go

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -52,7 +52,7 @@ type RayServiceReconciler struct {
5252
// Cache value is map of RayCluster name to Serve application config.
5353
ServeConfigs *lru.Cache
5454
RayClusterDeletionTimestamps cmap.ConcurrentMap[string, time.Time]
55-
dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) utils.RayDashboardClientInterface
55+
dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) (utils.RayDashboardClientInterface, error)
5656
httpProxyClientFunc func() utils.RayHttpProxyClientInterface
5757
}
5858

@@ -943,7 +943,10 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
943943
return false, serveApplications, err
944944
}
945945

946-
rayDashboardClient := r.dashboardClientFunc(rayClusterInstance, clientURL)
946+
rayDashboardClient, err := r.dashboardClientFunc(rayClusterInstance, clientURL)
947+
if err != nil {
948+
return false, serveApplications, err
949+
}
947950

948951
cachedServeConfigV2 := r.getServeConfigFromCache(rayServiceInstance, rayClusterInstance.Name)
949952
isReady, serveApplications, err := getAndCheckServeStatus(ctx, rayDashboardClient)

ray-operator/controllers/ray/suite_test.go

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -51,9 +51,9 @@ var (
5151

5252
type TestClientProvider struct{}
5353

54-
func (testProvider TestClientProvider) GetDashboardClient(_ manager.Manager) func(rayCluster *rayv1.RayCluster, url string) utils.RayDashboardClientInterface {
55-
return func(_ *rayv1.RayCluster, _ string) utils.RayDashboardClientInterface {
56-
return fakeRayDashboardClient
54+
func (testProvider TestClientProvider) GetDashboardClient(_ manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (utils.RayDashboardClientInterface, error) {
55+
return func(_ *rayv1.RayCluster, _ string) (utils.RayDashboardClientInterface, error) {
56+
return fakeRayDashboardClient, nil
5757
}
5858
}
5959

ray-operator/controllers/ray/utils/dashboard_httpclient.go

Lines changed: 26 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,8 @@ import (
99

1010
"k8s.io/apimachinery/pkg/api/errors"
1111
"k8s.io/apimachinery/pkg/util/json"
12+
"k8s.io/apimachinery/pkg/util/yaml"
13+
ctrl "sigs.k8s.io/controller-runtime"
1214

1315
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
1416
)
@@ -29,7 +31,7 @@ type RayDashboardClientInterface interface {
2931
GetJobInfo(ctx context.Context, jobId string) (*RayJobInfo, error)
3032
ListJobs(ctx context.Context) (*[]RayJobInfo, error)
3133
SubmitJob(ctx context.Context, rayJob *rayv1.RayJob) (string, error)
32-
SubmitJobReq(ctx context.Context, request *RayJobRequest) (string, error)
34+
SubmitJobReq(ctx context.Context, request *RayJobRequest, name *string) (string, error)
3335
GetJobLog(ctx context.Context, jobName string) (*string, error)
3436
StopJob(ctx context.Context, jobName string) error
3537
DeleteJob(ctx context.Context, jobName string) error
@@ -230,14 +232,18 @@ func (r *RayDashboardClient) SubmitJob(ctx context.Context, rayJob *rayv1.RayJob
230232
if err != nil {
231233
return "", err
232234
}
233-
return r.SubmitJobReq(ctx, request)
235+
return r.SubmitJobReq(ctx, request, &rayJob.Name)
234236
}
235237

236-
func (r *RayDashboardClient) SubmitJobReq(ctx context.Context, request *RayJobRequest) (jobId string, err error) {
238+
func (r *RayDashboardClient) SubmitJobReq(ctx context.Context, request *RayJobRequest, name *string) (jobId string, err error) {
239+
log := ctrl.LoggerFrom(ctx)
237240
rayJobJson, err := json.Marshal(request)
238241
if err != nil {
239242
return
240243
}
244+
if name != nil {
245+
log.Info("Submit a ray job", "rayJob", name, "jobInfo", string(rayJobJson))
246+
}
241247

242248
req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.dashboardURL+JobPath, bytes.NewBuffer(rayJobJson))
243249
if err != nil {
@@ -271,6 +277,9 @@ func (r *RayDashboardClient) SubmitJobReq(ctx context.Context, request *RayJobRe
271277

272278
// Get Job Log
273279
func (r *RayDashboardClient) GetJobLog(ctx context.Context, jobName string) (*string, error) {
280+
log := ctrl.LoggerFrom(ctx)
281+
log.Info("Get ray job log", "rayJob", jobName)
282+
274283
req, err := http.NewRequestWithContext(ctx, http.MethodGet, r.dashboardURL+JobPath+jobName+"/logs", nil)
275284
if err != nil {
276285
return nil, err
@@ -302,6 +311,9 @@ func (r *RayDashboardClient) GetJobLog(ctx context.Context, jobName string) (*st
302311
}
303312

304313
func (r *RayDashboardClient) StopJob(ctx context.Context, jobName string) (err error) {
314+
log := ctrl.LoggerFrom(ctx)
315+
log.Info("Stop a ray job", "rayJob", jobName)
316+
305317
req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.dashboardURL+JobPath+jobName+"/stop", nil)
306318
if err != nil {
307319
return err
@@ -338,6 +350,9 @@ func (r *RayDashboardClient) StopJob(ctx context.Context, jobName string) (err e
338350
}
339351

340352
func (r *RayDashboardClient) DeleteJob(ctx context.Context, jobName string) error {
353+
log := ctrl.LoggerFrom(ctx)
354+
log.Info("Delete a ray job", "rayJob", jobName)
355+
341356
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, r.dashboardURL+JobPath+jobName, nil)
342357
if err != nil {
343358
return err
@@ -375,3 +390,11 @@ func ConvertRayJobToReq(rayJob *rayv1.RayJob) (*RayJobRequest, error) {
375390
}
376391
return req, nil
377392
}
393+
394+
func UnmarshalRuntimeEnvYAML(runtimeEnvYAML string) (RuntimeEnvType, error) {
395+
var runtimeEnv RuntimeEnvType
396+
if err := yaml.Unmarshal([]byte(runtimeEnvYAML), &runtimeEnv); err != nil {
397+
return nil, fmt.Errorf("failed to unmarshal RuntimeEnvYAML: %v: %w", runtimeEnvYAML, err)
398+
}
399+
return runtimeEnv, nil
400+
}

ray-operator/controllers/ray/utils/fake_serve_httpclient.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -55,7 +55,7 @@ func (r *FakeRayDashboardClient) SubmitJob(_ context.Context, _ *rayv1.RayJob) (
5555
return "", nil
5656
}
5757

58-
func (r *FakeRayDashboardClient) SubmitJobReq(_ context.Context, _ *RayJobRequest) (string, error) {
58+
func (r *FakeRayDashboardClient) SubmitJobReq(_ context.Context, _ *RayJobRequest, _ *string) (string, error) {
5959
return "", nil
6060
}
6161

ray-operator/controllers/ray/utils/util.go

Lines changed: 14 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,6 @@ import (
2020
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2121
"k8s.io/apimachinery/pkg/util/json"
2222
"k8s.io/apimachinery/pkg/util/rand"
23-
"k8s.io/apimachinery/pkg/util/yaml"
2423
"k8s.io/client-go/discovery"
2524
ctrl "sigs.k8s.io/controller-runtime"
2625
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -641,7 +640,7 @@ func EnvVarByName(envName string, envVars []corev1.EnvVar) (corev1.EnvVar, bool)
641640
}
642641

643642
type ClientProvider interface {
644-
GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) RayDashboardClientInterface
643+
GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (RayDashboardClientInterface, error)
645644
GetHttpProxyClient(mgr manager.Manager) func() RayHttpProxyClientInterface
646645
}
647646

@@ -758,28 +757,28 @@ func FetchHeadServiceURL(ctx context.Context, cli client.Client, rayCluster *ray
758757
return headServiceURL, nil
759758
}
760759

761-
func UnmarshalRuntimeEnvYAML(runtimeEnvYAML string) (RuntimeEnvType, error) {
762-
var runtimeEnv RuntimeEnvType
763-
if err := yaml.Unmarshal([]byte(runtimeEnvYAML), &runtimeEnv); err != nil {
764-
return nil, fmt.Errorf("failed to unmarshal RuntimeEnvYAML: %v: %w", runtimeEnvYAML, err)
765-
}
766-
return runtimeEnv, nil
767-
}
768-
769-
func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) func(rayCluster *rayv1.RayCluster, url string) RayDashboardClientInterface {
770-
return func(rayCluster *rayv1.RayCluster, url string) RayDashboardClientInterface {
760+
func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) func(rayCluster *rayv1.RayCluster, url string) (RayDashboardClientInterface, error) {
761+
return func(rayCluster *rayv1.RayCluster, url string) (RayDashboardClientInterface, error) {
771762
if useKubernetesProxy {
763+
var err error
764+
headSvcName := rayCluster.Status.Head.ServiceName
765+
if headSvcName == "" {
766+
headSvcName, err = GenerateHeadServiceName(RayClusterCRD, rayCluster.Spec, rayCluster.Name)
767+
if err != nil {
768+
return nil, err
769+
}
770+
}
772771
return &RayDashboardClient{
773772
client: mgr.GetHTTPClient(),
774-
dashboardURL: fmt.Sprintf("%s/api/v1/namespaces/%s/services/%s:dashboard/proxy", mgr.GetConfig().Host, rayCluster.Namespace, rayCluster.Status.Head.ServiceName),
775-
}
773+
dashboardURL: fmt.Sprintf("%s/api/v1/namespaces/%s/services/%s:dashboard/proxy", mgr.GetConfig().Host, rayCluster.Namespace, headSvcName),
774+
}, nil
776775
}
777776

778777
return &RayDashboardClient{
779778
client: &http.Client{
780779
Timeout: 2 * time.Second,
781780
},
782781
dashboardURL: "http://" + url,
783-
}
782+
}, nil
784783
}
785784
}

ray-operator/test/sampleyaml/support.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -76,7 +76,7 @@ func QueryDashboardGetAppStatus(t Test, rayCluster *rayv1.RayCluster) func(Gomeg
7676
g.Expect(err).ToNot(HaveOccurred())
7777
url := fmt.Sprintf("127.0.0.1:%d", localPort)
7878
rayDashboardClientFunc := utils.GetRayDashboardClientFunc(nil, false)
79-
rayDashboardClient := rayDashboardClientFunc(rayCluster, url)
79+
rayDashboardClient, err := rayDashboardClientFunc(rayCluster, url)
8080
g.Expect(err).ToNot(HaveOccurred())
8181
serveDetails, err := rayDashboardClient.GetServeDetails(t.Ctx())
8282
g.Expect(err).ToNot(HaveOccurred())

0 commit comments

Comments
 (0)