diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 9a830aed186b..4d8af7777ccd 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -2838,6 +2838,10 @@ func (woc *wfOperationCtx) getOutboundNodes(nodeID string) []string { if numChildren > 0 { return []string{node.Children[numChildren-1]} } + case wfv1.NodeTypeSteps, wfv1.NodeTypeDAG: + if node.MemoizationStatus != nil && node.MemoizationStatus.Hit { + return []string{node.ID} + } } outbound := make([]string, 0) for _, outboundNodeID := range node.OutboundNodes { diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index b0a1ec3d8f62..c9c7227322fc 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -5604,6 +5604,177 @@ func TestConfigMapCacheLoadOperateNoOutputs(t *testing.T) { } } +var workflowWithMemoizedInSteps = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: memoized-bug- + namespace: default +spec: + entrypoint: main + templates: + - name: main + steps: + - - name: hello-steps + template: memoized + - - name: whatever + template: hello + + - name: memoized + outputs: + parameters: + - name: msg + valueFrom: + parameter: "{{steps.hello-step.outputs.result}}" + steps: + - - name: hello-step + template: hello + memoize: + key: "memoized-bug-steps-0" + cache: + configMap: + name: my-config + + - name: hello + container: + image: alpine:latest + command: [sh, -c] + args: ["echo Hello"] +` + +var workflowWithMemoizedInDAG = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: memoized-bug- + namespace: default +spec: + entrypoint: main + templates: + - name: main + steps: + - - name: hello-dag + template: memoized + - - name: whatever + template: hello + + - name: memoized + outputs: + parameters: + - name: msg + valueFrom: + parameter: "{{dag.hello-dag.outputs.result}}" + dag: + tasks: + - name: hello-dag + template: hello + memoize: + key: "memoized-bug-dag-0" + cache: + configMap: + name: my-config + + - name: hello + container: + image: alpine:latest + command: [sh, -c] + args: ["echo Hello"] +` + +func TestGetOutboundNodesFromCacheHitSteps(t *testing.T) { + myConfigMapCacheEntry := apiv1.ConfigMap{ + Data: map[string]string{ + "memoized-bug-steps-0": `{"nodeID":"memoized-bug-wqbj4-3475368823","outputs":null,"creationTimestamp":"2020-09-21T18:12:56Z","lastHitTimestamp":"2024-03-11T05:59:58Z"}`, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "ConfigMap", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "my-config", + ResourceVersion: "80004", + Labels: map[string]string{ + common.LabelKeyConfigMapType: common.LabelValueTypeConfigMapCache, + }, + }, + } + + wf := wfv1.MustUnmarshalWorkflow(workflowWithMemoizedInSteps) + cancel, controller := newController() + defer cancel() + + ctx := context.Background() + _, err := controller.wfclientset.ArgoprojV1alpha1().Workflows(wf.ObjectMeta.Namespace).Create(ctx, wf, metav1.CreateOptions{}) + assert.NoError(t, err) + _, err = controller.kubeclientset.CoreV1().ConfigMaps("default").Create(ctx, &myConfigMapCacheEntry, metav1.CreateOptions{}) + assert.NoError(t, err) + + woc := newWorkflowOperationCtx(wf, controller) + woc.operate(ctx) + makePodsPhase(ctx, woc, apiv1.PodSucceeded) + woc.operate(ctx) + + assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase) + + hitCache := 0 + for _, node := range woc.wf.Status.Nodes { + if node.DisplayName == "hello-steps" { + hitCache++ + assert.NotNil(t, node.MemoizationStatus) + assert.True(t, node.MemoizationStatus.Hit) + assert.Equal(t, 1, len(node.Children)) + } + } + assert.Equal(t, 1, hitCache) +} + +func TestGetOutboundNodesFromCacheHitDAG(t *testing.T) { + myConfigMapCacheEntry := apiv1.ConfigMap{ + Data: map[string]string{ + "memoized-bug-dag-0": `{"nodeID":"memoized-bug-wqbj4-3475368823","outputs":null,"creationTimestamp":"2020-09-21T18:12:56Z","lastHitTimestamp":"2024-03-11T05:59:58Z"}`, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "ConfigMap", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "my-config", + ResourceVersion: "80004", + Labels: map[string]string{ + common.LabelKeyConfigMapType: common.LabelValueTypeConfigMapCache, + }, + }, + } + + wf := wfv1.MustUnmarshalWorkflow(workflowWithMemoizedInDAG) + cancel, controller := newController() + defer cancel() + + ctx := context.Background() + _, err := controller.wfclientset.ArgoprojV1alpha1().Workflows(wf.ObjectMeta.Namespace).Create(ctx, wf, metav1.CreateOptions{}) + assert.NoError(t, err) + _, err = controller.kubeclientset.CoreV1().ConfigMaps("default").Create(ctx, &myConfigMapCacheEntry, metav1.CreateOptions{}) + assert.NoError(t, err) + + woc := newWorkflowOperationCtx(wf, controller) + woc.operate(ctx) + makePodsPhase(ctx, woc, apiv1.PodSucceeded) + woc.operate(ctx) + + assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase) + + hitCache := 0 + for _, node := range woc.wf.Status.Nodes { + if node.DisplayName == "hello-dag" { + hitCache++ + assert.NotNil(t, node.MemoizationStatus) + assert.True(t, node.MemoizationStatus.Hit) + assert.Equal(t, 1, len(node.Children)) + } + } + assert.Equal(t, 1, hitCache) +} + var workflowCachedMaxAge = ` apiVersion: argoproj.io/v1alpha1 kind: Workflow