From 45a85d5ee1b3a067900d4c9592ead4c6a48f6b37 Mon Sep 17 00:00:00 2001 From: Jeev B Date: Fri, 13 Oct 2023 16:54:32 -0700 Subject: [PATCH] Revert "Add ray dashboard log link" Signed-off-by: Jeev B --- flyteplugins/go/tasks/config/config.go | 8 +- .../go/tasks/pluginmachinery/core/phase.go | 4 +- .../go/tasks/plugins/k8s/ray/config.go | 54 +------------ .../go/tasks/plugins/k8s/ray/config_flags.go | 2 +- .../plugins/k8s/ray/config_flags_test.go | 28 +++---- flyteplugins/go/tasks/plugins/k8s/ray/ray.go | 75 +++++++------------ .../go/tasks/plugins/k8s/ray/ray_test.go | 72 +----------------- 7 files changed, 50 insertions(+), 193 deletions(-) diff --git a/flyteplugins/go/tasks/config/config.go b/flyteplugins/go/tasks/config/config.go index c366d4ecf2..1588090b87 100644 --- a/flyteplugins/go/tasks/config/config.go +++ b/flyteplugins/go/tasks/config/config.go @@ -12,11 +12,11 @@ var ( rootSection = config.MustRegisterSection(configSectionKey, &Config{}) ) -// Config is the top level plugins config. +// Top level plugins config. type Config struct { } -// GetConfig retrieves the current config value or default. +// Retrieves the current config value or default. func GetConfig() *Config { return rootSection.GetConfig().(*Config) } @@ -24,7 +24,3 @@ func GetConfig() *Config { func MustRegisterSubSection(subSectionKey string, section config.Config) config.Section { return rootSection.MustRegisterSection(subSectionKey, section) } - -func MustRegisterSubSectionWithUpdates(subSectionKey string, section config.Config, sectionUpdatedFn config.SectionUpdated) config.Section { - return rootSection.MustRegisterSectionWithUpdates(subSectionKey, section, sectionUpdatedFn) -} diff --git a/flyteplugins/go/tasks/pluginmachinery/core/phase.go b/flyteplugins/go/tasks/pluginmachinery/core/phase.go index 3f8c2a0914..3817f6dc21 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/phase.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/phase.go @@ -192,7 +192,7 @@ func phaseInfo(p Phase, v uint32, err *core.ExecutionError, info *TaskInfo, clea } } -// PhaseInfoNotReady represents the case the plugin is not ready to start +// Return in the case the plugin is not ready to start func PhaseInfoNotReady(t time.Time, version uint32, reason string) PhaseInfo { pi := phaseInfo(PhaseNotReady, version, nil, &TaskInfo{OccurredAt: &t}, false) pi.reason = reason @@ -206,7 +206,7 @@ func PhaseInfoWaitingForResources(t time.Time, version uint32, reason string) Ph return pi } -// PhaseInfoWaitingForResourcesInfo represents the case the plugin is not ready to start +// Return in the case the plugin is not ready to start func PhaseInfoWaitingForResourcesInfo(t time.Time, version uint32, reason string, info *TaskInfo) PhaseInfo { pi := phaseInfo(PhaseWaitingForResources, version, nil, info, false) pi.reason = reason diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/config.go b/flyteplugins/go/tasks/plugins/k8s/ray/config.go index 10a8068344..7a79916ac7 100644 --- a/flyteplugins/go/tasks/plugins/k8s/ray/config.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/config.go @@ -1,12 +1,8 @@ package ray import ( - "context" - pluginsConfig "github.com/flyteorg/flyte/flyteplugins/go/tasks/config" - "github.com/flyteorg/flyte/flyteplugins/go/tasks/logs" pluginmachinery "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/k8s" - "github.com/flyteorg/flyte/flytestdlib/config" ) //go:generate pflags Config --default-var=defaultConfig @@ -18,39 +14,10 @@ var ( ServiceType: "NodePort", IncludeDashboard: true, DashboardHost: "0.0.0.0", - EnableUsageStats: false, - Defaults: DefaultConfig{ - HeadNode: NodeConfig{ - StartParameters: map[string]string{ - // Disable usage reporting by default: https://docs.ray.io/en/latest/cluster/usage-stats.html - DisableUsageStatsStartParameter: "true", - }, - IPAddress: "$MY_POD_IP", - }, - WorkerNode: NodeConfig{ - StartParameters: map[string]string{ - // Disable usage reporting by default: https://docs.ray.io/en/latest/cluster/usage-stats.html - DisableUsageStatsStartParameter: "true", - }, - IPAddress: "$MY_POD_IP", - }, - }, + NodeIPAddress: "$MY_POD_IP", } - configSection = pluginsConfig.MustRegisterSubSectionWithUpdates("ray", &defaultConfig, - func(ctx context.Context, newValue config.Config) { - if newValue == nil { - return - } - - if len(newValue.(*Config).Defaults.HeadNode.IPAddress) == 0 { - newValue.(*Config).Defaults.HeadNode.IPAddress = newValue.(*Config).DeprecatedNodeIPAddress - } - - if len(newValue.(*Config).Defaults.WorkerNode.IPAddress) == 0 { - newValue.(*Config).Defaults.WorkerNode.IPAddress = newValue.(*Config).DeprecatedNodeIPAddress - } - }) + configSection = pluginsConfig.MustRegisterSubSection("ray", &defaultConfig) ) // Config is config for 'ray' plugin @@ -72,24 +39,11 @@ type Config struct { // or 0.0.0.0 (available from all interfaces). By default, this is localhost. DashboardHost string `json:"dashboardHost,omitempty"` - // DeprecatedNodeIPAddress the IP address of the head node. By default, this is pod ip address. - DeprecatedNodeIPAddress string `json:"nodeIPAddress,omitempty" pflag:"-,DEPRECATED. Please use DefaultConfig.[HeadNode|WorkerNode].IPAddress"` + // NodeIPAddress the IP address of the head node. By default, this is pod ip address. + NodeIPAddress string `json:"nodeIPAddress,omitempty"` // Remote Ray Cluster Config RemoteClusterConfig pluginmachinery.ClusterConfig `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for ray jobs"` - Logs logs.LogConfig `json:"logs" pflag:"-,Log configuration for ray jobs"` - Defaults DefaultConfig `json:"defaults" pflag:"-,Default configuration for ray jobs"` - EnableUsageStats bool `json:"enableUsageStats" pflag:",Enable usage stats for ray jobs. These stats are submitted to usage-stats.ray.io per https://docs.ray.io/en/latest/cluster/usage-stats.html"` -} - -type DefaultConfig struct { - HeadNode NodeConfig `json:"headNode,omitempty" pflag:"-,Default configuration for head node of ray jobs"` - WorkerNode NodeConfig `json:"workerNode,omitempty" pflag:"-,Default configuration for worker node of ray jobs"` -} - -type NodeConfig struct { - StartParameters map[string]string `json:"startParameters,omitempty" pflag:"-,Start parameters for the node"` - IPAddress string `json:"ipAddress,omitempty" pflag:"-,IP address of the node"` } func GetConfig() *Config { diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/config_flags.go b/flyteplugins/go/tasks/plugins/k8s/ray/config_flags.go index 8113a26278..f8e983056f 100755 --- a/flyteplugins/go/tasks/plugins/k8s/ray/config_flags.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/config_flags.go @@ -55,9 +55,9 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.String(fmt.Sprintf("%v%v", prefix, "serviceType"), defaultConfig.ServiceType, "") cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "includeDashboard"), defaultConfig.IncludeDashboard, "") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "dashboardHost"), defaultConfig.DashboardHost, "") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "nodeIPAddress"), defaultConfig.NodeIPAddress, "") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "remoteClusterConfig.name"), defaultConfig.RemoteClusterConfig.Name, "Friendly name of the remote cluster") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "remoteClusterConfig.endpoint"), defaultConfig.RemoteClusterConfig.Endpoint, " Remote K8s cluster endpoint") cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "remoteClusterConfig.enabled"), defaultConfig.RemoteClusterConfig.Enabled, " Boolean flag to enable or disable") - cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "enableUsageStats"), defaultConfig.EnableUsageStats, "Enable usage stats for ray jobs. These stats are submitted to usage-stats.ray.io per https://docs.ray.io/en/latest/cluster/usage-stats.html") return cmdFlags } diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/config_flags_test.go b/flyteplugins/go/tasks/plugins/k8s/ray/config_flags_test.go index f05c62c8e1..60761b9004 100755 --- a/flyteplugins/go/tasks/plugins/k8s/ray/config_flags_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/config_flags_test.go @@ -169,6 +169,20 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_nodeIPAddress", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("nodeIPAddress", testValue) + if vString, err := cmdFlags.GetString("nodeIPAddress"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vString), &actual.NodeIPAddress) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) t.Run("Test_remoteClusterConfig.name", func(t *testing.T) { t.Run("Override", func(t *testing.T) { @@ -211,18 +225,4 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) - t.Run("Test_enableUsageStats", func(t *testing.T) { - - t.Run("Override", func(t *testing.T) { - testValue := "1" - - cmdFlags.Set("enableUsageStats", testValue) - if vBool, err := cmdFlags.GetBool("enableUsageStats"); err == nil { - testDecodeJson_Config(t, fmt.Sprintf("%v", vBool), &actual.EnableUsageStats) - - } else { - assert.FailNow(t, err.Error()) - } - }) - }) } diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go index 9b205beee6..69d811adc0 100644 --- a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go @@ -5,9 +5,10 @@ import ( "fmt" "strconv" "strings" + "time" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/plugins" - "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog" "github.com/flyteorg/flyte/flyteplugins/go/tasks/logs" "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery" @@ -27,12 +28,11 @@ import ( ) const ( - rayTaskType = "ray" - KindRayJob = "RayJob" - IncludeDashboard = "include-dashboard" - NodeIPAddress = "node-ip-address" - DashboardHost = "dashboard-host" - DisableUsageStatsStartParameter = "disable-usage-stats" + rayTaskType = "ray" + KindRayJob = "RayJob" + IncludeDashboard = "include-dashboard" + NodeIPAddress = "node-ip-address" + DashboardHost = "dashboard-host" ) type rayJobResourceHandler struct { @@ -58,6 +58,7 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC } podSpec, objectMeta, primaryContainerName, err := flytek8s.ToK8sPodSpec(ctx, taskCtx) + if err != nil { return nil, flyteerr.Errorf(flyteerr.BadTaskSpecification, "Unable to create pod spec: [%v]", err.Error()) } @@ -76,36 +77,26 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC return nil, flyteerr.Errorf(flyteerr.BadTaskSpecification, "Unable to get primary container from the pod: [%v]", err.Error()) } - cfg := GetConfig() headReplicas := int32(1) headNodeRayStartParams := make(map[string]string) if rayJob.RayCluster.HeadGroupSpec != nil && rayJob.RayCluster.HeadGroupSpec.RayStartParams != nil { headNodeRayStartParams = rayJob.RayCluster.HeadGroupSpec.RayStartParams - } else if headNode := cfg.Defaults.HeadNode; len(headNode.StartParameters) > 0 { - headNodeRayStartParams = headNode.StartParameters } - if _, exist := headNodeRayStartParams[IncludeDashboard]; !exist { headNodeRayStartParams[IncludeDashboard] = strconv.FormatBool(GetConfig().IncludeDashboard) } - if _, exist := headNodeRayStartParams[NodeIPAddress]; !exist { - headNodeRayStartParams[NodeIPAddress] = cfg.Defaults.HeadNode.IPAddress + headNodeRayStartParams[NodeIPAddress] = GetConfig().NodeIPAddress } - if _, exist := headNodeRayStartParams[DashboardHost]; !exist { - headNodeRayStartParams[DashboardHost] = cfg.DashboardHost - } - - if _, exists := headNodeRayStartParams[DisableUsageStatsStartParameter]; !exists && !cfg.EnableUsageStats { - headNodeRayStartParams[DisableUsageStatsStartParameter] = "true" + headNodeRayStartParams[DashboardHost] = GetConfig().DashboardHost } enableIngress := true rayClusterSpec := rayv1alpha1.RayClusterSpec{ HeadGroupSpec: rayv1alpha1.HeadGroupSpec{ Template: buildHeadPodTemplate(&container, podSpec, objectMeta, taskCtx), - ServiceType: v1.ServiceType(cfg.ServiceType), + ServiceType: v1.ServiceType(GetConfig().ServiceType), Replicas: &headReplicas, EnableIngress: &enableIngress, RayStartParams: headNodeRayStartParams, @@ -121,7 +112,6 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC if spec.MinReplicas != 0 { minReplicas = spec.MinReplicas } - if spec.MaxReplicas != 0 { maxReplicas = spec.MaxReplicas } @@ -129,16 +119,9 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC workerNodeRayStartParams := make(map[string]string) if spec.RayStartParams != nil { workerNodeRayStartParams = spec.RayStartParams - } else if workerNode := cfg.Defaults.WorkerNode; len(workerNode.StartParameters) > 0 { - workerNodeRayStartParams = workerNode.StartParameters } - if _, exist := workerNodeRayStartParams[NodeIPAddress]; !exist { - workerNodeRayStartParams[NodeIPAddress] = cfg.Defaults.WorkerNode.IPAddress - } - - if _, exists := workerNodeRayStartParams[DisableUsageStatsStartParameter]; !exists && !cfg.EnableUsageStats { - workerNodeRayStartParams[DisableUsageStatsStartParameter] = "true" + workerNodeRayStartParams[NodeIPAddress] = GetConfig().NodeIPAddress } workerNodeSpec := rayv1alpha1.WorkerGroupSpec{ @@ -163,8 +146,8 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC jobSpec := rayv1alpha1.RayJobSpec{ RayClusterSpec: rayClusterSpec, Entrypoint: strings.Join(container.Args, " "), - ShutdownAfterJobFinishes: cfg.ShutdownAfterJobFinishes, - TTLSecondsAfterFinished: &cfg.TTLSecondsAfterFinished, + ShutdownAfterJobFinishes: GetConfig().ShutdownAfterJobFinishes, + TTLSecondsAfterFinished: &GetConfig().TTLSecondsAfterFinished, RuntimeEnv: rayJob.RuntimeEnv, } @@ -365,10 +348,12 @@ func (rayJobResourceHandler) BuildIdentityResource(ctx context.Context, taskCtx }, nil } -func getEventInfoForRayJob(logConfig logs.LogConfig, pluginContext k8s.PluginContext, rayJob *rayv1alpha1.RayJob) (*pluginsCore.TaskInfo, error) { - logPlugin, err := logs.InitializeLogPlugins(&logConfig) +func getEventInfoForRayJob() (*pluginsCore.TaskInfo, error) { + taskLogs := make([]*core.TaskLog, 0, 3) + logPlugin, err := logs.InitializeLogPlugins(logs.GetLogConfig()) + if err != nil { - return nil, fmt.Errorf("failed to initialize log plugins. Error: %w", err) + return nil, err } if logPlugin == nil { @@ -378,31 +363,22 @@ func getEventInfoForRayJob(logConfig logs.LogConfig, pluginContext k8s.PluginCon // TODO: Retrieve the name of head pod from rayJob.status, and add it to task logs // RayJob CRD does not include the name of the worker or head pod for now - taskID := pluginContext.TaskExecutionMetadata().GetTaskExecutionID().GetID() - logOutput, err := logPlugin.GetTaskLogs(tasklog.Input{ - Namespace: rayJob.Namespace, - TaskExecutionIdentifier: &taskID, - }) - - if err != nil { - return nil, fmt.Errorf("failed to generate task logs. Error: %w", err) - } + // TODO: Add ray Dashboard URI to task logs return &pluginsCore.TaskInfo{ - Logs: logOutput.TaskLogs, + Logs: taskLogs, }, nil } -func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginContext k8s.PluginContext, resource client.Object) (pluginsCore.PhaseInfo, error) { +func (rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginContext k8s.PluginContext, resource client.Object) (pluginsCore.PhaseInfo, error) { rayJob := resource.(*rayv1alpha1.RayJob) - info, err := getEventInfoForRayJob(GetConfig().Logs, pluginContext, rayJob) + info, err := getEventInfoForRayJob() if err != nil { return pluginsCore.PhaseInfoUndefined, err } - switch rayJob.Status.JobStatus { case rayv1alpha1.JobStatusPending: - return pluginsCore.PhaseInfoInitializing(rayJob.Status.StartTime.Time, pluginsCore.DefaultPhaseVersion, "job is pending", info), nil + return pluginsCore.PhaseInfoNotReady(time.Now(), pluginsCore.DefaultPhaseVersion, "job is pending"), nil case rayv1alpha1.JobStatusFailed: reason := fmt.Sprintf("Failed to create Ray job: %s", rayJob.Name) return pluginsCore.PhaseInfoFailure(flyteerr.TaskFailedWithError, reason, info), nil @@ -411,8 +387,7 @@ func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginCont case rayv1alpha1.JobStatusRunning: return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info), nil } - - return pluginsCore.PhaseInfoQueued(rayJob.CreationTimestamp.Time, pluginsCore.DefaultPhaseVersion, "JobCreated"), nil + return pluginsCore.PhaseInfoQueued(time.Now(), pluginsCore.DefaultPhaseVersion, "JobCreated"), nil } func init() { diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go b/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go index 40513e0ffa..50db645b8e 100644 --- a/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go @@ -3,12 +3,6 @@ package ray import ( "context" "testing" - "time" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "github.com/flyteorg/flyte/flyteplugins/go/tasks/logs" - mocks2 "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/k8s/mocks" "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s/config" @@ -175,9 +169,7 @@ func TestBuildResourceRay(t *testing.T) { assert.Equal(t, ray.Spec.RayClusterSpec.HeadGroupSpec.Replicas, &headReplica) assert.Equal(t, ray.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.ServiceAccountName, serviceAccount) assert.Equal(t, ray.Spec.RayClusterSpec.HeadGroupSpec.RayStartParams, - map[string]string{ - "dashboard-host": "0.0.0.0", "disable-usage-stats": "true", "include-dashboard": "true", - "node-ip-address": "$MY_POD_IP", "num-cpus": "1"}) + map[string]string{"dashboard-host": "0.0.0.0", "include-dashboard": "true", "node-ip-address": "$MY_POD_IP", "num-cpus": "1"}) assert.Equal(t, ray.Spec.RayClusterSpec.HeadGroupSpec.Template.Annotations, map[string]string{"annotation-1": "val1"}) assert.Equal(t, ray.Spec.RayClusterSpec.HeadGroupSpec.Template.Labels, map[string]string{"label-1": "val1"}) assert.Equal(t, ray.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Tolerations, toleration) @@ -188,7 +180,7 @@ func TestBuildResourceRay(t *testing.T) { assert.Equal(t, ray.Spec.RayClusterSpec.WorkerGroupSpecs[0].MaxReplicas, &workerReplica) assert.Equal(t, ray.Spec.RayClusterSpec.WorkerGroupSpecs[0].GroupName, workerGroupName) assert.Equal(t, ray.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.ServiceAccountName, serviceAccount) - assert.Equal(t, ray.Spec.RayClusterSpec.WorkerGroupSpecs[0].RayStartParams, map[string]string{"disable-usage-stats": "true", "node-ip-address": "$MY_POD_IP"}) + assert.Equal(t, ray.Spec.RayClusterSpec.WorkerGroupSpecs[0].RayStartParams, map[string]string{"node-ip-address": "$MY_POD_IP"}) assert.Equal(t, ray.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Annotations, map[string]string{"annotation-1": "val1"}) assert.Equal(t, ray.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Labels, map[string]string{"label-1": "val1"}) assert.Equal(t, ray.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Tolerations, toleration) @@ -385,66 +377,6 @@ func TestDefaultStartParameters(t *testing.T) { assert.Equal(t, ray.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Tolerations, toleration) } -func newPluginContext() k8s.PluginContext { - plg := &mocks2.PluginContext{} - - taskExecID := &mocks.TaskExecutionID{} - taskExecID.OnGetID().Return(core.TaskExecutionIdentifier{ - NodeExecutionId: &core.NodeExecutionIdentifier{ - ExecutionId: &core.WorkflowExecutionIdentifier{ - Name: "my_name", - Project: "my_project", - Domain: "my_domain", - }, - }, - }) - - tskCtx := &mocks.TaskExecutionMetadata{} - tskCtx.OnGetTaskExecutionID().Return(taskExecID) - plg.OnTaskExecutionMetadata().Return(tskCtx) - return plg -} - -func init() { - f := defaultConfig - f.Logs = logs.LogConfig{ - IsKubernetesEnabled: true, - } - - if err := SetConfig(&f); err != nil { - panic(err) - } -} - -func TestGetTaskPhase(t *testing.T) { - ctx := context.Background() - rayJobResourceHandler := rayJobResourceHandler{} - pluginCtx := newPluginContext() - - testCases := []struct { - rayJobPhase rayv1alpha1.JobStatus - expectedCorePhase pluginsCore.Phase - }{ - {"", pluginsCore.PhaseQueued}, - {rayv1alpha1.JobStatusPending, pluginsCore.PhaseInitializing}, - {rayv1alpha1.JobStatusRunning, pluginsCore.PhaseRunning}, - {rayv1alpha1.JobStatusSucceeded, pluginsCore.PhaseSuccess}, - {rayv1alpha1.JobStatusFailed, pluginsCore.PhasePermanentFailure}, - } - - for _, tc := range testCases { - t.Run("TestGetTaskPhase_"+string(tc.rayJobPhase), func(t *testing.T) { - rayObject := &rayv1alpha1.RayJob{} - rayObject.Status.JobStatus = tc.rayJobPhase - startTime := metav1.NewTime(time.Now()) - rayObject.Status.StartTime = &startTime - phaseInfo, err := rayJobResourceHandler.GetTaskPhase(ctx, pluginCtx, rayObject) - assert.Nil(t, err) - assert.Equal(t, tc.expectedCorePhase.String(), phaseInfo.Phase().String()) - }) - } -} - func TestGetPropertiesRay(t *testing.T) { rayJobResourceHandler := rayJobResourceHandler{} expected := k8s.PluginProperties{}