Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Add ray dashboard log link" #4226

Closed
wants to merge 2 commits
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions flyteplugins/go/tasks/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,15 @@ var (
rootSection = config.MustRegisterSection(configSectionKey, &Config{})
)

// Config is the top level plugins config. It is intentionally empty: it exists
// only as the registered root section that plugin-specific sub-sections are
// attached to via MustRegisterSubSection / MustRegisterSubSectionWithUpdates.
type Config struct {
}

// GetConfig retrieves the current top level plugins config value, or the
// registered default if no override has been loaded into rootSection.
func GetConfig() *Config {
return rootSection.GetConfig().(*Config)
}

// MustRegisterSubSection registers a plugin-specific config section under the
// top level plugins section, keyed by subSectionKey. Per the Must naming
// convention this presumably panics on registration failure (e.g. duplicate
// key) — behavior delegated to rootSection.MustRegisterSection.
func MustRegisterSubSection(subSectionKey string, section config.Config) config.Section {
return rootSection.MustRegisterSection(subSectionKey, section)
}

// MustRegisterSubSectionWithUpdates registers a plugin-specific config section
// under the top level plugins section, additionally wiring sectionUpdatedFn as
// a callback invoked when the section's value changes. Delegates to
// rootSection.MustRegisterSectionWithUpdates.
func MustRegisterSubSectionWithUpdates(subSectionKey string, section config.Config, sectionUpdatedFn config.SectionUpdated) config.Section {
return rootSection.MustRegisterSectionWithUpdates(subSectionKey, section, sectionUpdatedFn)
}
4 changes: 2 additions & 2 deletions flyteplugins/go/tasks/pluginmachinery/core/phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func phaseInfo(p Phase, v uint32, err *core.ExecutionError, info *TaskInfo, clea
}
}

// PhaseInfoNotReady represents the case the plugin is not ready to start
// Return in the case the plugin is not ready to start
func PhaseInfoNotReady(t time.Time, version uint32, reason string) PhaseInfo {
pi := phaseInfo(PhaseNotReady, version, nil, &TaskInfo{OccurredAt: &t}, false)
pi.reason = reason
Expand All @@ -206,7 +206,7 @@ func PhaseInfoWaitingForResources(t time.Time, version uint32, reason string) Ph
return pi
}

// PhaseInfoWaitingForResourcesInfo represents the case the plugin is not ready to start
// Return in the case the plugin is not ready to start
func PhaseInfoWaitingForResourcesInfo(t time.Time, version uint32, reason string, info *TaskInfo) PhaseInfo {
pi := phaseInfo(PhaseWaitingForResources, version, nil, info, false)
pi.reason = reason
Expand Down
54 changes: 4 additions & 50 deletions flyteplugins/go/tasks/plugins/k8s/ray/config.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
package ray

import (
"context"

pluginsConfig "github.com/flyteorg/flyte/flyteplugins/go/tasks/config"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/logs"
pluginmachinery "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/k8s"
"github.com/flyteorg/flyte/flytestdlib/config"
)

//go:generate pflags Config --default-var=defaultConfig
Expand All @@ -18,39 +14,10 @@ var (
ServiceType: "NodePort",
IncludeDashboard: true,
DashboardHost: "0.0.0.0",
EnableUsageStats: false,
Defaults: DefaultConfig{
HeadNode: NodeConfig{
StartParameters: map[string]string{
// Disable usage reporting by default: https://docs.ray.io/en/latest/cluster/usage-stats.html
DisableUsageStatsStartParameter: "true",
},
IPAddress: "$MY_POD_IP",
},
WorkerNode: NodeConfig{
StartParameters: map[string]string{
// Disable usage reporting by default: https://docs.ray.io/en/latest/cluster/usage-stats.html
DisableUsageStatsStartParameter: "true",
},
IPAddress: "$MY_POD_IP",
},
},
NodeIPAddress: "$MY_POD_IP",
}

configSection = pluginsConfig.MustRegisterSubSectionWithUpdates("ray", &defaultConfig,
func(ctx context.Context, newValue config.Config) {
if newValue == nil {
return
}

if len(newValue.(*Config).Defaults.HeadNode.IPAddress) == 0 {
newValue.(*Config).Defaults.HeadNode.IPAddress = newValue.(*Config).DeprecatedNodeIPAddress
}

if len(newValue.(*Config).Defaults.WorkerNode.IPAddress) == 0 {
newValue.(*Config).Defaults.WorkerNode.IPAddress = newValue.(*Config).DeprecatedNodeIPAddress
}
})
configSection = pluginsConfig.MustRegisterSubSection("ray", &defaultConfig)
)

// Config is config for 'ray' plugin
Expand All @@ -72,24 +39,11 @@ type Config struct {
// or 0.0.0.0 (available from all interfaces). By default, this is localhost.
DashboardHost string `json:"dashboardHost,omitempty"`

// DeprecatedNodeIPAddress the IP address of the head node. By default, this is pod ip address.
DeprecatedNodeIPAddress string `json:"nodeIPAddress,omitempty" pflag:"-,DEPRECATED. Please use DefaultConfig.[HeadNode|WorkerNode].IPAddress"`
// NodeIPAddress the IP address of the head node. By default, this is pod ip address.
NodeIPAddress string `json:"nodeIPAddress,omitempty"`

// Remote Ray Cluster Config
RemoteClusterConfig pluginmachinery.ClusterConfig `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for ray jobs"`
Logs logs.LogConfig `json:"logs" pflag:"-,Log configuration for ray jobs"`
Defaults DefaultConfig `json:"defaults" pflag:"-,Default configuration for ray jobs"`
EnableUsageStats bool `json:"enableUsageStats" pflag:",Enable usage stats for ray jobs. These stats are submitted to usage-stats.ray.io per https://docs.ray.io/en/latest/cluster/usage-stats.html"`
}

type DefaultConfig struct {
HeadNode NodeConfig `json:"headNode,omitempty" pflag:"-,Default configuration for head node of ray jobs"`
WorkerNode NodeConfig `json:"workerNode,omitempty" pflag:"-,Default configuration for worker node of ray jobs"`
}

type NodeConfig struct {
StartParameters map[string]string `json:"startParameters,omitempty" pflag:"-,Start parameters for the node"`
IPAddress string `json:"ipAddress,omitempty" pflag:"-,IP address of the node"`
}

func GetConfig() *Config {
Expand Down
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/k8s/ray/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 14 additions & 14 deletions flyteplugins/go/tasks/plugins/k8s/ray/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 25 additions & 50 deletions flyteplugins/go/tasks/plugins/k8s/ray/ray.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
"fmt"
"strconv"
"strings"
"time"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/plugins"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog"

"github.com/flyteorg/flyte/flyteplugins/go/tasks/logs"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery"
Expand All @@ -27,12 +28,11 @@ import (
)

const (
rayTaskType = "ray"
KindRayJob = "RayJob"
IncludeDashboard = "include-dashboard"
NodeIPAddress = "node-ip-address"
DashboardHost = "dashboard-host"
DisableUsageStatsStartParameter = "disable-usage-stats"
rayTaskType = "ray"
KindRayJob = "RayJob"
IncludeDashboard = "include-dashboard"
NodeIPAddress = "node-ip-address"
DashboardHost = "dashboard-host"
)

type rayJobResourceHandler struct {
Expand All @@ -58,6 +58,7 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC
}

podSpec, objectMeta, primaryContainerName, err := flytek8s.ToK8sPodSpec(ctx, taskCtx)

if err != nil {
return nil, flyteerr.Errorf(flyteerr.BadTaskSpecification, "Unable to create pod spec: [%v]", err.Error())
}
Expand All @@ -76,36 +77,26 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC
return nil, flyteerr.Errorf(flyteerr.BadTaskSpecification, "Unable to get primary container from the pod: [%v]", err.Error())
}

cfg := GetConfig()
headReplicas := int32(1)
headNodeRayStartParams := make(map[string]string)
if rayJob.RayCluster.HeadGroupSpec != nil && rayJob.RayCluster.HeadGroupSpec.RayStartParams != nil {
headNodeRayStartParams = rayJob.RayCluster.HeadGroupSpec.RayStartParams
} else if headNode := cfg.Defaults.HeadNode; len(headNode.StartParameters) > 0 {
headNodeRayStartParams = headNode.StartParameters
}

if _, exist := headNodeRayStartParams[IncludeDashboard]; !exist {
headNodeRayStartParams[IncludeDashboard] = strconv.FormatBool(GetConfig().IncludeDashboard)
}

if _, exist := headNodeRayStartParams[NodeIPAddress]; !exist {
headNodeRayStartParams[NodeIPAddress] = cfg.Defaults.HeadNode.IPAddress
headNodeRayStartParams[NodeIPAddress] = GetConfig().NodeIPAddress
}

if _, exist := headNodeRayStartParams[DashboardHost]; !exist {
headNodeRayStartParams[DashboardHost] = cfg.DashboardHost
}

if _, exists := headNodeRayStartParams[DisableUsageStatsStartParameter]; !exists && !cfg.EnableUsageStats {
headNodeRayStartParams[DisableUsageStatsStartParameter] = "true"
headNodeRayStartParams[DashboardHost] = GetConfig().DashboardHost
}

enableIngress := true
rayClusterSpec := rayv1alpha1.RayClusterSpec{
HeadGroupSpec: rayv1alpha1.HeadGroupSpec{
Template: buildHeadPodTemplate(&container, podSpec, objectMeta, taskCtx),
ServiceType: v1.ServiceType(cfg.ServiceType),
ServiceType: v1.ServiceType(GetConfig().ServiceType),
Replicas: &headReplicas,
EnableIngress: &enableIngress,
RayStartParams: headNodeRayStartParams,
Expand All @@ -121,24 +112,16 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC
if spec.MinReplicas != 0 {
minReplicas = spec.MinReplicas
}

if spec.MaxReplicas != 0 {
maxReplicas = spec.MaxReplicas
}

workerNodeRayStartParams := make(map[string]string)
if spec.RayStartParams != nil {
workerNodeRayStartParams = spec.RayStartParams
} else if workerNode := cfg.Defaults.WorkerNode; len(workerNode.StartParameters) > 0 {
workerNodeRayStartParams = workerNode.StartParameters
}

if _, exist := workerNodeRayStartParams[NodeIPAddress]; !exist {
workerNodeRayStartParams[NodeIPAddress] = cfg.Defaults.WorkerNode.IPAddress
}

if _, exists := workerNodeRayStartParams[DisableUsageStatsStartParameter]; !exists && !cfg.EnableUsageStats {
workerNodeRayStartParams[DisableUsageStatsStartParameter] = "true"
workerNodeRayStartParams[NodeIPAddress] = GetConfig().NodeIPAddress
}

workerNodeSpec := rayv1alpha1.WorkerGroupSpec{
Expand All @@ -163,8 +146,8 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC
jobSpec := rayv1alpha1.RayJobSpec{
RayClusterSpec: rayClusterSpec,
Entrypoint: strings.Join(container.Args, " "),
ShutdownAfterJobFinishes: cfg.ShutdownAfterJobFinishes,
TTLSecondsAfterFinished: &cfg.TTLSecondsAfterFinished,
ShutdownAfterJobFinishes: GetConfig().ShutdownAfterJobFinishes,
TTLSecondsAfterFinished: &GetConfig().TTLSecondsAfterFinished,
RuntimeEnv: rayJob.RuntimeEnv,
}

Expand Down Expand Up @@ -365,10 +348,12 @@ func (rayJobResourceHandler) BuildIdentityResource(ctx context.Context, taskCtx
}, nil
}

func getEventInfoForRayJob(logConfig logs.LogConfig, pluginContext k8s.PluginContext, rayJob *rayv1alpha1.RayJob) (*pluginsCore.TaskInfo, error) {
logPlugin, err := logs.InitializeLogPlugins(&logConfig)
func getEventInfoForRayJob() (*pluginsCore.TaskInfo, error) {
taskLogs := make([]*core.TaskLog, 0, 3)
logPlugin, err := logs.InitializeLogPlugins(logs.GetLogConfig())

if err != nil {
return nil, fmt.Errorf("failed to initialize log plugins. Error: %w", err)
return nil, err
}

if logPlugin == nil {
Expand All @@ -378,31 +363,22 @@ func getEventInfoForRayJob(logConfig logs.LogConfig, pluginContext k8s.PluginCon
// TODO: Retrieve the name of head pod from rayJob.status, and add it to task logs
// RayJob CRD does not include the name of the worker or head pod for now

taskID := pluginContext.TaskExecutionMetadata().GetTaskExecutionID().GetID()
logOutput, err := logPlugin.GetTaskLogs(tasklog.Input{
Namespace: rayJob.Namespace,
TaskExecutionIdentifier: &taskID,
})

if err != nil {
return nil, fmt.Errorf("failed to generate task logs. Error: %w", err)
}
// TODO: Add ray Dashboard URI to task logs

return &pluginsCore.TaskInfo{
Logs: logOutput.TaskLogs,
Logs: taskLogs,
}, nil
}

func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginContext k8s.PluginContext, resource client.Object) (pluginsCore.PhaseInfo, error) {
func (rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginContext k8s.PluginContext, resource client.Object) (pluginsCore.PhaseInfo, error) {
rayJob := resource.(*rayv1alpha1.RayJob)
info, err := getEventInfoForRayJob(GetConfig().Logs, pluginContext, rayJob)
info, err := getEventInfoForRayJob()
if err != nil {
return pluginsCore.PhaseInfoUndefined, err
}

switch rayJob.Status.JobStatus {
case rayv1alpha1.JobStatusPending:
return pluginsCore.PhaseInfoInitializing(rayJob.Status.StartTime.Time, pluginsCore.DefaultPhaseVersion, "job is pending", info), nil
return pluginsCore.PhaseInfoNotReady(time.Now(), pluginsCore.DefaultPhaseVersion, "job is pending"), nil
case rayv1alpha1.JobStatusFailed:
reason := fmt.Sprintf("Failed to create Ray job: %s", rayJob.Name)
return pluginsCore.PhaseInfoFailure(flyteerr.TaskFailedWithError, reason, info), nil
Expand All @@ -411,8 +387,7 @@ func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginCont
case rayv1alpha1.JobStatusRunning:
return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info), nil
}

return pluginsCore.PhaseInfoQueued(rayJob.CreationTimestamp.Time, pluginsCore.DefaultPhaseVersion, "JobCreated"), nil
return pluginsCore.PhaseInfoQueued(time.Now(), pluginsCore.DefaultPhaseVersion, "JobCreated"), nil
}

func init() {
Expand Down
Loading
Loading