From 526839159643f9f65605570500d0d4254a66b101 Mon Sep 17 00:00:00 2001 From: Haytham Abuelfutuh Date: Wed, 8 Nov 2023 21:41:45 -0800 Subject: [PATCH 1/2] Handle all ray job statuses Signed-off-by: Haytham Abuelfutuh --- flyteplugins/go/tasks/plugins/k8s/ray/ray.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go index cc8d198334..ef299866f6 100644 --- a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go @@ -470,7 +470,7 @@ func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginCont return pluginsCore.PhaseInfoQueued(time.Now(), pluginsCore.DefaultPhaseVersion, "Scheduling"), nil } - // Kuberay creates a Ray cluster first, and then submits a Ray job to the cluster + // KubeRay creates a Ray cluster first, and then submits a Ray job to the cluster switch rayJob.Status.JobDeploymentStatus { case rayv1alpha1.JobDeploymentStatusInitializing: return pluginsCore.PhaseInfoInitializing(rayJob.CreationTimestamp.Time, pluginsCore.DefaultPhaseVersion, "cluster is creating", info), nil @@ -480,7 +480,7 @@ func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginCont case rayv1alpha1.JobDeploymentStatusFailedJobDeploy: reason := fmt.Sprintf("Failed to submit Ray job %s with error: %s", rayJob.Name, rayJob.Status.Message) return pluginsCore.PhaseInfoFailure(flyteerr.TaskFailedWithError, reason, info), nil - case rayv1alpha1.JobDeploymentStatusWaitForDashboard: + case rayv1alpha1.JobDeploymentStatusWaitForDashboard, rayv1alpha1.JobDeploymentStatusFailedToGetJobStatus: return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info), nil case rayv1alpha1.JobDeploymentStatusRunning, rayv1alpha1.JobDeploymentStatusComplete: switch rayJob.Status.JobStatus { @@ -489,12 +489,18 @@ func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginCont return 
pluginsCore.PhaseInfoFailure(flyteerr.TaskFailedWithError, reason, info), nil case rayv1alpha1.JobStatusSucceeded: return pluginsCore.PhaseInfoSuccess(info), nil - case rayv1alpha1.JobStatusPending, rayv1alpha1.JobStatusRunning: + case rayv1alpha1.JobStatusPending, rayv1alpha1.JobStatusRunning, rayv1alpha1.JobStatusStopped: return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info), nil + default: + // We already handle all known job status, so this should never happen unless a future version of ray + // introduced a new job status. + return pluginsCore.PhaseInfoUndefined, fmt.Errorf("unknown job status: %s", rayJob.Status.JobStatus) } + default: + // We already handle all known deployment status, so this should never happen unless a future version of ray + // introduced a new job deployment status. + return pluginsCore.PhaseInfoUndefined, fmt.Errorf("unknown job deployment status: %s", rayJob.Status.JobDeploymentStatus) } - - return pluginsCore.PhaseInfoUndefined, nil } func init() { From 8a3ca32d0cce21c9acda41126af219d0674b21db Mon Sep 17 00:00:00 2001 From: Haytham Abuelfutuh Date: Fri, 10 Nov 2023 09:55:11 -0600 Subject: [PATCH 2/2] PR Comments Signed-off-by: Haytham Abuelfutuh --- flyteplugins/go/tasks/plugins/k8s/ray/ray.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go index ef299866f6..1d0fde4ca8 100644 --- a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go @@ -489,8 +489,11 @@ func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginCont return pluginsCore.PhaseInfoFailure(flyteerr.TaskFailedWithError, reason, info), nil case rayv1alpha1.JobStatusSucceeded: return pluginsCore.PhaseInfoSuccess(info), nil - case rayv1alpha1.JobStatusPending, rayv1alpha1.JobStatusRunning, rayv1alpha1.JobStatusStopped: + case rayv1alpha1.JobStatusPending, rayv1alpha1.JobStatusRunning: return
pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info), nil + case rayv1alpha1.JobStatusStopped: + // There is no current usage of this job status in KubeRay. It's unclear what it represents + fallthrough default: // We already handle all known job status, so this should never happen unless a future version of ray // introduced a new job status.