Skip to content

Commit

Permalink
ensure no finalizers set on retrying terminal wf update
Browse files Browse the repository at this point in the history
Signed-off-by: Paul Dittamo <[email protected]>
  • Loading branch information
pvditt committed Jan 23, 2024
1 parent 2890f9b commit 3912fb9
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 2 deletions.
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func (p *Propeller) streak(ctx context.Context, w *v1alpha1.FlyteWorkflow, wfClo
mutableW := w.DeepCopy()
// catch potential indefinite update loop
if mutatedWf.GetExecutionStatus().IsTerminated() {
SetFinalizerIfEmpty(mutableW, FinalizerKey)
ResetFinalizers(mutableW)
SetDefinitionVersionIfEmpty(mutableW, v1alpha1.LatestWorkflowDefinitionVersion)
SetCompletedLabel(mutableW, time.Now())
msg := fmt.Sprintf("Workflow size has breached threshold. Finalized with status: %v", mutatedWf.GetExecutionStatus().GetPhase())
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@ func TestNewPropellerHandler_UpdateFailure(t *testing.T) {
s.OnGetMatch(mock.Anything, mock.Anything, mock.Anything).Return(wf, nil)
s.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(workflowstore.ErrWorkflowToLarge, "too large")).Once()
s.On("Update", mock.Anything, mock.MatchedBy(func(w *v1alpha1.FlyteWorkflow) bool {
return w.Status.Phase == v1alpha1.WorkflowPhaseFailed && HasFinalizer(w) && HasCompletedLabel(w)
return w.Status.Phase == v1alpha1.WorkflowPhaseFailed && !HasFinalizer(w) && HasCompletedLabel(w)
}), mock.Anything).Return(nil, nil).Once()
err := p.Handle(ctx, namespace, name)
assert.NoError(t, err)
Expand Down

0 comments on commit 3912fb9

Please sign in to comment.