Skip to content

Commit

Permalink
Revert "Add ray dashboard log link"
Browse files Browse the repository at this point in the history
Signed-off-by: Jeev B <[email protected]>
  • Loading branch information
jeevb committed Oct 13, 2023
1 parent c6476cc commit 45a85d5
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 193 deletions.
8 changes: 2 additions & 6 deletions flyteplugins/go/tasks/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,15 @@ var (
rootSection = config.MustRegisterSection(configSectionKey, &Config{})
)

// Config is the top level plugins config.
// Config is the top-level plugins config.
type Config struct {
	// Intentionally empty: plugin-specific settings are registered as
	// subsections beneath this root section (see MustRegisterSubSection).
}

// GetConfig retrieves the current config value or default.
// GetConfig retrieves the current config value or default.
func GetConfig() *Config {
	// The root section always holds a *Config; a mismatch is a programmer
	// error, so the unchecked assertion (panic on failure) is intentional.
	cfg := rootSection.GetConfig().(*Config)
	return cfg
}

// MustRegisterSubSection registers section under subSectionKey beneath the
// root plugins config section, panicking if registration fails.
func MustRegisterSubSection(subSectionKey string, section config.Config) config.Section {
	sub := rootSection.MustRegisterSection(subSectionKey, section)
	return sub
}

// MustRegisterSubSectionWithUpdates registers section under subSectionKey
// beneath the root plugins config section and wires sectionUpdatedFn to fire
// whenever the subsection's value changes; panics if registration fails.
func MustRegisterSubSectionWithUpdates(subSectionKey string, section config.Config, sectionUpdatedFn config.SectionUpdated) config.Section {
	sub := rootSection.MustRegisterSectionWithUpdates(subSectionKey, section, sectionUpdatedFn)
	return sub
}
4 changes: 2 additions & 2 deletions flyteplugins/go/tasks/pluginmachinery/core/phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func phaseInfo(p Phase, v uint32, err *core.ExecutionError, info *TaskInfo, clea
}
}

// PhaseInfoNotReady represents the case the plugin is not ready to start
// PhaseInfoNotReady returns a PhaseInfo for the case where the plugin is not ready to start.
func PhaseInfoNotReady(t time.Time, version uint32, reason string) PhaseInfo {
pi := phaseInfo(PhaseNotReady, version, nil, &TaskInfo{OccurredAt: &t}, false)
pi.reason = reason
Expand All @@ -206,7 +206,7 @@ func PhaseInfoWaitingForResources(t time.Time, version uint32, reason string) Ph
return pi
}

// PhaseInfoWaitingForResourcesInfo represents the case the plugin is not ready to start
// PhaseInfoWaitingForResourcesInfo returns a PhaseInfo, carrying additional task info, for the case where the plugin is waiting for resources.
func PhaseInfoWaitingForResourcesInfo(t time.Time, version uint32, reason string, info *TaskInfo) PhaseInfo {
pi := phaseInfo(PhaseWaitingForResources, version, nil, info, false)
pi.reason = reason
Expand Down
54 changes: 4 additions & 50 deletions flyteplugins/go/tasks/plugins/k8s/ray/config.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
package ray

import (
"context"

pluginsConfig "github.com/flyteorg/flyte/flyteplugins/go/tasks/config"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/logs"
pluginmachinery "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/k8s"
"github.com/flyteorg/flyte/flytestdlib/config"
)

//go:generate pflags Config --default-var=defaultConfig
Expand All @@ -18,39 +14,10 @@ var (
ServiceType: "NodePort",
IncludeDashboard: true,
DashboardHost: "0.0.0.0",
EnableUsageStats: false,
Defaults: DefaultConfig{
HeadNode: NodeConfig{
StartParameters: map[string]string{
// Disable usage reporting by default: https://docs.ray.io/en/latest/cluster/usage-stats.html
DisableUsageStatsStartParameter: "true",
},
IPAddress: "$MY_POD_IP",
},
WorkerNode: NodeConfig{
StartParameters: map[string]string{
// Disable usage reporting by default: https://docs.ray.io/en/latest/cluster/usage-stats.html
DisableUsageStatsStartParameter: "true",
},
IPAddress: "$MY_POD_IP",
},
},
NodeIPAddress: "$MY_POD_IP",
}

configSection = pluginsConfig.MustRegisterSubSectionWithUpdates("ray", &defaultConfig,
func(ctx context.Context, newValue config.Config) {
if newValue == nil {
return
}

if len(newValue.(*Config).Defaults.HeadNode.IPAddress) == 0 {
newValue.(*Config).Defaults.HeadNode.IPAddress = newValue.(*Config).DeprecatedNodeIPAddress
}

if len(newValue.(*Config).Defaults.WorkerNode.IPAddress) == 0 {
newValue.(*Config).Defaults.WorkerNode.IPAddress = newValue.(*Config).DeprecatedNodeIPAddress
}
})
configSection = pluginsConfig.MustRegisterSubSection("ray", &defaultConfig)
)

// Config is config for 'ray' plugin
Expand All @@ -72,24 +39,11 @@ type Config struct {
// or 0.0.0.0 (available from all interfaces). By default, this is localhost.
DashboardHost string `json:"dashboardHost,omitempty"`

// DeprecatedNodeIPAddress the IP address of the head node. By default, this is pod ip address.
DeprecatedNodeIPAddress string `json:"nodeIPAddress,omitempty" pflag:"-,DEPRECATED. Please use DefaultConfig.[HeadNode|WorkerNode].IPAddress"`
// NodeIPAddress the IP address of the head node. By default, this is pod ip address.
NodeIPAddress string `json:"nodeIPAddress,omitempty"`

// Remote Ray Cluster Config
RemoteClusterConfig pluginmachinery.ClusterConfig `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for ray jobs"`
Logs logs.LogConfig `json:"logs" pflag:"-,Log configuration for ray jobs"`
Defaults DefaultConfig `json:"defaults" pflag:"-,Default configuration for ray jobs"`
EnableUsageStats bool `json:"enableUsageStats" pflag:",Enable usage stats for ray jobs. These stats are submitted to usage-stats.ray.io per https://docs.ray.io/en/latest/cluster/usage-stats.html"`
}

type DefaultConfig struct {
HeadNode NodeConfig `json:"headNode,omitempty" pflag:"-,Default configuration for head node of ray jobs"`
WorkerNode NodeConfig `json:"workerNode,omitempty" pflag:"-,Default configuration for worker node of ray jobs"`
}

type NodeConfig struct {
StartParameters map[string]string `json:"startParameters,omitempty" pflag:"-,Start parameters for the node"`
IPAddress string `json:"ipAddress,omitempty" pflag:"-,IP address of the node"`
}

func GetConfig() *Config {
Expand Down
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/k8s/ray/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 14 additions & 14 deletions flyteplugins/go/tasks/plugins/k8s/ray/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 25 additions & 50 deletions flyteplugins/go/tasks/plugins/k8s/ray/ray.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
"fmt"
"strconv"
"strings"
"time"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/plugins"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog"

"github.com/flyteorg/flyte/flyteplugins/go/tasks/logs"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery"
Expand All @@ -27,12 +28,11 @@ import (
)

const (
rayTaskType = "ray"
KindRayJob = "RayJob"
IncludeDashboard = "include-dashboard"
NodeIPAddress = "node-ip-address"
DashboardHost = "dashboard-host"
DisableUsageStatsStartParameter = "disable-usage-stats"
rayTaskType = "ray"
KindRayJob = "RayJob"
IncludeDashboard = "include-dashboard"
NodeIPAddress = "node-ip-address"
DashboardHost = "dashboard-host"
)

type rayJobResourceHandler struct {
Expand All @@ -58,6 +58,7 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC
}

podSpec, objectMeta, primaryContainerName, err := flytek8s.ToK8sPodSpec(ctx, taskCtx)

if err != nil {
return nil, flyteerr.Errorf(flyteerr.BadTaskSpecification, "Unable to create pod spec: [%v]", err.Error())
}
Expand All @@ -76,36 +77,26 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC
return nil, flyteerr.Errorf(flyteerr.BadTaskSpecification, "Unable to get primary container from the pod: [%v]", err.Error())
}

cfg := GetConfig()
headReplicas := int32(1)
headNodeRayStartParams := make(map[string]string)
if rayJob.RayCluster.HeadGroupSpec != nil && rayJob.RayCluster.HeadGroupSpec.RayStartParams != nil {
headNodeRayStartParams = rayJob.RayCluster.HeadGroupSpec.RayStartParams
} else if headNode := cfg.Defaults.HeadNode; len(headNode.StartParameters) > 0 {
headNodeRayStartParams = headNode.StartParameters
}

if _, exist := headNodeRayStartParams[IncludeDashboard]; !exist {
headNodeRayStartParams[IncludeDashboard] = strconv.FormatBool(GetConfig().IncludeDashboard)
}

if _, exist := headNodeRayStartParams[NodeIPAddress]; !exist {
headNodeRayStartParams[NodeIPAddress] = cfg.Defaults.HeadNode.IPAddress
headNodeRayStartParams[NodeIPAddress] = GetConfig().NodeIPAddress
}

if _, exist := headNodeRayStartParams[DashboardHost]; !exist {
headNodeRayStartParams[DashboardHost] = cfg.DashboardHost
}

if _, exists := headNodeRayStartParams[DisableUsageStatsStartParameter]; !exists && !cfg.EnableUsageStats {
headNodeRayStartParams[DisableUsageStatsStartParameter] = "true"
headNodeRayStartParams[DashboardHost] = GetConfig().DashboardHost
}

enableIngress := true
rayClusterSpec := rayv1alpha1.RayClusterSpec{
HeadGroupSpec: rayv1alpha1.HeadGroupSpec{
Template: buildHeadPodTemplate(&container, podSpec, objectMeta, taskCtx),
ServiceType: v1.ServiceType(cfg.ServiceType),
ServiceType: v1.ServiceType(GetConfig().ServiceType),
Replicas: &headReplicas,
EnableIngress: &enableIngress,
RayStartParams: headNodeRayStartParams,
Expand All @@ -121,24 +112,16 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC
if spec.MinReplicas != 0 {
minReplicas = spec.MinReplicas
}

if spec.MaxReplicas != 0 {
maxReplicas = spec.MaxReplicas
}

workerNodeRayStartParams := make(map[string]string)
if spec.RayStartParams != nil {
workerNodeRayStartParams = spec.RayStartParams
} else if workerNode := cfg.Defaults.WorkerNode; len(workerNode.StartParameters) > 0 {
workerNodeRayStartParams = workerNode.StartParameters
}

if _, exist := workerNodeRayStartParams[NodeIPAddress]; !exist {
workerNodeRayStartParams[NodeIPAddress] = cfg.Defaults.WorkerNode.IPAddress
}

if _, exists := workerNodeRayStartParams[DisableUsageStatsStartParameter]; !exists && !cfg.EnableUsageStats {
workerNodeRayStartParams[DisableUsageStatsStartParameter] = "true"
workerNodeRayStartParams[NodeIPAddress] = GetConfig().NodeIPAddress
}

workerNodeSpec := rayv1alpha1.WorkerGroupSpec{
Expand All @@ -163,8 +146,8 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC
jobSpec := rayv1alpha1.RayJobSpec{
RayClusterSpec: rayClusterSpec,
Entrypoint: strings.Join(container.Args, " "),
ShutdownAfterJobFinishes: cfg.ShutdownAfterJobFinishes,
TTLSecondsAfterFinished: &cfg.TTLSecondsAfterFinished,
ShutdownAfterJobFinishes: GetConfig().ShutdownAfterJobFinishes,
TTLSecondsAfterFinished: &GetConfig().TTLSecondsAfterFinished,
RuntimeEnv: rayJob.RuntimeEnv,
}

Expand Down Expand Up @@ -365,10 +348,12 @@ func (rayJobResourceHandler) BuildIdentityResource(ctx context.Context, taskCtx
}, nil
}

func getEventInfoForRayJob(logConfig logs.LogConfig, pluginContext k8s.PluginContext, rayJob *rayv1alpha1.RayJob) (*pluginsCore.TaskInfo, error) {
logPlugin, err := logs.InitializeLogPlugins(&logConfig)
func getEventInfoForRayJob() (*pluginsCore.TaskInfo, error) {
taskLogs := make([]*core.TaskLog, 0, 3)
logPlugin, err := logs.InitializeLogPlugins(logs.GetLogConfig())

if err != nil {
return nil, fmt.Errorf("failed to initialize log plugins. Error: %w", err)
return nil, err
}

if logPlugin == nil {
Expand All @@ -378,31 +363,22 @@ func getEventInfoForRayJob(logConfig logs.LogConfig, pluginContext k8s.PluginCon
// TODO: Retrieve the name of head pod from rayJob.status, and add it to task logs
// RayJob CRD does not include the name of the worker or head pod for now

taskID := pluginContext.TaskExecutionMetadata().GetTaskExecutionID().GetID()
logOutput, err := logPlugin.GetTaskLogs(tasklog.Input{
Namespace: rayJob.Namespace,
TaskExecutionIdentifier: &taskID,
})

if err != nil {
return nil, fmt.Errorf("failed to generate task logs. Error: %w", err)
}
// TODO: Add ray Dashboard URI to task logs

return &pluginsCore.TaskInfo{
Logs: logOutput.TaskLogs,
Logs: taskLogs,
}, nil
}

func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginContext k8s.PluginContext, resource client.Object) (pluginsCore.PhaseInfo, error) {
func (rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginContext k8s.PluginContext, resource client.Object) (pluginsCore.PhaseInfo, error) {
rayJob := resource.(*rayv1alpha1.RayJob)
info, err := getEventInfoForRayJob(GetConfig().Logs, pluginContext, rayJob)
info, err := getEventInfoForRayJob()
if err != nil {
return pluginsCore.PhaseInfoUndefined, err
}

switch rayJob.Status.JobStatus {
case rayv1alpha1.JobStatusPending:
return pluginsCore.PhaseInfoInitializing(rayJob.Status.StartTime.Time, pluginsCore.DefaultPhaseVersion, "job is pending", info), nil
return pluginsCore.PhaseInfoNotReady(time.Now(), pluginsCore.DefaultPhaseVersion, "job is pending"), nil
case rayv1alpha1.JobStatusFailed:
reason := fmt.Sprintf("Failed to create Ray job: %s", rayJob.Name)
return pluginsCore.PhaseInfoFailure(flyteerr.TaskFailedWithError, reason, info), nil
Expand All @@ -411,8 +387,7 @@ func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginCont
case rayv1alpha1.JobStatusRunning:
return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info), nil
}

return pluginsCore.PhaseInfoQueued(rayJob.CreationTimestamp.Time, pluginsCore.DefaultPhaseVersion, "JobCreated"), nil
return pluginsCore.PhaseInfoQueued(time.Now(), pluginsCore.DefaultPhaseVersion, "JobCreated"), nil
}

func init() {
Expand Down
Loading

0 comments on commit 45a85d5

Please sign in to comment.