From c58d58a638e23591895602fbd741f5c85072745e Mon Sep 17 00:00:00 2001 From: jswxstw Date: Thu, 31 Oct 2024 16:16:52 +0800 Subject: [PATCH] fix: ensure that nodes complete when workflow fails with parallelism and failFast enabled. Fixes #13806 Signed-off-by: oninowang --- .../parallelism-dag-fail-fast.yaml | 26 ++++++++++++ .../parallelism-dag-failure.yaml | 38 ------------------ .../parallelism-step-fail-fast.yaml | 25 ++++++++++++ .../parallelism-step-failure.yaml | 28 ------------- test/e2e/functional_test.go | 31 ++++++++++++++ workflow/controller/operator.go | 40 +++++++++++++++++-- 6 files changed, 119 insertions(+), 69 deletions(-) create mode 100644 test/e2e/expectedfailures/parallelism-dag-fail-fast.yaml delete mode 100644 test/e2e/expectedfailures/parallelism-dag-failure.yaml create mode 100644 test/e2e/expectedfailures/parallelism-step-fail-fast.yaml delete mode 100644 test/e2e/expectedfailures/parallelism-step-failure.yaml diff --git a/test/e2e/expectedfailures/parallelism-dag-fail-fast.yaml b/test/e2e/expectedfailures/parallelism-dag-fail-fast.yaml new file mode 100644 index 000000000000..fa52fc621913 --- /dev/null +++ b/test/e2e/expectedfailures/parallelism-dag-fail-fast.yaml @@ -0,0 +1,26 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: parallelism-dag-fail-fast +spec: + entrypoint: main + templates: + - name: main + parallelism: 2 + failFast: true + dag: + tasks: + - name: task1 + template: fail + - name: task2 + template: sleep + - name: fail + container: + image: alpine:latest + command: [ sh, -c ] + args: ["exit 1"] + - name: sleep + container: + image: alpine:latest + command: [sh, -c] + args: ["sleep 5"] \ No newline at end of file diff --git a/test/e2e/expectedfailures/parallelism-dag-failure.yaml b/test/e2e/expectedfailures/parallelism-dag-failure.yaml deleted file mode 100644 index 478d3f1724c0..000000000000 --- a/test/e2e/expectedfailures/parallelism-dag-failure.yaml +++ /dev/null @@ -1,38 +0,0 @@ -apiVersion: argoproj.io/v1alpha1 -kind: Workflow -metadata: - generateName: parallelism-failed-dag- -spec: - entrypoint: parallelism-failed-dag - templates: - - name: parallelism-failed-dag - parallelism: 2 - dag: - tasks: - - name: A - template: pass - - name: B - dependencies: [A] - template: pass - - name: C - dependencies: [A] - template: pass - - name: D - dependencies: [A] - template: fail - - name: E - dependencies: [A] - template: pass - - name: F - dependencies: [B, C, D, E] - template: pass - - - name: pass - container: - image: alpine:3.7 - command: [sh, -c, exit 0] - - - name: fail - container: - image: alpine:3.7 - command: [sh, -c, exit 1] diff --git a/test/e2e/expectedfailures/parallelism-step-fail-fast.yaml b/test/e2e/expectedfailures/parallelism-step-fail-fast.yaml new file mode 100644 index 000000000000..04c7798eb20f --- /dev/null +++ b/test/e2e/expectedfailures/parallelism-step-fail-fast.yaml @@ -0,0 +1,25 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: parallelism-step-fail-fast +spec: + entrypoint: main + templates: + - name: main + parallelism: 2 + failFast: true + steps: + - - name: step1 + template: fail + - name: step2 + template: sleep + - name: fail + container: + image: alpine:latest + command: [sh, -c] + args: ["exit 1"] + - name: sleep + container: + image: alpine:latest + command: [ sh, -c ] + args: [ "sleep 5" ] \ No newline at end of file diff --git a/test/e2e/expectedfailures/parallelism-step-failure.yaml b/test/e2e/expectedfailures/parallelism-step-failure.yaml deleted file mode 100644 index 37682259d852..000000000000 --- a/test/e2e/expectedfailures/parallelism-step-failure.yaml +++ /dev/null @@ -1,28 +0,0 @@ -apiVersion: argoproj.io/v1alpha1 -kind: Workflow -metadata: - generateName: parallelism-failed-step- -spec: - entrypoint: parallelism-failed-step - templates: - - name: parallelism-failed-step - parallelism: 2 - steps: - - - name: sleep - template: sleep - arguments: - parameters: - - name: exit-code - value: "{{item}}" - withItems: - - 0 - - 1 - - 0 - - - name: sleep - inputs: - parameters: - - name: exit-code - container: - image: alpine:latest - command: [sh, -c, "exit {{inputs.parameters.exit-code}}"] diff --git a/test/e2e/functional_test.go b/test/e2e/functional_test.go index 906eeac31770..44c51a3227c1 100644 --- a/test/e2e/functional_test.go +++ b/test/e2e/functional_test.go @@ -1415,3 +1415,34 @@ func (s *FunctionalSuite) TestWorkflowExitHandlerCrashEnsureNodeIsPresent() { assert.NotNil(t, hookNode.Inputs.Parameters[0].Value) }) } + +func (s *FunctionalSuite) TestWorkflowParallelismStepFailFast() { + s.Given(). + Workflow("@expectedfailures/parallelism-step-fail-fast.yaml"). + When(). + SubmitWorkflow(). + WaitForWorkflow(fixtures.ToBeRunning). + WaitForWorkflow(fixtures.ToBeFailed). + Then(). + ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *wfv1.WorkflowStatus) { + assert.Equal(t, "template has failed or errored children and failFast enabled", status.Message) + assert.Equal(t, wfv1.NodeFailed, status.Nodes.FindByDisplayName("[0]").Phase) + assert.Equal(t, wfv1.NodeFailed, status.Nodes.FindByDisplayName("step1").Phase) + assert.Equal(t, wfv1.NodeSucceeded, status.Nodes.FindByDisplayName("step2").Phase) + }) +} + +func (s *FunctionalSuite) TestWorkflowParallelismDAGFailFast() { + s.Given(). + Workflow("@expectedfailures/parallelism-dag-fail-fast.yaml"). + When(). + SubmitWorkflow(). + WaitForWorkflow(fixtures.ToBeRunning). + WaitForWorkflow(fixtures.ToBeFailed). + Then(). + ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *wfv1.WorkflowStatus) { + assert.Equal(t, "template has failed or errored children and failFast enabled", status.Message) + assert.Equal(t, wfv1.NodeFailed, status.Nodes.FindByDisplayName("task1").Phase) + assert.Equal(t, wfv1.NodeSucceeded, status.Nodes.FindByDisplayName("task2").Phase) + }) +} diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 16ddf218a2c8..d471e35576f0 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -2770,6 +2770,26 @@ func (woc *wfOperationCtx) markNodeWaitingForLock(nodeName string, lockName stri return node, nil } +func (woc *wfOperationCtx) findLeafNodeWithType(boundaryID string, nodeType wfv1.NodeType) *wfv1.NodeStatus { + var leafNode *wfv1.NodeStatus + var dfs func(nodeID string) + dfs = func(nodeID string) { + node, err := woc.wf.Status.Nodes.Get(nodeID) + if err != nil { + woc.log.Errorf("was unable to obtain node for %s", nodeID) + return + } + if node.Type == nodeType { + leafNode = node + } + for _, childID := range node.Children { + dfs(childID) + } + } + dfs(boundaryID) + return leafNode +} + // checkParallelism checks if the given template is able to be executed, considering the current active pods and workflow/template parallelism func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.NodeStatus, boundaryID string) error { if woc.execWf.Spec.Parallelism != nil && woc.activePods >= *woc.execWf.Spec.Parallelism { @@ -2781,7 +2801,14 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node if node != nil && (tmpl.GetType() == wfv1.TemplateTypeDAG || tmpl.GetType() == wfv1.TemplateTypeSteps) { // Check failFast if tmpl.IsFailFast() && woc.getUnsuccessfulChildren(node.ID) > 0 { - woc.markNodePhase(node.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled") + if woc.getActivePods(node.ID) == 0 { + if tmpl.GetType() == wfv1.TemplateTypeSteps { + if leafStepGroupNode := woc.findLeafNodeWithType(node.ID, wfv1.NodeTypeStepGroup); leafStepGroupNode != nil { + woc.markNodePhase(leafStepGroupNode.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled") + } + } + woc.markNodePhase(node.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled") + } return ErrParallelismReached } @@ -2792,7 +2819,7 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node } } - // if we are about to execute a pod, make sure our parent hasn't reached it's limit + // if we are about to execute a pod, make sure our parent hasn't reached its limit if boundaryID != "" && (node == nil || (node.Phase != wfv1.NodePending && node.Phase != wfv1.NodeRunning)) { boundaryNode, err := woc.wf.Status.Nodes.Get(boundaryID) if err != nil { @@ -2810,7 +2837,14 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node // Check failFast if boundaryTemplate.IsFailFast() && woc.getUnsuccessfulChildren(boundaryID) > 0 { - woc.markNodePhase(boundaryNode.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled") + if woc.getActivePods(boundaryID) == 0 { + if boundaryTemplate.GetType() == wfv1.TemplateTypeSteps { + if leafStepGroupNode := woc.findLeafNodeWithType(boundaryID, wfv1.NodeTypeStepGroup); leafStepGroupNode != nil { + woc.markNodePhase(leafStepGroupNode.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled") + } + } + woc.markNodePhase(boundaryNode.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled") + } return ErrParallelismReached }