Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ensure that nodes complete when workflow fails with parallelism and failFast. Fixes #13806 #13827

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions test/e2e/expectedfailures/parallelism-dag-fail-fast.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Expected-failure fixture: a DAG template with parallelism and failFast
# in which one task fails immediately while a sibling task is still running.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: parallelism-dag-fail-fast
spec:
  entrypoint: main
  templates:
    - name: main
      # Both tasks fit within the parallelism limit, so they start together.
      parallelism: 2
      failFast: true
      dag:
        tasks:
          # Exits non-zero right away.
          - name: task1
            template: fail
          # Still sleeping when task1 has already failed.
          - name: task2
            template: sleep
    - name: fail
      container:
        image: alpine:latest
        command: [ sh, -c ]
        args: ["exit 1"]
    - name: sleep
      container:
        image: alpine:latest
        command: [sh, -c]
        args: ["sleep 5"]
38 changes: 0 additions & 38 deletions test/e2e/expectedfailures/parallelism-dag-failure.yaml

This file was deleted.

25 changes: 25 additions & 0 deletions test/e2e/expectedfailures/parallelism-step-fail-fast.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Expected-failure fixture: a steps template with parallelism and failFast
# in which one parallel step fails immediately while its sibling is still running.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: parallelism-step-fail-fast
spec:
  entrypoint: main
  templates:
    - name: main
      # Both steps fit within the parallelism limit, so they start together.
      parallelism: 2
      failFast: true
      steps:
        # A single step group containing two parallel steps.
        # step1 exits non-zero right away.
        - - name: step1
            template: fail
          # Still sleeping when step1 has already failed.
          - name: step2
            template: sleep
    - name: fail
      container:
        image: alpine:latest
        command: [sh, -c]
        args: ["exit 1"]
    - name: sleep
      container:
        image: alpine:latest
        command: [ sh, -c ]
        args: [ "sleep 5" ]
28 changes: 0 additions & 28 deletions test/e2e/expectedfailures/parallelism-step-failure.yaml

This file was deleted.

31 changes: 31 additions & 0 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1415,3 +1415,34 @@ func (s *FunctionalSuite) TestWorkflowExitHandlerCrashEnsureNodeIsPresent() {
assert.NotNil(t, hookNode.Inputs.Parameters[0].Value)
})
}

// TestWorkflowParallelismStepFailFast submits the parallelism-step-fail-fast
// fixture, waits for it to be running and then failed, and verifies that:
//   - the workflow message reports the failFast condition,
//   - the step group "[0]" and the failing step "step1" are Failed,
//   - the sibling step "step2" ends up Succeeded rather than being left unfinished.
func (s *FunctionalSuite) TestWorkflowParallelismStepFailFast() {
	s.Given().
		Workflow("@expectedfailures/parallelism-step-fail-fast.yaml").
		When().
		SubmitWorkflow().
		WaitForWorkflow(fixtures.ToBeRunning).
		WaitForWorkflow(fixtures.ToBeFailed).
		Then().
		ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *wfv1.WorkflowStatus) {
			assert.Equal(t, "template has failed or errored children and failFast enabled", status.Message)
			// Guard each lookup so a missing node is reported as a failed
			// assertion instead of dereferencing a nil result.
			if stepGroup := status.Nodes.FindByDisplayName("[0]"); assert.NotNil(t, stepGroup, "node [0] not found") {
				assert.Equal(t, wfv1.NodeFailed, stepGroup.Phase)
			}
			if step1 := status.Nodes.FindByDisplayName("step1"); assert.NotNil(t, step1, "node step1 not found") {
				assert.Equal(t, wfv1.NodeFailed, step1.Phase)
			}
			if step2 := status.Nodes.FindByDisplayName("step2"); assert.NotNil(t, step2, "node step2 not found") {
				assert.Equal(t, wfv1.NodeSucceeded, step2.Phase)
			}
		})
}

// TestWorkflowParallelismDAGFailFast submits the parallelism-dag-fail-fast
// fixture, waits for it to be running and then failed, and verifies that:
//   - the workflow message reports the failFast condition,
//   - the failing task "task1" is Failed,
//   - the sibling task "task2" ends up Succeeded rather than being left unfinished.
func (s *FunctionalSuite) TestWorkflowParallelismDAGFailFast() {
	s.Given().
		Workflow("@expectedfailures/parallelism-dag-fail-fast.yaml").
		When().
		SubmitWorkflow().
		WaitForWorkflow(fixtures.ToBeRunning).
		WaitForWorkflow(fixtures.ToBeFailed).
		Then().
		ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *wfv1.WorkflowStatus) {
			assert.Equal(t, "template has failed or errored children and failFast enabled", status.Message)
			// Guard each lookup so a missing node is reported as a failed
			// assertion instead of dereferencing a nil result.
			if task1 := status.Nodes.FindByDisplayName("task1"); assert.NotNil(t, task1, "node task1 not found") {
				assert.Equal(t, wfv1.NodeFailed, task1.Phase)
			}
			if task2 := status.Nodes.FindByDisplayName("task2"); assert.NotNil(t, task2, "node task2 not found") {
				assert.Equal(t, wfv1.NodeSucceeded, task2.Phase)
			}
		})
}
40 changes: 37 additions & 3 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2770,6 +2770,26 @@ func (woc *wfOperationCtx) markNodeWaitingForLock(nodeName string, lockName stri
return node, nil
}

func (woc *wfOperationCtx) findLeafNodeWithType(boundaryID string, nodeType wfv1.NodeType) *wfv1.NodeStatus {
var leafNode *wfv1.NodeStatus
var dfs func(nodeID string)
dfs = func(nodeID string) {
node, err := woc.wf.Status.Nodes.Get(nodeID)
if err != nil {
jswxstw marked this conversation as resolved.
Show resolved Hide resolved
woc.log.Errorf("was unable to obtain node for %s", nodeID)
return
}
if node.Type == nodeType {
leafNode = node
}
for _, childID := range node.Children {
dfs(childID)
}
}
dfs(boundaryID)
jswxstw marked this conversation as resolved.
Show resolved Hide resolved
return leafNode
}

// checkParallelism checks if the given template is able to be executed, considering the current active pods and workflow/template parallelism
func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.NodeStatus, boundaryID string) error {
if woc.execWf.Spec.Parallelism != nil && woc.activePods >= *woc.execWf.Spec.Parallelism {
Expand All @@ -2781,7 +2801,14 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node
if node != nil && (tmpl.GetType() == wfv1.TemplateTypeDAG || tmpl.GetType() == wfv1.TemplateTypeSteps) {
// Check failFast
if tmpl.IsFailFast() && woc.getUnsuccessfulChildren(node.ID) > 0 {
woc.markNodePhase(node.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled")
if woc.getActivePods(node.ID) == 0 {
if tmpl.GetType() == wfv1.TemplateTypeSteps {
if leafStepGroupNode := woc.findLeafNodeWithType(node.ID, wfv1.NodeTypeStepGroup); leafStepGroupNode != nil {
woc.markNodePhase(leafStepGroupNode.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled")
}
}
woc.markNodePhase(node.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled")
}
return ErrParallelismReached
}

Expand All @@ -2792,7 +2819,7 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node
}
}

// if we are about to execute a pod, make sure our parent hasn't reached it's limit
// if we are about to execute a pod, make sure our parent hasn't reached its limit
if boundaryID != "" && (node == nil || (node.Phase != wfv1.NodePending && node.Phase != wfv1.NodeRunning)) {
boundaryNode, err := woc.wf.Status.Nodes.Get(boundaryID)
if err != nil {
Expand All @@ -2810,7 +2837,14 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node

// Check failFast
if boundaryTemplate.IsFailFast() && woc.getUnsuccessfulChildren(boundaryID) > 0 {
woc.markNodePhase(boundaryNode.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled")
if woc.getActivePods(boundaryID) == 0 {
if boundaryTemplate.GetType() == wfv1.TemplateTypeSteps {
if leafStepGroupNode := woc.findLeafNodeWithType(boundaryID, wfv1.NodeTypeStepGroup); leafStepGroupNode != nil {
woc.markNodePhase(leafStepGroupNode.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled")
}
}
woc.markNodePhase(boundaryNode.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled")
}
return ErrParallelismReached
}

Expand Down
Loading