Skip to content

Commit

Permalink
fix: workflow stuck in running state when using activeDeadlineSeconds…
Browse files Browse the repository at this point in the history
… on template level. Fixes: argoproj#12329 (argoproj#12761)

Signed-off-by: shuangkun <[email protected]>
Signed-off-by: shuangkun tian <[email protected]>
Co-authored-by: Julie Vogelman <[email protected]>
  • Loading branch information
shuangkun and juliev0 authored Mar 15, 2024
1 parent 5b3909b commit 16cfef9
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 5 deletions.
103 changes: 103 additions & 0 deletions test/e2e/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@
package e2e

import (
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/test/e2e/fixtures"
"github.com/argoproj/argo-workflows/v3/workflow/common"
)

type WorkflowSuite struct {
Expand Down Expand Up @@ -85,6 +89,105 @@ spec:
})
}

func (s *WorkflowSuite) TestWorkflowFailedWhenAllPodSetFailedFromPending() {
(s.Given().Workflow(`
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: active-deadline-fanout-template-level-
namespace: argo
spec:
entrypoint: entrypoint
templates:
- name: entrypoint
steps:
- - name: fanout
template: echo
arguments:
parameters:
- name: item
value: "{{item}}"
withItems:
- 1
- 2
- 3
- 4
- name: echo
inputs:
parameters:
- name: item
container:
image: centos:latest
imagePullPolicy: Always
command:
- sh
- '-c'
args:
- echo
- 'workflow number {{inputs.parameters.item}}'
- sleep
- '20'
activeDeadlineSeconds: 2 # defined on template level, not workflow level !
`).
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeFailed, time.Minute*11).
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.WorkflowFailed, status.Phase)
for _, node := range status.Nodes {
if node.Type == wfv1.NodeTypePod {
assert.Equal(t, wfv1.NodeFailed, node.Phase)
assert.Contains(t, node.Message, "Pod was active on the node longer than the specified deadline")
}
}
}).
ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return strings.Contains(status.Name, "fanout(0:1)")
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
for _, c := range pod.Status.ContainerStatuses {
if c.Name == common.WaitContainerName && c.State.Terminated == nil {
assert.NotNil(t, c.State.Waiting)
assert.Contains(t, c.State.Waiting.Reason, "PodInitializing")
assert.Nil(t, c.State.Running)
}
}
}).
ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return strings.Contains(status.Name, "fanout(1:2)")
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
for _, c := range pod.Status.ContainerStatuses {
if c.Name == common.WaitContainerName && c.State.Terminated == nil {
assert.NotNil(t, c.State.Waiting)
assert.Contains(t, c.State.Waiting.Reason, "PodInitializing")
assert.Nil(t, c.State.Running)
}
}
})).
ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return strings.Contains(status.Name, "fanout(2:3)")
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
for _, c := range pod.Status.ContainerStatuses {
if c.Name == common.WaitContainerName && c.State.Terminated == nil {
assert.NotNil(t, c.State.Waiting)
assert.Contains(t, c.State.Waiting.Reason, "PodInitializing")
assert.Nil(t, c.State.Running)
}
}
}).
ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return strings.Contains(status.Name, "fanout(3:4)")
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
for _, c := range pod.Status.ContainerStatuses {
if c.Name == common.WaitContainerName && c.State.Terminated == nil {
assert.NotNil(t, c.State.Waiting)
assert.Contains(t, c.State.Waiting.Reason, "PodInitializing")
assert.Nil(t, c.State.Running)
}
}
})
}

func TestWorkflowSuite(t *testing.T) {
suite.Run(t, new(WorkflowSuite))
}
7 changes: 2 additions & 5 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1434,21 +1434,18 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, old *wfv1.NodeStatus
new.Outputs.ExitCode = pointer.StringPtr(fmt.Sprint(*exitCode))
}

// If the init container failed, we should mark the node as failed.
var initContainerFailed bool
for _, c := range pod.Status.InitContainerStatuses {
if c.State.Terminated != nil && int(c.State.Terminated.ExitCode) != 0 {
new.Phase = wfv1.NodeFailed
initContainerFailed = true
woc.log.WithField("new.phase", new.Phase).Info("marking node as failed since init container has non-zero exit code")
break
}
}

// We cannot fail the node until the wait container is finished (unless any init container has failed) because it may be busy saving outputs, and these
// We cannot fail the node if the wait container is still running because it may be busy saving outputs, and these
// would not get captured successfully.
for _, c := range pod.Status.ContainerStatuses {
if (c.Name == common.WaitContainerName && c.State.Terminated == nil && new.Phase.Completed()) && !initContainerFailed {
if c.Name == common.WaitContainerName && c.State.Running != nil && new.Phase.Completed() {
woc.log.WithField("new.phase", new.Phase).Info("leaving phase un-changed: wait container is not yet terminated ")
new.Phase = old.Phase
}
Expand Down
26 changes: 26 additions & 0 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1587,6 +1587,32 @@ func TestAssessNodeStatus(t *testing.T) {
},
node: &wfv1.NodeStatus{TemplateName: templateName},
want: wfv1.NodeFailed,
}, {
name: "pod failed - wait container waiting but pod was set failed",
pod: &apiv1.Pod{
Status: apiv1.PodStatus{
InitContainerStatuses: []apiv1.ContainerStatus{
{
Name: common.InitContainerName,
State: apiv1.ContainerState{Terminated: &apiv1.ContainerStateTerminated{ExitCode: 0}},
},
},
ContainerStatuses: []apiv1.ContainerStatus{
{
Name: common.WaitContainerName,
State: apiv1.ContainerState{Terminated: nil, Waiting: &apiv1.ContainerStateWaiting{Reason: "PodInitializing"}},
},
{
Name: common.MainContainerName,
State: apiv1.ContainerState{Terminated: nil},
},
},
Message: "failed since wait contain waiting",
Phase: apiv1.PodFailed,
},
},
node: &wfv1.NodeStatus{TemplateName: templateName},
want: wfv1.NodeFailed,
}, {
name: "pod running",
pod: &apiv1.Pod{
Expand Down

0 comments on commit 16cfef9

Please sign in to comment.