Skip to content

Commit

Permalink
feat: Set a max recursion depth limit (#11646)
Browse files Browse the repository at this point in the history
  • Loading branch information
Joibel authored Sep 5, 2023
1 parent 48697a1 commit 633c5e9
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 4 deletions.
1 change: 1 addition & 0 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ most users. Environment variables may be removed at any time.
| `CACHE_GC_AFTER_NOT_HIT_DURATION` | `time.Duration` | `30s` | When a memoization cache has not been hit after this duration, it will be deleted. |
| `CRON_SYNC_PERIOD` | `time.Duration` | `10s` | How often to sync cron workflows. |
| `DEFAULT_REQUEUE_TIME` | `time.Duration` | `10s` | The re-queue time for the rate limiter of the workflow queue. |
| `DISABLE_MAX_RECURSION` | `bool` | `false` | Set to true to disable the recursion preventer, which will stop a workflow running which has called into a child template 100 times |
| `EXPRESSION_TEMPLATES` | `bool` | `true` | Escape hatch to disable expression templates. |
| `EVENT_AGGREGATION_WITH_ANNOTATIONS` | `bool` | `false` | Whether event annotations will be used when aggregating events. |
| `GRPC_MESSAGE_SIZE` | `string` | Use different GRPC Max message size for Argo server deployment (supporting huge workflows). |
Expand Down
8 changes: 7 additions & 1 deletion docs/scaling.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ Within a cluster can use instance ID to run N Argo instances within a cluster.

Create one namespace for each Argo, e.g. `argo-i1`, `argo-i2`:.

Edit [workflow-controller-configmap.yaml](workflow-controller-configmap.yaml) for each namespace to set an instance ID.
Edit [`workflow-controller-configmap.yaml`](workflow-controller-configmap.yaml) for each namespace to set an instance ID.

```yaml
apiVersion: v1
Expand All @@ -83,6 +83,12 @@ argo --instanceid i1 submit my-wf.yaml

You do not need to have one instance ID per namespace, you could have many or few.

### Maximum Recursion Depth

In order to protect users against infinite recursion, the controller has a default maximum recursion depth of 100 calls to templates.

This protection can be disabled with the [environment variable](environment-variables.md#controller) `DISABLE_MAX_RECURSION=true`

## Miscellaneous

See also [Running At Massive Scale](running-at-massive-scale.md).
3 changes: 3 additions & 0 deletions util/help/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ const (

WorkflowTemplates = root + "/workflow-templates/"
WorkflowTemplatesReferencingOtherTemplates = WorkflowTemplates + "#referencing-other-workflowtemplates"

Scaling = root + "/scaling.md"
ConfigureMaximumRecursionDepth = Scaling + "#maximum-recursion-depth"
)
1 change: 1 addition & 0 deletions workflow/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (wfc *WorkflowController) updateConfig() error {
wfc.hydrator = hydrator.New(wfc.offloadNodeStatusRepo)
wfc.updateEstimatorFactory()
wfc.rateLimiter = wfc.newRateLimiter()
wfc.maxStackDepth = wfc.getMaxStackDepth()

log.WithField("executorImage", wfc.executorImage()).
WithField("executorImagePullPolicy", wfc.executorImagePullPolicy()).
Expand Down
11 changes: 11 additions & 0 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ import (
plugin "github.com/argoproj/argo-workflows/v3/workflow/util/plugins"
)

const maxAllowedStackDepth = 100

// WorkflowController is the controller for workflow resources
type WorkflowController struct {
// namespace of the workflow controller
Expand Down Expand Up @@ -98,6 +100,10 @@ type WorkflowController struct {
dynamicInterface dynamic.Interface
wfclientset wfclientset.Interface

// maxStackDepth is a configurable limit to the depth of the "stack", which is increased with every nested call to
// woc.executeTemplate and decreased when such calls return. This is used to prevent infinite recursion
maxStackDepth int

// datastructures to support the processing of workflows and workflow pods
wfInformer cache.SharedIndexInformer
wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer
Expand Down Expand Up @@ -189,6 +195,7 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli
}

wfc.UpdateConfig(ctx)
wfc.maxStackDepth = wfc.getMaxStackDepth()
wfc.metrics = metrics.New(wfc.getMetricsServerConfig())
wfc.entrypoint = entrypoint.New(kubeclientset, wfc.Config.Images)

Expand Down Expand Up @@ -1129,6 +1136,10 @@ func (wfc *WorkflowController) GetManagedNamespace() string {
return wfc.Config.Namespace
}

func (wfc *WorkflowController) getMaxStackDepth() int {
return maxAllowedStackDepth
}

func (wfc *WorkflowController) getMetricsServerConfig() (metrics.ServerConfig, metrics.ServerConfig) {
// Metrics config
path := wfc.Config.MetricsConfig.Path
Expand Down
1 change: 1 addition & 0 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl
cacheFactory: controllercache.NewCacheFactory(kube, "default"),
progressPatchTickDuration: envutil.LookupEnvDurationOr(common.EnvVarProgressPatchTickDuration, 1*time.Minute),
progressFileTickDuration: envutil.LookupEnvDurationOr(common.EnvVarProgressFileTickDuration, 3*time.Second),
maxStackDepth: maxAllowedStackDepth,
}

for _, opt := range options {
Expand Down
1 change: 1 addition & 0 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex
case ErrDeadlineExceeded:
return
case ErrParallelismReached:
case ErrMaxDepthExceeded:
case ErrTimeout:
_ = woc.markNodePhase(taskNodeName, wfv1.NodeFailed, err.Error())
return
Expand Down
16 changes: 15 additions & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
errorsutil "github.com/argoproj/argo-workflows/v3/util/errors"
"github.com/argoproj/argo-workflows/v3/util/expr/argoexpr"
"github.com/argoproj/argo-workflows/v3/util/expr/env"
"github.com/argoproj/argo-workflows/v3/util/help"
"github.com/argoproj/argo-workflows/v3/util/intstr"
"github.com/argoproj/argo-workflows/v3/util/resource"
"github.com/argoproj/argo-workflows/v3/util/retry"
Expand Down Expand Up @@ -103,14 +104,17 @@ type wfOperationCtx struct {
// preExecutionNodePhases contains the phases of all the nodes before the current operation. Necessary to infer
// changes in phase for metric emission
preExecutionNodePhases map[string]wfv1.NodePhase

// execWf holds the Workflow for use in execution.
// In Normal workflow scenario: It holds copy of workflow object
// In Submit From WorkflowTemplate: It holds merged workflow with WorkflowDefault, Workflow and WorkflowTemplate
// 'execWf.Spec' should usually be used instead `wf.Spec`
execWf *wfv1.Workflow

taskSet map[string]wfv1.Template

// currentStackDepth tracks the depth of the "stack", increased with every nested call to executeTemplate and decreased
// when such calls return. This is used to prevent infinite recursion
currentStackDepth int
}

var (
Expand All @@ -121,6 +125,8 @@ var (
ErrResourceRateLimitReached = errors.New(errors.CodeForbidden, "resource creation rate-limit reached")
// ErrTimeout indicates a specific template timed out
ErrTimeout = errors.New(errors.CodeTimeout, "timeout")
// ErrMaxDepthExceeded indicates that the maximum recursion depth was exceeded
ErrMaxDepthExceeded = errors.New(errors.CodeTimeout, fmt.Sprintf("Maximum recursion depth exceeded. See %s", help.ConfigureMaximumRecursionDepth))
)

// maxOperationTime is the maximum time a workflow operation is allowed to run
Expand Down Expand Up @@ -162,6 +168,7 @@ func newWorkflowOperationCtx(wf *wfv1.Workflow, wfc *WorkflowController) *wfOper
eventRecorder: wfc.eventRecorderManager.Get(wf.Namespace),
preExecutionNodePhases: make(map[string]wfv1.NodePhase),
taskSet: make(map[string]wfv1.Template),
currentStackDepth: 0,
}

if woc.wf.Status.Nodes == nil {
Expand Down Expand Up @@ -1776,6 +1783,13 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
woc.log.Warnf("Node was nil, will be initialized as type Skipped")
}

woc.currentStackDepth++
defer func() { woc.currentStackDepth-- }()

if woc.currentStackDepth >= woc.controller.maxStackDepth && os.Getenv("DISABLE_MAX_RECURSION") != "true" {
return woc.initializeNodeOrMarkError(node, nodeName, tmplCtx.GetTemplateScope(), orgTmpl, opts.boundaryID, ErrMaxDepthExceeded), ErrMaxDepthExceeded
}

newTmplCtx, resolvedTmpl, templateStored, err := tmplCtx.ResolveTemplate(orgTmpl)
if err != nil {
return woc.initializeNodeOrMarkError(node, nodeName, templateScope, orgTmpl, opts.boundaryID, err), err
Expand Down
113 changes: 112 additions & 1 deletion workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9428,7 +9428,7 @@ spec:
valueFrom:
expression: |
'https://argo-workflows.company.com/workflows/namepace/' + '{{workflow.name}}' + '?tab=workflow'
- name: whalesay
container:
image: docker/whalesay:latest
Expand Down Expand Up @@ -9517,3 +9517,114 @@ func TestMemoizationTemplateLevelCacheWithDagWithCache(t *testing.T) {
}
}
}

var maxDepth = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: hello-world
spec:
entrypoint: diamond
templates:
- name: diamond
dag:
tasks:
- name: A
template: echo
arguments:
parameters: [{name: message, value: A}]
- name: B
dependencies: [A]
template: echo
arguments:
parameters: [{name: message, value: B}]
- name: C
dependencies: [A]
template: echo
arguments:
parameters: [{name: message, value: C}]
- name: D
dependencies: [B, C]
template: echo
arguments:
parameters: [{name: message, value: D}]
- name: echo
inputs:
parameters:
- name: message
container:
image: alpine:3.7
command: [echo, "{{inputs.parameters.message}}"]
`

func TestMaxDepth(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(maxDepth)
cancel, controller := newController(wf)
defer cancel()

// Max depth is too small, error expected
controller.maxStackDepth = 2
ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)

woc.operate(ctx)

assert.Equal(t, wfv1.WorkflowError, woc.wf.Status.Phase)
node := woc.wf.Status.Nodes["hello-world-713168755"]
if assert.NotNil(t, node) {
assert.Equal(t, wfv1.NodeError, node.Phase)
assert.Contains(t, node.Message, "Maximum recursion depth exceeded")
}

// Max depth is enabled, but not too small, no error expected
controller.maxStackDepth = 3
woc = newWorkflowOperationCtx(wf, controller)

woc.operate(ctx)

assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)
node = woc.wf.Status.Nodes["hello-world-713168755"]
if assert.NotNil(t, node) {
assert.Equal(t, wfv1.NodePending, node.Phase)
}

makePodsPhase(ctx, woc, apiv1.PodSucceeded)
woc.operate(ctx)
makePodsPhase(ctx, woc, apiv1.PodSucceeded)
woc.operate(ctx)
makePodsPhase(ctx, woc, apiv1.PodSucceeded)
woc.operate(ctx)
assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase)
}

func TestMaxDepthEnvVariable(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(maxDepth)
cancel, controller := newController(wf)
defer cancel()

// Max depth is disabled, no error expected
controller.maxStackDepth = 2
ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)
_ = os.Setenv("DISABLE_MAX_RECURSION", "true")

woc.operate(ctx)

assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)
node := woc.wf.Status.Nodes["hello-world-713168755"]
if assert.NotNil(t, node) {
assert.Equal(t, wfv1.NodePending, node.Phase)
}

makePodsPhase(ctx, woc, apiv1.PodSucceeded)
woc.operate(ctx)
makePodsPhase(ctx, woc, apiv1.PodSucceeded)
woc.operate(ctx)
makePodsPhase(ctx, woc, apiv1.PodSucceeded)
woc.operate(ctx)
assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase)

_ = os.Unsetenv("DISABLE_MAX_RECURSION")
}
3 changes: 2 additions & 1 deletion workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,9 @@ func (woc *wfOperationCtx) executeStepGroup(ctx context.Context, stepGroup []wfv
case ErrDeadlineExceeded:
return node, nil
case ErrParallelismReached:
case ErrMaxDepthExceeded:
case ErrTimeout:
return woc.markNodePhase(node.Name, wfv1.NodeFailed, fmt.Sprintf("child '%s' timedout", childNodeName)), nil
return woc.markNodePhase(node.Name, wfv1.NodeFailed, err.Error()), nil
default:
woc.addChildNode(sgNodeName, childNodeName)
return woc.markNodeError(node.Name, fmt.Errorf("step group deemed errored due to child %s error: %w", childNodeName, err)), nil
Expand Down

0 comments on commit 633c5e9

Please sign in to comment.