This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Adding flags for ignore-retry-cause and default-max-attempts #596

Closed · wants to merge 13 commits
2 changes: 2 additions & 0 deletions go.mod
@@ -146,3 +146,5 @@ require (
)

replace github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d
+
+replace github.com/flyteorg/flyteplugins => github.com/flyteorg/flyteplugins v1.1.26-0.20230914220605-c548d23fbb52
4 changes: 2 additions & 2 deletions go.sum
@@ -244,8 +244,8 @@ github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYF
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.5.13 h1:IQ2Cw+u36ew3BPyRDAcHdzc/GyNEOXOxhKy9jbS4hbo=
github.com/flyteorg/flyteidl v1.5.13/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
-github.com/flyteorg/flyteplugins v1.1.23 h1:N526EeVHQwdMTiQ25Qa7F4y1izQeqP4rU0+Ogv6M6Kw=
-github.com/flyteorg/flyteplugins v1.1.23/go.mod h1:HEd4yf0H8XfxMcHFwrTdTIJ/9lEAz83OpgcFQe47L6I=
+github.com/flyteorg/flyteplugins v1.1.26-0.20230914220605-c548d23fbb52 h1:Vnye5/uCC03Qz/z6OYx8VIL9akQvwwWrrKL7rV8gEmM=
+github.com/flyteorg/flyteplugins v1.1.26-0.20230914220605-c548d23fbb52/go.mod h1:HEd4yf0H8XfxMcHFwrTdTIJ/9lEAz83OpgcFQe47L6I=
github.com/flyteorg/flytestdlib v1.0.22 h1:8RAc+TmME54FInf4+t6+C7X8Z/dW6i6aTs6W8SEzpI8=
github.com/flyteorg/flytestdlib v1.0.22/go.mod h1:6nXa5g00qFIsgdvQ7jKQMJmDniqO0hG6Z5X5olfduqQ=
github.com/flyteorg/stow v0.3.7 h1:Cx7j8/Ux6+toD5hp5fy++927V+yAcAttDeQAlUD/864=
4 changes: 2 additions & 2 deletions pkg/controller/config/config.go
@@ -94,7 +94,7 @@ var (
},
NodeConfig: NodeConfig{
MaxNodeRetriesOnSystemFailures: 3,
-InterruptibleFailureThreshold: 1,
+InterruptibleFailureThreshold: -1,
DefaultMaxAttempts: 1,
IgnoreRetryCause: false,
},
@@ -205,7 +205,7 @@ type WorkqueueConfig struct {
type NodeConfig struct {
DefaultDeadlines DefaultDeadlines `json:"default-deadlines,omitempty" pflag:",Default value for timeouts"`
MaxNodeRetriesOnSystemFailures int64 `json:"max-node-retries-system-failures" pflag:"2,Maximum number of retries per node for node failure due to infra issues"`
-InterruptibleFailureThreshold int64 `json:"interruptible-failure-threshold" pflag:"1,number of failures for a node to be still considered interruptible'"`
+InterruptibleFailureThreshold int32 `json:"interruptible-failure-threshold" pflag:"1,number of failures for a node to be still considered interruptible. Negative numbers are treated as complementary (ex. -1 means last attempt is non-interruptible).'"`
DefaultMaxAttempts int32 `json:"default-max-attempts" pflag:"3,Default maximum number of attempts for a node"`
IgnoreRetryCause bool `json:"ignore-retry-cause" pflag:",Ignore retry cause and count all attempts toward a node's max attempts"`
}
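The new pflag description above introduces complementary semantics for negative thresholds. As a rough illustration of how that reads (the standalone helper below and its name are assumptions for this sketch, not code from this PR), a threshold of -1 leaves only the final attempt non-interruptible:

// Minimal sketch of the interruptible-failure-threshold semantics described above.
// A positive value N keeps a node interruptible only for its first N failures; a
// negative value is complementary, so -1 means only the last attempt loses
// interruptibility. Helper name and standalone form are assumptions.
package main

import "fmt"

func stillInterruptible(failures, maxAttempts, threshold int32) bool {
	if threshold > 0 {
		return failures < threshold
	}
	// Complementary: the last -threshold attempts run non-interruptibly.
	return failures < maxAttempts+threshold
}

func main() {
	// With default-max-attempts=3 and interruptible-failure-threshold=-1,
	// attempts after 0 or 1 failures stay interruptible; the final attempt does not.
	for failures := int32(0); failures < 3; failures++ {
		fmt.Printf("failures=%d interruptible=%v\n", failures, stillInterruptible(failures, 3, -1))
	}
}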
2 changes: 1 addition & 1 deletion pkg/controller/config/config_flags.go

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions pkg/controller/config/config_flags_test.go

Some generated files are not rendered by default.
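Since the generated flag files above are not rendered, here is a hedged sketch of how the knobs from NodeConfig could be set programmatically. The field names come from pkg/controller/config/config.go in this diff; the import path and the concrete values are assumptions for illustration only.

// Sketch only: sets the NodeConfig fields touched by this PR. Values are
// illustrative, not recommendations; in a deployment these would normally come
// from propeller's config file or the corresponding command-line flags.
package main

import (
	"fmt"

	"github.com/flyteorg/flytepropeller/pkg/controller/config"
)

func main() {
	nodeCfg := config.NodeConfig{
		DefaultMaxAttempts:             3,    // used when a task defines no retry strategy
		IgnoreRetryCause:               true, // count user and system failures together
		InterruptibleFailureThreshold:  -1,   // complementary: only the final attempt is non-interruptible
		MaxNodeRetriesOnSystemFailures: 3,
	}
	fmt.Printf("%+v\n", nodeCfg)
}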

4 changes: 2 additions & 2 deletions pkg/controller/nodes/executor.go
@@ -484,7 +484,7 @@ type nodeExecutor struct {
defaultExecutionDeadline time.Duration
enqueueWorkflow v1alpha1.EnqueueWorkflow
eventConfig *config.EventConfig
-interruptibleFailureThreshold uint32
+interruptibleFailureThreshold int32
maxDatasetSizeBytes int64
maxNodeRetriesForSystemFailures uint32
metrics *nodeMetrics
@@ -1442,7 +1442,7 @@ func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *stora
defaultExecutionDeadline: nodeConfig.DefaultDeadlines.DefaultNodeExecutionDeadline.Duration,
enqueueWorkflow: enQWorkflow,
eventConfig: eventConfig,
-interruptibleFailureThreshold: uint32(nodeConfig.InterruptibleFailureThreshold),
+interruptibleFailureThreshold: nodeConfig.InterruptibleFailureThreshold,
maxDatasetSizeBytes: maxDatasetSize,
maxNodeRetriesForSystemFailures: uint32(nodeConfig.MaxNodeRetriesOnSystemFailures),
metrics: metrics,
2 changes: 1 addition & 1 deletion pkg/controller/nodes/interfaces/node_exec_context.go
@@ -39,7 +39,7 @@ type NodeExecutionMetadata interface {
GetK8sServiceAccount() string
GetSecurityContext() core.SecurityContext
IsInterruptible() bool
-GetInterruptibleFailureThreshold() uint32
+GetInterruptibleFailureThreshold() int32
}

type NodeExecutionContext interface {
18 changes: 10 additions & 8 deletions pkg/controller/nodes/node_exec_context.go
@@ -94,7 +94,7 @@ type nodeExecMetadata struct {
v1alpha1.Meta
nodeExecID *core.NodeExecutionIdentifier
interruptible bool
-interruptibleFailureThreshold uint32
+interruptibleFailureThreshold int32
nodeLabels map[string]string
}

@@ -114,7 +114,7 @@ func (e nodeExecMetadata) IsInterruptible() bool {
return e.interruptible
}

-func (e nodeExecMetadata) GetInterruptibleFailureThreshold() uint32 {
+func (e nodeExecMetadata) GetInterruptibleFailureThreshold() int32 {
return e.interruptibleFailureThreshold
}

@@ -208,7 +208,7 @@ func (e nodeExecContext) MaxDatasetSizeBytes() int64 {
}

func newNodeExecContext(_ context.Context, store *storage.DataStore, execContext executors.ExecutionContext, nl executors.NodeLookup,
-node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, interruptible bool, interruptibleFailureThreshold uint32,
+node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, interruptible bool, interruptibleFailureThreshold int32,
maxDatasetSize int64, taskEventRecorder events.TaskEventRecorder, nodeEventRecorder events.NodeEventRecorder, tr interfaces.TaskReader, nsm *nodeStateManager,
enqueueOwner func() error, rawOutputPrefix storage.DataReference, outputShardSelector ioutils.ShardSelector) *nodeExecContext {

@@ -289,19 +289,21 @@ func (c *nodeExecutor) BuildNodeExecutionContext(ctx context.Context, executionC
if config.GetConfig().NodeConfig.IgnoreRetryCause {
// For the unified retry behavior we execute the last interruptibleFailureThreshold attempts on a non
// interruptible machine
-currentAttempt := s.GetAttempts() + 1 + s.GetSystemFailures()
-maxAttempts := uint32(config.GetConfig().NodeConfig.DefaultMaxAttempts)
+currentAttempt := int32(s.GetAttempts() + s.GetSystemFailures())
+maxAttempts := int32(config.GetConfig().NodeConfig.DefaultMaxAttempts)
if n.GetRetryStrategy() != nil && n.GetRetryStrategy().MinAttempts != nil && *n.GetRetryStrategy().MinAttempts != 0 {
-maxAttempts = uint32(*n.GetRetryStrategy().MinAttempts)
+maxAttempts = int32(*n.GetRetryStrategy().MinAttempts)
}

-if interruptible && currentAttempt >= maxAttempts-c.interruptibleFailureThreshold {
+if interruptible && ((c.interruptibleFailureThreshold > 0 && currentAttempt >= c.interruptibleFailureThreshold) ||
+(c.interruptibleFailureThreshold <= 0 && currentAttempt >= maxAttempts+c.interruptibleFailureThreshold)) {
interruptible = false
c.metrics.InterruptedThresholdHit.Inc(ctx)
}
} else {
// Else a node is not considered interruptible if the system failures have exceeded the configured threshold
-if interruptible && s.GetSystemFailures() >= c.interruptibleFailureThreshold {
+if interruptible && ((c.interruptibleFailureThreshold > 0 && int32(s.GetSystemFailures()) >= c.interruptibleFailureThreshold) ||
+(c.interruptibleFailureThreshold <= 0 && int32(s.GetSystemFailures()) >= int32(c.maxNodeRetriesForSystemFailures)+c.interruptibleFailureThreshold)) {
interruptible = false
c.metrics.InterruptedThresholdHit.Inc(ctx)
}
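The two branches above differ in what they count: with ignore-retry-cause enabled, user and system failures share one budget capped by default-max-attempts (or the task's own retry strategy), while the legacy path only compares system failures against the threshold. Below is a condensed, hypothetical restatement of that decision; the free-standing function and parameter names are placeholders for the executor fields and node status, not the real API.

// Condensed sketch of the interruptibility check in BuildNodeExecutionContext above.
package main

import "fmt"

func remainsInterruptible(ignoreRetryCause bool, attempts, systemFailures, maxAttempts, maxSystemRetries, threshold int32) bool {
	if ignoreRetryCause {
		// Unified retry behaviour: every failure counts, and the last
		// attempts are forced onto non-interruptible machines.
		current := attempts + systemFailures
		if threshold > 0 {
			return current < threshold
		}
		return current < maxAttempts+threshold
	}
	// Legacy behaviour: only system failures push a node off interruptible capacity.
	if threshold > 0 {
		return systemFailures < threshold
	}
	return systemFailures < maxSystemRetries+threshold
}

func main() {
	// ignore-retry-cause on: with 1 user failure + 1 system failure,
	// max-attempts=3 and threshold=-1, the final attempt is not interruptible.
	fmt.Println(remainsInterruptible(true, 1, 1, 3, 3, -1)) // false
	// Legacy mode: a single system failure is still below a threshold of 2.
	fmt.Println(remainsInterruptible(false, 0, 1, 3, 3, 2)) // true
}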
61 changes: 33 additions & 28 deletions pkg/controller/nodes/node_exec_context_test.go
@@ -378,43 +378,48 @@ func Test_NodeContext_RecordTaskEvent(t1 *testing.T) {

func Test_NodeContext_IsInterruptible(t *testing.T) {
ctx := context.Background()
-scope := promutils.NewTestScope()

-dataStore, _ := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, scope.NewSubScope("dataStore"))
-nodeExecutor := nodeExecutor{
-interruptibleFailureThreshold: 1, // interruptibleFailureThreshold is hardcoded to 1
-maxDatasetSizeBytes: 0,
-defaultDataSandbox: "s3://bucket-a",
-store: dataStore,
-shardSelector: ioutils.NewConstantShardSelector([]string{"x"}),
-enqueueWorkflow: func(workflowID v1alpha1.WorkflowID) {},
-metrics: &nodeMetrics{
-InterruptibleNodesRunning: labeled.NewCounter("running", "xyz", scope.NewSubScope("interruptible1")),
-InterruptibleNodesTerminated: labeled.NewCounter("terminated", "xyz", scope.NewSubScope("interruptible2")),
-InterruptedThresholdHit: labeled.NewCounter("thresholdHit", "xyz", scope.NewSubScope("interruptible3")),
-},
-}

tests := []struct {
-name string
-ignoreRetryCause bool
-attempts uint32
-systemFailures uint32
-expectedInterruptible bool
+// interruptibleFailureThreshold is hardcoded to 2
+name string
+ignoreRetryCause bool
+attempts uint32
+systemFailures uint32
+maxAttempts int32
+interruptibleFailureThreshold int32
+expectedInterruptible bool
}{
{"Interruptible", false, 0, 0, true},
{"NonInterruptible", false, 0, 2, false},
//{"IgnoreCauseInterruptible", true, 0, 0, true}, // TODO
//{"IgnoreCauseNonInterruptibleSystem", true, 0, 2, false}, // TODO
//{"IgnoreCauseNonInterruptibleUser", true, 1, 0, false}, // TODO
{"Interruptible", false, 0, 0, 2, 1, true},
{"NonInterruptible", false, 0, 1, 2, 1, false},
{"InterruptibleNegativeThreshold", false, 0, 0, 2, -1, true},
{"NonInterruptibleNegativeThreshold", false, 0, 1, 2, -1, false},
{"IgnoreCauseInterruptible", true, 0, 0, 2, 1, true},
{"IgnoreCauseNonInterruptibleSystem", true, 0, 1, 2, 1, false},
{"IgnoreCauseNonInterruptibleUser", true, 1, 0, 2, 1, false},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
-config.GetConfig().NodeConfig.IgnoreRetryCause = tt.ignoreRetryCause
+scope := promutils.NewTestScope()

+// mock all inputs
+config.GetConfig().NodeConfig.DefaultMaxAttempts = tt.maxAttempts
+config.GetConfig().NodeConfig.IgnoreRetryCause = tt.ignoreRetryCause

+dataStore, _ := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, scope.NewSubScope("dataStore"))
+nodeExecutor := nodeExecutor{
+interruptibleFailureThreshold: tt.interruptibleFailureThreshold,
+maxDatasetSizeBytes: 0,
+defaultDataSandbox: "s3://bucket-a",
+store: dataStore,
+shardSelector: ioutils.NewConstantShardSelector([]string{"x"}),
+enqueueWorkflow: func(workflowID v1alpha1.WorkflowID) {},
+metrics: &nodeMetrics{
+InterruptibleNodesRunning: labeled.NewCounter("running", "xyz", scope.NewSubScope("interruptible1")),
+InterruptibleNodesTerminated: labeled.NewCounter("terminated", "xyz", scope.NewSubScope("interruptible2")),
+InterruptedThresholdHit: labeled.NewCounter("thresholdHit", "xyz", scope.NewSubScope("interruptible3")),
+},
+}

w := getTestFlyteWorkflow()

nodeLookup := &mocks2.NodeLookup{}