diff --git a/flytepropeller/pkg/controller/handler.go b/flytepropeller/pkg/controller/handler.go index cc6b18e819..277daddea9 100644 --- a/flytepropeller/pkg/controller/handler.go +++ b/flytepropeller/pkg/controller/handler.go @@ -385,11 +385,23 @@ func (p *Propeller) streak(ctx context.Context, w *v1alpha1.FlyteWorkflow, wfClo // Workflow is too large, we will mark the workflow as failing and record it. This will automatically // propagate the failure in the next round. mutableW := w.DeepCopy() - mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "Workflow size has breached threshold, aborting", &core.ExecutionError{ - Kind: core.ExecutionError_SYSTEM, - Code: "WorkflowTooLarge", - Message: "Workflow execution state is too large for Flyte to handle.", - }) + // if workflow is already in a terminal state then cleanup is already handled + if mutatedWf.GetExecutionStatus().IsTerminated() { + SetFinalizerIfEmpty(mutableW, FinalizerKey) + SetDefinitionVersionIfEmpty(mutableW, v1alpha1.LatestWorkflowDefinitionVersion) + SetCompletedLabel(mutableW, time.Now()) + mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailed, "Workflow size has breached threshold, aborted", &core.ExecutionError{ + Kind: core.ExecutionError_SYSTEM, + Code: "WorkflowTooLarge", + Message: "Workflow execution state is too large for Flyte to handle.", + }) + } else { + mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "Workflow size has breached threshold, aborting", &core.ExecutionError{ + Kind: core.ExecutionError_SYSTEM, + Code: "WorkflowTooLarge", + Message: "Workflow execution state is too large for Flyte to handle.", + }) + } if _, e := p.wfStore.Update(ctx, mutableW, workflowstore.PriorityClassCritical); e != nil { logger.Errorf(ctx, "Failed recording a large workflow as failed, reason: %s. Retrying...", e) return nil, e diff --git a/flytepropeller/pkg/controller/handler_test.go b/flytepropeller/pkg/controller/handler_test.go index ce1ca63818..737fb519ea 100644 --- a/flytepropeller/pkg/controller/handler_test.go +++ b/flytepropeller/pkg/controller/handler_test.go @@ -815,8 +815,36 @@ func TestNewPropellerHandler_UpdateFailure(t *testing.T) { } s.OnGetMatch(mock.Anything, mock.Anything, mock.Anything).Return(wf, nil) s.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(workflowstore.ErrWorkflowToLarge, "too large")).Once() - s.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil).Once() + s.On("Update", mock.Anything, mock.MatchedBy(func(w *v1alpha1.FlyteWorkflow) bool { + return w.Status.Phase == v1alpha1.WorkflowPhaseFailing + }), mock.Anything).Return(nil, nil).Once() + err := p.Handle(ctx, namespace, name) + assert.NoError(t, err) + }) + t.Run("too-large-terminal", func(t *testing.T) { + scope := promutils.NewTestScope() + s := &mocks.FlyteWorkflow{} + exec := &mockExecutor{} + p := NewPropellerHandler(ctx, cfg, nil, s, exec, scope) + wf := &v1alpha1.FlyteWorkflow{ + ObjectMeta: v1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + WorkflowSpec: &v1alpha1.WorkflowSpec{ + ID: "w1", + }, + } + exec.HandleCb = func(ctx context.Context, w *v1alpha1.FlyteWorkflow) error { + w.GetExecutionStatus().UpdatePhase(v1alpha1.WorkflowPhaseFailed, "done", nil) + return nil + } + s.OnGetMatch(mock.Anything, mock.Anything, mock.Anything).Return(wf, nil) + s.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(workflowstore.ErrWorkflowToLarge, "too large")).Once() + s.On("Update", mock.Anything, mock.MatchedBy(func(w *v1alpha1.FlyteWorkflow) bool { + return w.Status.Phase == v1alpha1.WorkflowPhaseFailed && HasFinalizer(w) && HasCompletedLabel(w) + }), mock.Anything).Return(nil, nil).Once() err := p.Handle(ctx, namespace, name) assert.NoError(t, err) })