diff --git a/flytepropeller/pkg/controller/config/config.go b/flytepropeller/pkg/controller/config/config.go index 3f0c4f758cd..dbf0e69a4b0 100644 --- a/flytepropeller/pkg/controller/config/config.go +++ b/flytepropeller/pkg/controller/config/config.go @@ -99,7 +99,6 @@ var ( DefaultMaxAttempts: 1, IgnoreRetryCause: false, EnableCRDebugMetadata: false, - ClearPreviousError: false, }, MaxStreakLength: 8, // Turbo mode is enabled by default ProfilerPort: config.Port{ @@ -215,8 +214,7 @@ type NodeConfig struct { InterruptibleFailureThreshold int32 `json:"interruptible-failure-threshold" pflag:"1,number of failures for a node to be still considered interruptible. Negative numbers are treated as complementary (ex. -1 means last attempt is non-interruptible).'"` DefaultMaxAttempts int32 `json:"default-max-attempts" pflag:"3,Default maximum number of attempts for a node"` IgnoreRetryCause bool `json:"ignore-retry-cause" pflag:",Ignore retry cause and count all attempts toward a node's max attempts"` - EnableCRDebugMetadata bool `json:"enable-cr-debug-metadata" pflag:",Collapse node on any terminal state, not just successful terminations. This is useful to reduce the size of workflow state in etcd."` - ClearPreviousError bool `json:"clear-previous-error" pflag:",When using the fail after executable nodes complete mode many nodes in the workflow can fail. This option causes the previous error to be cleared every time a new error occurs. This helps reduce the size of workflow state in etcd."` + EnableCRDebugMetadata bool `json:"enable-cr-debug-metadata" pflag:",By default node state gets cleared after flytepropeller will no longer need it. This is useful to reduce the size of workflow state in etcd. Consider enabling this to keep this state for debugging purposes."` } // DefaultDeadlines contains default values for timeouts diff --git a/flytepropeller/pkg/controller/config/config_flags.go b/flytepropeller/pkg/controller/config/config_flags.go index 353fdca2945..321754a6b8d 100755 --- a/flytepropeller/pkg/controller/config/config_flags.go +++ b/flytepropeller/pkg/controller/config/config_flags.go @@ -96,8 +96,7 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.Int32(fmt.Sprintf("%v%v", prefix, "node-config.interruptible-failure-threshold"), defaultConfig.NodeConfig.InterruptibleFailureThreshold, "number of failures for a node to be still considered interruptible. Negative numbers are treated as complementary (ex. -1 means last attempt is non-interruptible).'") cmdFlags.Int32(fmt.Sprintf("%v%v", prefix, "node-config.default-max-attempts"), defaultConfig.NodeConfig.DefaultMaxAttempts, "Default maximum number of attempts for a node") cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "node-config.ignore-retry-cause"), defaultConfig.NodeConfig.IgnoreRetryCause, "Ignore retry cause and count all attempts toward a node's max attempts") - cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "node-config.enable-cr-debug-metadata"), defaultConfig.NodeConfig.EnableCRDebugMetadata, "Collapse node on any terminal state, not just successful terminations. This is useful to reduce the size of workflow state in etcd.") - cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "node-config.clear-previous-error"), defaultConfig.NodeConfig.ClearPreviousError, "When using the fail after executable nodes complete mode many nodes in the workflow can fail. This option causes the previous error to be cleared every time a new error occurs. This helps reduce the size of workflow state in etcd.") + cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "node-config.enable-cr-debug-metadata"), defaultConfig.NodeConfig.EnableCRDebugMetadata, "By default node state gets cleared after flytepropeller will no longer need it. This is useful to reduce the size of workflow state in etcd. Consider enabling this to keep this state for debugging purposes.") cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "max-streak-length"), defaultConfig.MaxStreakLength, "Maximum number of consecutive rounds that one propeller worker can use for one workflow - >1 => turbo-mode is enabled.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "event-config.raw-output-policy"), defaultConfig.EventConfig.RawOutputPolicy, "How output data should be passed along in execution events.") cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "event-config.fallback-to-output-reference"), defaultConfig.EventConfig.FallbackToOutputReference, "Whether output data should be sent by reference when it is too large to be sent inline in execution events.") diff --git a/flytepropeller/pkg/controller/config/config_flags_test.go b/flytepropeller/pkg/controller/config/config_flags_test.go index a27de1d86b6..6f3c67b6525 100755 --- a/flytepropeller/pkg/controller/config/config_flags_test.go +++ b/flytepropeller/pkg/controller/config/config_flags_test.go @@ -757,20 +757,6 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) - t.Run("Test_node-config.clear-previous-error", func(t *testing.T) { - - t.Run("Override", func(t *testing.T) { - testValue := "1" - - cmdFlags.Set("node-config.clear-previous-error", testValue) - if vBool, err := cmdFlags.GetBool("node-config.clear-previous-error"); err == nil { - testDecodeJson_Config(t, fmt.Sprintf("%v", vBool), &actual.NodeConfig.ClearPreviousError) - - } else { - assert.FailNow(t, err.Error()) - } - }) - }) t.Run("Test_max-streak-length", func(t *testing.T) { t.Run("Override", func(t *testing.T) { diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index a9072ffbf99..442e1134473 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -481,7 +481,6 @@ type nodeExecutor struct { catalog catalog.Client clusterID string enableCRDebugMetadata bool - clearPreviousError bool defaultActiveDeadline time.Duration defaultDataSandbox storage.DataReference defaultExecutionDeadline time.Duration @@ -870,7 +869,7 @@ func (c *nodeExecutor) execute(ctx context.Context, h interfaces.NodeHandler, nC } func (c *nodeExecutor) Clear(executableNodeStatus v1alpha1.ExecutableNodeStatus) { - if c.clearPreviousError { + if !c.enableCRDebugMetadata { executableNodeStatus.ClearExecutionError() } } @@ -1449,7 +1448,6 @@ func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *stora catalog: catalogClient, clusterID: clusterID, enableCRDebugMetadata: nodeConfig.EnableCRDebugMetadata, - clearPreviousError: nodeConfig.ClearPreviousError, defaultActiveDeadline: nodeConfig.DefaultDeadlines.DefaultNodeActiveDeadline.Duration, defaultDataSandbox: defaultRawOutputPrefix, defaultExecutionDeadline: nodeConfig.DefaultDeadlines.DefaultNodeExecutionDeadline.Duration, diff --git a/flytepropeller/pkg/controller/workflow/executor_test.go b/flytepropeller/pkg/controller/workflow/executor_test.go index 16285899856..93f50246683 100644 --- a/flytepropeller/pkg/controller/workflow/executor_test.go +++ b/flytepropeller/pkg/controller/workflow/executor_test.go @@ -540,15 +540,15 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Failing(t *testing.T) { tests := []struct { name string onFailurePolicy v1alpha1.WorkflowOnFailurePolicy - clearPreviousError bool + enableCRDebugMetadata bool expectedRoundsToFail int expectedNodesWithErrorsCount int expectedFailedNodesCount int }{ {"failImidiately", v1alpha1.WorkflowOnFailurePolicy(core.WorkflowMetadata_FAIL_IMMEDIATELY), false, 6, 1, 1}, - {"failImidiately clearPreviousError", v1alpha1.WorkflowOnFailurePolicy(core.WorkflowMetadata_FAIL_IMMEDIATELY), true, 6, 1, 1}, - {"failAfterExecutableNodesComplete", v1alpha1.WorkflowOnFailurePolicy(core.WorkflowMetadata_FAIL_AFTER_EXECUTABLE_NODES_COMPLETE), false, 12, 2, 2}, - {"failAfterExecutableNodesComplete clearPreviousError", v1alpha1.WorkflowOnFailurePolicy(core.WorkflowMetadata_FAIL_AFTER_EXECUTABLE_NODES_COMPLETE), true, 12, 1, 2}, + {"failImidiately enableCRDebugMetadata", v1alpha1.WorkflowOnFailurePolicy(core.WorkflowMetadata_FAIL_IMMEDIATELY), true, 6, 1, 1}, + {"failAfterExecutableNodesComplete", v1alpha1.WorkflowOnFailurePolicy(core.WorkflowMetadata_FAIL_AFTER_EXECUTABLE_NODES_COMPLETE), false, 12, 1, 2}, + {"failAfterExecutableNodesComplete enableCRDebugMetadata", v1alpha1.WorkflowOnFailurePolicy(core.WorkflowMetadata_FAIL_AFTER_EXECUTABLE_NODES_COMPLETE), true, 12, 2, 2}, } wJSON, err := yamlutils.ReadYamlFileAsJSON("testdata/benchmark_wf.yaml") @@ -557,7 +557,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Failing(t *testing.T) { t.Run(test.name, func(t *testing.T) { nodeConfig := config.GetConfig().NodeConfig - nodeConfig.ClearPreviousError = test.clearPreviousError + nodeConfig.EnableCRDebugMetadata = test.enableCRDebugMetadata nodeExec, err := nodes.NewExecutor(ctx, nodeConfig, store, enqueueWorkflow, eventSink, adminClient, adminClient, maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, handlerFactory, promutils.NewTestScope()) assert.NoError(t, err)