From 7712626d9cc638728bf471c61148fb8e22c63e44 Mon Sep 17 00:00:00 2001 From: Haytham Abuelfutuh Date: Fri, 10 Nov 2023 09:07:23 -0700 Subject: [PATCH] Handle all ray job statuses (#4389) Signed-off-by: Haytham Abuelfutuh --- flyteplugins/go/tasks/plugins/k8s/ray/ray.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go index cc8d198334..1d0fde4ca8 100644 --- a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go @@ -470,7 +470,7 @@ func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginCont return pluginsCore.PhaseInfoQueued(time.Now(), pluginsCore.DefaultPhaseVersion, "Scheduling"), nil } - // Kuberay creates a Ray cluster first, and then submits a Ray job to the cluster + // KubeRay creates a Ray cluster first, and then submits a Ray job to the cluster switch rayJob.Status.JobDeploymentStatus { case rayv1alpha1.JobDeploymentStatusInitializing: return pluginsCore.PhaseInfoInitializing(rayJob.CreationTimestamp.Time, pluginsCore.DefaultPhaseVersion, "cluster is creating", info), nil @@ -480,7 +480,7 @@ func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginCont case rayv1alpha1.JobDeploymentStatusFailedJobDeploy: reason := fmt.Sprintf("Failed to submit Ray job %s with error: %s", rayJob.Name, rayJob.Status.Message) return pluginsCore.PhaseInfoFailure(flyteerr.TaskFailedWithError, reason, info), nil - case rayv1alpha1.JobDeploymentStatusWaitForDashboard: + case rayv1alpha1.JobDeploymentStatusWaitForDashboard, rayv1alpha1.JobDeploymentStatusFailedToGetJobStatus: return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info), nil case rayv1alpha1.JobDeploymentStatusRunning, rayv1alpha1.JobDeploymentStatusComplete: switch rayJob.Status.JobStatus { @@ -491,10 +491,19 @@ func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginCont return pluginsCore.PhaseInfoSuccess(info), nil case rayv1alpha1.JobStatusPending, rayv1alpha1.JobStatusRunning: return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info), nil + case rayv1alpha1.JobStatusStopped: + // There is no current usage of this job status in KubeRay. It's unclear what it represents + fallthrough + default: + // We already handle all known job status, so this should never happen unless a future version of ray + // introduced a new job status. + return pluginsCore.PhaseInfoUndefined, fmt.Errorf("unknown job status: %s", rayJob.Status.JobStatus) } + default: + // We already handle all known deployment status, so this should never happen unless a future version of ray + // introduced a new job status. + return pluginsCore.PhaseInfoUndefined, fmt.Errorf("unknown job deployment status: %s", rayJob.Status.JobDeploymentStatus) } - - return pluginsCore.PhaseInfoUndefined, nil } func init() {