Skip to content

Commit

Permalink
fix: return itself when getOutboundNodes from memoization Hit steps/D…
Browse files Browse the repository at this point in the history
…AG. Fixes: argoproj#7873 (argoproj#12780)

Signed-off-by: shuangkun <[email protected]>
Co-authored-by: zjgemi <[email protected]>
Co-authored-by: sherwinkoo29 <[email protected]>
Co-authored-by: Isitha Subasinghe <[email protected]>
  • Loading branch information
4 people authored Mar 22, 2024
1 parent dbff027 commit a719d94
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 0 deletions.
4 changes: 4 additions & 0 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2838,6 +2838,10 @@ func (woc *wfOperationCtx) getOutboundNodes(nodeID string) []string {
if numChildren > 0 {
return []string{node.Children[numChildren-1]}
}
case wfv1.NodeTypeSteps, wfv1.NodeTypeDAG:
if node.MemoizationStatus != nil && node.MemoizationStatus.Hit {
return []string{node.ID}
}
}
outbound := make([]string, 0)
for _, outboundNodeID := range node.OutboundNodes {
Expand Down
171 changes: 171 additions & 0 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5604,6 +5604,177 @@ func TestConfigMapCacheLoadOperateNoOutputs(t *testing.T) {
}
}

var workflowWithMemoizedInSteps = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: memoized-bug-
namespace: default
spec:
entrypoint: main
templates:
- name: main
steps:
- - name: hello-steps
template: memoized
- - name: whatever
template: hello
- name: memoized
outputs:
parameters:
- name: msg
valueFrom:
parameter: "{{steps.hello-step.outputs.result}}"
steps:
- - name: hello-step
template: hello
memoize:
key: "memoized-bug-steps-0"
cache:
configMap:
name: my-config
- name: hello
container:
image: alpine:latest
command: [sh, -c]
args: ["echo Hello"]
`

var workflowWithMemoizedInDAG = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: memoized-bug-
namespace: default
spec:
entrypoint: main
templates:
- name: main
steps:
- - name: hello-dag
template: memoized
- - name: whatever
template: hello
- name: memoized
outputs:
parameters:
- name: msg
valueFrom:
parameter: "{{dag.hello-dag.outputs.result}}"
dag:
tasks:
- name: hello-dag
template: hello
memoize:
key: "memoized-bug-dag-0"
cache:
configMap:
name: my-config
- name: hello
container:
image: alpine:latest
command: [sh, -c]
args: ["echo Hello"]
`

func TestGetOutboundNodesFromCacheHitSteps(t *testing.T) {
myConfigMapCacheEntry := apiv1.ConfigMap{
Data: map[string]string{
"memoized-bug-steps-0": `{"nodeID":"memoized-bug-wqbj4-3475368823","outputs":null,"creationTimestamp":"2020-09-21T18:12:56Z","lastHitTimestamp":"2024-03-11T05:59:58Z"}`,
},
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMap",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "my-config",
ResourceVersion: "80004",
Labels: map[string]string{
common.LabelKeyConfigMapType: common.LabelValueTypeConfigMapCache,
},
},
}

wf := wfv1.MustUnmarshalWorkflow(workflowWithMemoizedInSteps)
cancel, controller := newController()
defer cancel()

ctx := context.Background()
_, err := controller.wfclientset.ArgoprojV1alpha1().Workflows(wf.ObjectMeta.Namespace).Create(ctx, wf, metav1.CreateOptions{})
assert.NoError(t, err)
_, err = controller.kubeclientset.CoreV1().ConfigMaps("default").Create(ctx, &myConfigMapCacheEntry, metav1.CreateOptions{})
assert.NoError(t, err)

woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)
makePodsPhase(ctx, woc, apiv1.PodSucceeded)
woc.operate(ctx)

assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase)

hitCache := 0
for _, node := range woc.wf.Status.Nodes {
if node.DisplayName == "hello-steps" {
hitCache++
assert.NotNil(t, node.MemoizationStatus)
assert.True(t, node.MemoizationStatus.Hit)
assert.Equal(t, 1, len(node.Children))
}
}
assert.Equal(t, 1, hitCache)
}

func TestGetOutboundNodesFromCacheHitDAG(t *testing.T) {
myConfigMapCacheEntry := apiv1.ConfigMap{
Data: map[string]string{
"memoized-bug-dag-0": `{"nodeID":"memoized-bug-wqbj4-3475368823","outputs":null,"creationTimestamp":"2020-09-21T18:12:56Z","lastHitTimestamp":"2024-03-11T05:59:58Z"}`,
},
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMap",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "my-config",
ResourceVersion: "80004",
Labels: map[string]string{
common.LabelKeyConfigMapType: common.LabelValueTypeConfigMapCache,
},
},
}

wf := wfv1.MustUnmarshalWorkflow(workflowWithMemoizedInDAG)
cancel, controller := newController()
defer cancel()

ctx := context.Background()
_, err := controller.wfclientset.ArgoprojV1alpha1().Workflows(wf.ObjectMeta.Namespace).Create(ctx, wf, metav1.CreateOptions{})
assert.NoError(t, err)
_, err = controller.kubeclientset.CoreV1().ConfigMaps("default").Create(ctx, &myConfigMapCacheEntry, metav1.CreateOptions{})
assert.NoError(t, err)

woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)
makePodsPhase(ctx, woc, apiv1.PodSucceeded)
woc.operate(ctx)

assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase)

hitCache := 0
for _, node := range woc.wf.Status.Nodes {
if node.DisplayName == "hello-dag" {
hitCache++
assert.NotNil(t, node.MemoizationStatus)
assert.True(t, node.MemoizationStatus.Hit)
assert.Equal(t, 1, len(node.Children))
}
}
assert.Equal(t, 1, hitCache)
}

var workflowCachedMaxAge = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
Expand Down

0 comments on commit a719d94

Please sign in to comment.