Skip to content

Commit

Permalink
fix(containerSet): mark container deleted when pod deleted. Fixes: ar…
Browse files Browse the repository at this point in the history
  • Loading branch information
shuangkun authored Mar 25, 2024
1 parent 842c613 commit cfe2bb7
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 1 deletion.
13 changes: 12 additions & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1223,7 +1223,18 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) (error, bool)
node.Daemoned = nil
woc.updated = true
}
woc.markNodePhase(node.Name, wfv1.NodeError, "pod deleted")
woc.markNodeError(node.Name, errors.New("", "pod deleted"))
// Set pod's child(container) error if pod deleted
for _, childNodeID := range node.Children {
childNode, err := woc.wf.Status.Nodes.Get(childNodeID)
if err != nil {
woc.log.Errorf("was unable to obtain node for %s", childNodeID)
continue
}
if childNode.Type == wfv1.NodeTypeContainer {
woc.markNodeError(childNode.Name, errors.New("", "container deleted"))
}
}
}
}
return nil, !taskResultIncomplete
Expand Down
89 changes: 89 additions & 0 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10974,3 +10974,92 @@ status:
woc.operate(ctx)
assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase)
}

var wfHasContainerSet = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: lovely-rhino
spec:
templates:
- name: init
dag:
tasks:
- name: A
template: run
arguments: {}
- name: run
containerSet:
containers:
- name: main
image: alpine:latest
command:
- /bin/sh
args:
- '-c'
- sleep 9000
resources: {}
- name: main2
image: alpine:latest
command:
- /bin/sh
args:
- '-c'
- sleep 9000
resources: {}
entrypoint: init
arguments: {}
ttlStrategy:
secondsAfterCompletion: 300
podGC:
strategy: OnPodCompletion`

// TestContainerSetDeleteContainerWhenPodDeleted test whether a workflow has ContainerSet error when pod deleted.
func TestContainerSetDeleteContainerWhenPodDeleted(t *testing.T) {
cancel, controller := newController()
defer cancel()
ctx := context.Background()
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")
wf := wfv1.MustUnmarshalWorkflow(wfHasContainerSet)
wf, err := wfcset.Create(ctx, wf, metav1.CreateOptions{})
assert.Nil(t, err)
wf, err = wfcset.Get(ctx, wf.ObjectMeta.Name, metav1.GetOptions{})
assert.Nil(t, err)
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)
pods, err := listPods(woc)
assert.Nil(t, err)
assert.Equal(t, 1, len(pods.Items))

// mark pod Running
makePodsPhase(ctx, woc, apiv1.PodRunning)
woc = newWorkflowOperationCtx(woc.wf, controller)
woc.operate(ctx)
for _, node := range woc.wf.Status.Nodes {
if node.Type == wfv1.NodeTypePod {
assert.Equal(t, wfv1.NodeRunning, node.Phase)
}
}

// TODO: Refactor to use local-scoped env vars in test to avoid long wait. See https://github.com/argoproj/argo-workflows/pull/12756#discussion_r1530245007
// delete pod
time.Sleep(10 * time.Second)
deletePods(ctx, woc)
pods, err = listPods(woc)
assert.Nil(t, err)
assert.Equal(t, 0, len(pods.Items))

// reconcile
woc = newWorkflowOperationCtx(woc.wf, controller)
woc.operate(ctx)
assert.Equal(t, wfv1.WorkflowError, woc.wf.Status.Phase)
for _, node := range woc.wf.Status.Nodes {
assert.Equal(t, wfv1.NodeError, node.Phase)
if node.Type == wfv1.NodeTypePod {
assert.Equal(t, "pod deleted", node.Message)
}
if node.Type == wfv1.NodeTypeContainer {
assert.Equal(t, "container deleted", node.Message)
}
}
}

0 comments on commit cfe2bb7

Please sign in to comment.