From e794063072edb81f9939562cb1f00aeca664d446 Mon Sep 17 00:00:00 2001 From: Dan Rammer Date: Tue, 7 Nov 2023 09:09:24 -0600 Subject: [PATCH] completing retries even if minSuccesses is achieved (#4338) * completing retries even if minSuccesses is achieved Signed-off-by: Daniel Rammer * ensuring all tasks are in a terminal state Signed-off-by: Daniel Rammer * updated dead code to comment Signed-off-by: Daniel Rammer --------- Signed-off-by: Daniel Rammer --- .../go/tasks/plugins/array/core/state.go | 5 ++++- .../go/tasks/plugins/array/core/state_test.go | 18 ++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/flyteplugins/go/tasks/plugins/array/core/state.go b/flyteplugins/go/tasks/plugins/array/core/state.go index 665eec309f..f601ec375b 100644 --- a/flyteplugins/go/tasks/plugins/array/core/state.go +++ b/flyteplugins/go/tasks/plugins/array/core/state.go @@ -273,7 +273,10 @@ func SummaryToPhase(ctx context.Context, minSuccesses int64, summary arraystatus logger.Infof(ctx, "Array is still running and waiting for resources totalWaitingForResources[%v]", totalWaitingForResources) return PhaseWaitingForResources } - if totalSuccesses >= minSuccesses && totalRunning == 0 { + + // if we have enough successes, ensure all tasks are in a terminal phase (success or failure) + // before transitioning to the next phase. 
+ if totalSuccesses >= minSuccesses && totalSuccesses+totalPermanentFailures == totalCount { logger.Infof(ctx, "Array succeeded because totalSuccesses[%v] >= minSuccesses[%v]", totalSuccesses, minSuccesses) return PhaseWriteToDiscovery } diff --git a/flyteplugins/go/tasks/plugins/array/core/state_test.go b/flyteplugins/go/tasks/plugins/array/core/state_test.go index 26a80531b5..01b5b41528 100644 --- a/flyteplugins/go/tasks/plugins/array/core/state_test.go +++ b/flyteplugins/go/tasks/plugins/array/core/state_test.go @@ -349,6 +349,24 @@ func TestSummaryToPhase(t *testing.T) { core.PhaseRetryableFailure: 5, }, }, + { + // complete retry even though minSuccesses is achieved + "RetryMinSuccessRatio", + PhaseCheckingSubTaskExecutions, + map[core.Phase]int64{ + core.PhaseSuccess: 10, + core.PhaseRetryableFailure: 1, + }, + }, + { + // ensure all tasks are executed even if minSuccesses is achieved + "ExecuteAllMinSuccessRatio", + PhaseCheckingSubTaskExecutions, + map[core.Phase]int64{ + core.PhaseSuccess: 10, + core.PhaseUndefined: 1, + }, + }, } for _, tt := range tests {