From 4859eb1b449579561cb575e37db4b3c4e0af9f43 Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Sun, 21 Jan 2024 16:05:09 -0800 Subject: [PATCH 1/3] keep terminal phase on retry if already in terminal phase and updating CRD fails due to ErrWorkflowToLarge Signed-off-by: Paul Dittamo --- flytepropeller/pkg/controller/handler.go | 22 ++++++++++---- flytepropeller/pkg/controller/handler_test.go | 30 ++++++++++++++++++- 2 files changed, 46 insertions(+), 6 deletions(-) diff --git a/flytepropeller/pkg/controller/handler.go b/flytepropeller/pkg/controller/handler.go index cc6b18e819..277daddea9 100644 --- a/flytepropeller/pkg/controller/handler.go +++ b/flytepropeller/pkg/controller/handler.go @@ -385,11 +385,23 @@ func (p *Propeller) streak(ctx context.Context, w *v1alpha1.FlyteWorkflow, wfClo // Workflow is too large, we will mark the workflow as failing and record it. This will automatically // propagate the failure in the next round. mutableW := w.DeepCopy() - mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "Workflow size has breached threshold, aborting", &core.ExecutionError{ - Kind: core.ExecutionError_SYSTEM, - Code: "WorkflowTooLarge", - Message: "Workflow execution state is too large for Flyte to handle.", - }) + // if workflow is already in a terminal state then cleanup is already handled + if mutatedWf.GetExecutionStatus().IsTerminated() { + SetFinalizerIfEmpty(mutableW, FinalizerKey) + SetDefinitionVersionIfEmpty(mutableW, v1alpha1.LatestWorkflowDefinitionVersion) + SetCompletedLabel(mutableW, time.Now()) + mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailed, "Workflow size has breached threshold, aborted", &core.ExecutionError{ + Kind: core.ExecutionError_SYSTEM, + Code: "WorkflowTooLarge", + Message: "Workflow execution state is too large for Flyte to handle.", + }) + } else { + mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "Workflow size has breached threshold, aborting", &core.ExecutionError{ + Kind: core.ExecutionError_SYSTEM, + Code: "WorkflowTooLarge", + Message: "Workflow execution state is too large for Flyte to handle.", + }) + } if _, e := p.wfStore.Update(ctx, mutableW, workflowstore.PriorityClassCritical); e != nil { logger.Errorf(ctx, "Failed recording a large workflow as failed, reason: %s. Retrying...", e) return nil, e diff --git a/flytepropeller/pkg/controller/handler_test.go b/flytepropeller/pkg/controller/handler_test.go index ce1ca63818..737fb519ea 100644 --- a/flytepropeller/pkg/controller/handler_test.go +++ b/flytepropeller/pkg/controller/handler_test.go @@ -815,8 +815,36 @@ func TestNewPropellerHandler_UpdateFailure(t *testing.T) { } s.OnGetMatch(mock.Anything, mock.Anything, mock.Anything).Return(wf, nil) s.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(workflowstore.ErrWorkflowToLarge, "too large")).Once() - s.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil).Once() + s.On("Update", mock.Anything, mock.MatchedBy(func(w *v1alpha1.FlyteWorkflow) bool { + return w.Status.Phase == v1alpha1.WorkflowPhaseFailing + }), mock.Anything).Return(nil, nil).Once() + err := p.Handle(ctx, namespace, name) + assert.NoError(t, err) + }) + t.Run("too-large-terminal", func(t *testing.T) { + scope := promutils.NewTestScope() + s := &mocks.FlyteWorkflow{} + exec := &mockExecutor{} + p := NewPropellerHandler(ctx, cfg, nil, s, exec, scope) + wf := &v1alpha1.FlyteWorkflow{ + ObjectMeta: v1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + WorkflowSpec: &v1alpha1.WorkflowSpec{ + ID: "w1", + }, + } + exec.HandleCb = func(ctx context.Context, w *v1alpha1.FlyteWorkflow) error { + w.GetExecutionStatus().UpdatePhase(v1alpha1.WorkflowPhaseFailed, "done", nil) + return nil + } + s.OnGetMatch(mock.Anything, mock.Anything, mock.Anything).Return(wf, nil) + s.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(workflowstore.ErrWorkflowToLarge, "too large")).Once() + s.On("Update", mock.Anything, mock.MatchedBy(func(w *v1alpha1.FlyteWorkflow) bool { + return w.Status.Phase == v1alpha1.WorkflowPhaseFailed && HasFinalizer(w) && HasCompletedLabel(w) + }), mock.Anything).Return(nil, nil).Once() err := p.Handle(ctx, namespace, name) assert.NoError(t, err) }) From 2890f9b21dad6750b94911a307c0abd1125dadf7 Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Mon, 22 Jan 2024 09:36:18 -0800 Subject: [PATCH 2/3] update error message Signed-off-by: Paul Dittamo --- flytepropeller/pkg/controller/handler.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/flytepropeller/pkg/controller/handler.go b/flytepropeller/pkg/controller/handler.go index 277daddea9..94a97e432d 100644 --- a/flytepropeller/pkg/controller/handler.go +++ b/flytepropeller/pkg/controller/handler.go @@ -385,12 +385,13 @@ func (p *Propeller) streak(ctx context.Context, w *v1alpha1.FlyteWorkflow, wfClo // Workflow is too large, we will mark the workflow as failing and record it. This will automatically // propagate the failure in the next round. mutableW := w.DeepCopy() - // if workflow is already in a terminal state then cleanup is already handled + // catch potential indefinite update loop if mutatedWf.GetExecutionStatus().IsTerminated() { SetFinalizerIfEmpty(mutableW, FinalizerKey) SetDefinitionVersionIfEmpty(mutableW, v1alpha1.LatestWorkflowDefinitionVersion) SetCompletedLabel(mutableW, time.Now()) - mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailed, "Workflow size has breached threshold, aborted", &core.ExecutionError{ + msg := fmt.Sprintf("Workflow size has breached threshold. Finalized with status: %v", mutatedWf.GetExecutionStatus().GetPhase()) + mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailed, msg, &core.ExecutionError{ Kind: core.ExecutionError_SYSTEM, Code: "WorkflowTooLarge", Message: "Workflow execution state is too large for Flyte to handle.", From 3912fb9d4dc3e758534260a4ba43977366ae0697 Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Tue, 23 Jan 2024 10:45:12 -0800 Subject: [PATCH 3/3] ensure no finalizers set on retrying terminal wf update Signed-off-by: Paul Dittamo --- flytepropeller/pkg/controller/handler.go | 2 +- flytepropeller/pkg/controller/handler_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/flytepropeller/pkg/controller/handler.go b/flytepropeller/pkg/controller/handler.go index 94a97e432d..94e8ab6c12 100644 --- a/flytepropeller/pkg/controller/handler.go +++ b/flytepropeller/pkg/controller/handler.go @@ -387,7 +387,7 @@ func (p *Propeller) streak(ctx context.Context, w *v1alpha1.FlyteWorkflow, wfClo mutableW := w.DeepCopy() // catch potential indefinite update loop if mutatedWf.GetExecutionStatus().IsTerminated() { - SetFinalizerIfEmpty(mutableW, FinalizerKey) + ResetFinalizers(mutableW) SetDefinitionVersionIfEmpty(mutableW, v1alpha1.LatestWorkflowDefinitionVersion) SetCompletedLabel(mutableW, time.Now()) msg := fmt.Sprintf("Workflow size has breached threshold. Finalized with status: %v", mutatedWf.GetExecutionStatus().GetPhase()) diff --git a/flytepropeller/pkg/controller/handler_test.go b/flytepropeller/pkg/controller/handler_test.go index 737fb519ea..3469c1da80 100644 --- a/flytepropeller/pkg/controller/handler_test.go +++ b/flytepropeller/pkg/controller/handler_test.go @@ -843,7 +843,7 @@ func TestNewPropellerHandler_UpdateFailure(t *testing.T) { s.OnGetMatch(mock.Anything, mock.Anything, mock.Anything).Return(wf, nil) s.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(workflowstore.ErrWorkflowToLarge, "too large")).Once() s.On("Update", mock.Anything, mock.MatchedBy(func(w *v1alpha1.FlyteWorkflow) bool { - return w.Status.Phase == v1alpha1.WorkflowPhaseFailed && HasFinalizer(w) && HasCompletedLabel(w) + return w.Status.Phase == v1alpha1.WorkflowPhaseFailed && !HasFinalizer(w) && HasCompletedLabel(w) }), mock.Anything).Return(nil, nil).Once() err := p.Handle(ctx, namespace, name) assert.NoError(t, err)