Skip to content

Commit

Permalink
Add support for displaying the Ray dashboard when a RayJob is active
Browse files Browse the repository at this point in the history
Signed-off-by: Jeev B <[email protected]>
  • Loading branch information
jeevb committed Nov 10, 2023
1 parent 65b68c5 commit e0fc41a
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 12 deletions.
12 changes: 7 additions & 5 deletions flyteplugins/go/tasks/plugins/k8s/ray/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
pluginsConfig "github.com/flyteorg/flyte/flyteplugins/go/tasks/config"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/logs"
pluginmachinery "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/k8s"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog"
"github.com/flyteorg/flyte/flytestdlib/config"
)

Expand Down Expand Up @@ -78,11 +79,12 @@ type Config struct {
DeprecatedNodeIPAddress string `json:"nodeIPAddress,omitempty" pflag:"-,DEPRECATED. Please use DefaultConfig.[HeadNode|WorkerNode].IPAddress"`

// Remote Ray Cluster Config
RemoteClusterConfig pluginmachinery.ClusterConfig `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for ray jobs"`
Logs logs.LogConfig `json:"logs" pflag:"-,Log configuration for ray jobs"`
LogsSidecar *v1.Container `json:"logsSidecar" pflag:"-,Sidecar to inject into head pods for capturing ray job logs"`
Defaults DefaultConfig `json:"defaults" pflag:"-,Default configuration for ray jobs"`
EnableUsageStats bool `json:"enableUsageStats" pflag:",Enable usage stats for ray jobs. These stats are submitted to usage-stats.ray.io per https://docs.ray.io/en/latest/cluster/usage-stats.html"`
RemoteClusterConfig pluginmachinery.ClusterConfig `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for ray jobs"`
Logs logs.LogConfig `json:"logs" pflag:"-,Log configuration for ray jobs"`
LogsSidecar *v1.Container `json:"logsSidecar" pflag:"-,Sidecar to inject into head pods for capturing ray job logs"`
DashboardUrlTemplate *tasklog.TemplateLogPlugin `json:"dashboardUrlTemplate" pflag:",Template for URL of Ray dashboard running on a head node."`
Defaults DefaultConfig `json:"defaults" pflag:"-,Default configuration for ray jobs"`
EnableUsageStats bool `json:"enableUsageStats" pflag:",Enable usage stats for ray jobs. These stats are submitted to usage-stats.ray.io per https://docs.ray.io/en/latest/cluster/usage-stats.html"`
}

type DefaultConfig struct {
Expand Down
28 changes: 21 additions & 7 deletions flyteplugins/go/tasks/plugins/k8s/ray/ray.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/plugins"
flyteerr "github.com/flyteorg/flyte/flyteplugins/go/tasks/errors"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/logs"
Expand Down Expand Up @@ -441,22 +442,35 @@ func getEventInfoForRayJob(logConfig logs.LogConfig, pluginContext k8s.PluginCon
return nil, nil
}

// TODO: Retrieve the name of head pod from rayJob.status, and add it to task logs
// RayJob CRD does not include the name of the worker or head pod for now
var taskLogs []*core.TaskLog

Check warning on line 445 in flyteplugins/go/tasks/plugins/k8s/ray/ray.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/ray/ray.go#L445

Added line #L445 was not covered by tests

taskExecID := pluginContext.TaskExecutionMetadata().GetTaskExecutionID()
logOutput, err := logPlugin.GetTaskLogs(tasklog.Input{
input := tasklog.Input{

Check warning on line 448 in flyteplugins/go/tasks/plugins/k8s/ray/ray.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/ray/ray.go#L448

Added line #L448 was not covered by tests
Namespace: rayJob.Namespace,
TaskExecutionID: taskExecID,
})
}

Check warning on line 451 in flyteplugins/go/tasks/plugins/k8s/ray/ray.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/ray/ray.go#L451

Added line #L451 was not covered by tests

// TODO: Retrieve the name of head pod from rayJob.status, and add it to task logs
// RayJob CRD does not include the name of the worker or head pod for now
logOutput, err := logPlugin.GetTaskLogs(input)

Check warning on line 455 in flyteplugins/go/tasks/plugins/k8s/ray/ray.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/ray/ray.go#L453-L455

Added lines #L453 - L455 were not covered by tests
if err != nil {
return nil, fmt.Errorf("failed to generate task logs. Error: %w", err)
}
taskLogs = append(taskLogs, logOutput.TaskLogs...)

Check warning on line 459 in flyteplugins/go/tasks/plugins/k8s/ray/ray.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/ray/ray.go#L459

Added line #L459 was not covered by tests

return &pluginsCore.TaskInfo{
Logs: logOutput.TaskLogs,
}, nil
// Handling for Ray Dashboard
dashboardUrlTemplate := GetConfig().DashboardUrlTemplate
if dashboardUrlTemplate != nil &&
rayJob.Status.JobDeploymentStatus == rayv1alpha1.JobDeploymentStatusRunning &&
rayJob.Status.DashboardURL != "" {
dashboardUrlOutput, err := dashboardUrlTemplate.GetTaskLogs(input)
if err != nil {
return nil, fmt.Errorf("failed to generate Ray dashboard link. Error: %w", err)
}
taskLogs = append(taskLogs, dashboardUrlOutput.TaskLogs...)

Check warning on line 470 in flyteplugins/go/tasks/plugins/k8s/ray/ray.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/ray/ray.go#L461-L470

Added lines #L461 - L470 were not covered by tests
}

return &pluginsCore.TaskInfo{Logs: logOutput.TaskLogs}, nil

Check warning on line 473 in flyteplugins/go/tasks/plugins/k8s/ray/ray.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/ray/ray.go#L473

Added line #L473 was not covered by tests
}

func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginContext k8s.PluginContext, resource client.Object) (pluginsCore.PhaseInfo, error) {
Expand Down

0 comments on commit e0fc41a

Please sign in to comment.