Skip to content

Commit

Permalink
Add support for displaying the Ray dashboard when a RayJob is active
Browse files (browse the repository at this point in the history)
Signed-off-by: Jeev B <[email protected]>
  • Loading branch information
jeevb committed Nov 10, 2023
1 parent 65b68c5 commit aad9450
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 14 deletions.
12 changes: 7 additions & 5 deletions flyteplugins/go/tasks/plugins/k8s/ray/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
pluginsConfig "github.com/flyteorg/flyte/flyteplugins/go/tasks/config"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/logs"
pluginmachinery "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/k8s"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog"
"github.com/flyteorg/flyte/flytestdlib/config"
)

Expand Down Expand Up @@ -78,11 +79,12 @@ type Config struct {
DeprecatedNodeIPAddress string `json:"nodeIPAddress,omitempty" pflag:"-,DEPRECATED. Please use DefaultConfig.[HeadNode|WorkerNode].IPAddress"`

// Remote Ray Cluster Config
RemoteClusterConfig pluginmachinery.ClusterConfig `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for ray jobs"`
Logs logs.LogConfig `json:"logs" pflag:"-,Log configuration for ray jobs"`
LogsSidecar *v1.Container `json:"logsSidecar" pflag:"-,Sidecar to inject into head pods for capturing ray job logs"`
Defaults DefaultConfig `json:"defaults" pflag:"-,Default configuration for ray jobs"`
EnableUsageStats bool `json:"enableUsageStats" pflag:",Enable usage stats for ray jobs. These stats are submitted to usage-stats.ray.io per https://docs.ray.io/en/latest/cluster/usage-stats.html"`
RemoteClusterConfig pluginmachinery.ClusterConfig `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for ray jobs"`
Logs logs.LogConfig `json:"logs" pflag:"-,Log configuration for ray jobs"`
LogsSidecar *v1.Container `json:"logsSidecar" pflag:"-,Sidecar to inject into head pods for capturing ray job logs"`
DashboardUrlTemplate *tasklog.TemplateLogPlugin `json:"dashboardUrlTemplate" pflag:",Template for URL of Ray dashboard running on a head node."`
Defaults DefaultConfig `json:"defaults" pflag:"-,Default configuration for ray jobs"`
EnableUsageStats bool `json:"enableUsageStats" pflag:",Enable usage stats for ray jobs. These stats are submitted to usage-stats.ray.io per https://docs.ray.io/en/latest/cluster/usage-stats.html"`
}

type DefaultConfig struct {
Expand Down
30 changes: 21 additions & 9 deletions flyteplugins/go/tasks/plugins/k8s/ray/ray.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,22 +441,34 @@ func getEventInfoForRayJob(logConfig logs.LogConfig, pluginContext k8s.PluginCon
return nil, nil
}

// TODO: Retrieve the name of head pod from rayJob.status, and add it to task logs
// RayJob CRD does not include the name of the worker or head pod for now

taskExecID := pluginContext.TaskExecutionMetadata().GetTaskExecutionID()
logOutput, err := logPlugin.GetTaskLogs(tasklog.Input{
var taskLogs []*core.TaskLog

Check failure on line 444 in flyteplugins/go/tasks/plugins/k8s/ray/ray.go

View workflow job for this annotation

GitHub Actions / compile

undefined: core
input := tasklog.Input{
Namespace: rayJob.Namespace,
TaskExecutionID: taskExecID,

Check failure on line 447 in flyteplugins/go/tasks/plugins/k8s/ray/ray.go

View workflow job for this annotation

GitHub Actions / compile

undefined: taskExecID
})
}

// TODO: Retrieve the name of head pod from rayJob.status, and add it to task logs
// RayJob CRD does not include the name of the worker or head pod for now
taskExecID := pluginContext.TaskExecutionMetadata().GetTaskExecutionID()

Check failure on line 452 in flyteplugins/go/tasks/plugins/k8s/ray/ray.go

View workflow job for this annotation

GitHub Actions / compile

taskExecID declared but not used
logOutput, err := logPlugin.GetTaskLogs(input)
if err != nil {
return nil, fmt.Errorf("failed to generate task logs. Error: %w", err)
}
taskLogs = append(taskLogs, logOutput.TaskLogs...)

return &pluginsCore.TaskInfo{
Logs: logOutput.TaskLogs,
}, nil
// Handling for Ray Dashboard
dashboardUrlTemplate := GetConfig().DashboardUrlTemplate
if dashboardUrlTemplate != nil &&
rayJob.Status.JobDeploymentStatus == rayv1alpha1.JobDeploymentStatusRunning &&
rayJob.Status.DashboardURL != "" {
dashboardUrlOutput, err := dashboardUrlTemplate.GetTaskLogs(input)
if err != nil {
return nil, fmt.Errorf("failed to generate Ray dashboard link. Error: %w", err)
}
taskLogs = append(taskLogs, dashboardUrlOutput.TaskLogs...)
}

return &pluginsCore.TaskInfo{Logs: logOutput.TaskLogs}, nil
}

func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginContext k8s.PluginContext, resource client.Object) (pluginsCore.PhaseInfo, error) {
Expand Down

0 comments on commit aad9450

Please sign in to comment.