Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Option to clear past errors from workflow state #6

Draft
wants to merge 29 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
fea71e9
Don't track node errors
Tom-Newton Nov 15, 2023
4d1e5f6
Wipe node error after it's collected
Tom-Newton Nov 15, 2023
57e12aa
Revert "Don't track node errors"
Tom-Newton Nov 15, 2023
457f0d7
Try clearing error message without breaking upstream propagation of e…
Tom-Newton Nov 15, 2023
552f413
Fix clearing error message
Tom-Newton Nov 15, 2023
83812e3
Create a copy of the error to return
Tom-Newton Nov 15, 2023
26d9d05
Working error propagates to immediate execution
Tom-Newton Nov 15, 2023
d50598c
Tidy
Tom-Newton Nov 15, 2023
5ee89e1
Copy state when not failing immediately
Tom-Newton Nov 15, 2023
cb768a3
Reset errors only when recording a new error
Tom-Newton Nov 16, 2023
3a17f9e
Tidy
Tom-Newton Nov 16, 2023
3574d6f
White space
Tom-Newton Nov 16, 2023
63f83f2
Run each round in a sub test to make the output more interpretable
Tom-Newton Dec 19, 2023
4487db4
Make the test follow FAIL_AFTER_EXECUTABLE_NODES_COMPLETE code path
Tom-Newton Dec 19, 2023
96c5393
Small progress on the test
Tom-Newton Dec 20, 2023
2971250
Working test for error clearing
Tom-Newton Dec 21, 2023
c0eb0b1
Both test cases working
Tom-Newton Dec 21, 2023
8648de0
Tidy test
Tom-Newton Dec 21, 2023
b247ec8
Add test cases for clearPreviousError config
Tom-Newton Dec 21, 2023
41a06b1
Implement clearPreviousError config
Tom-Newton Dec 21, 2023
250a0ff
Update generated code
Tom-Newton Dec 21, 2023
345f990
Require error to be removed not just cleared
Tom-Newton Dec 21, 2023
9934791
Working remove error
Tom-Newton Dec 21, 2023
431f1d3
Remove redundant attempts
Tom-Newton Dec 21, 2023
0ce2f89
Update generated code
Tom-Newton Dec 21, 2023
99fc625
Tidy mocking
Tom-Newton Dec 21, 2023
216e1a3
Avoid making nodeExecContext public
Tom-Newton Dec 21, 2023
6c5650c
Remove `clear-previous-error` and use `enable-cr-debug-metadata` instead
Tom-Newton Jan 16, 2024
2cea075
Update test comments
Tom-Newton Jan 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,8 @@ type MutableNodeStatus interface {
GetArrayNodeStatus() MutableArrayNodeStatus
GetOrCreateArrayNodeStatus() MutableArrayNodeStatus
ClearArrayNodeStatus()

ClearExecutionError()
}

type ExecutionTimeInfo interface {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,10 @@ func (in *NodeStatus) ClearArrayNodeStatus() {
in.SetDirty()
}

// ClearExecutionError removes the stored execution error from the node status.
// Errors can be very large, so they are cleared once no longer needed to keep
// the CR state small.
func (in *NodeStatus) ClearExecutionError() {
	in.Error = nil
	// Mark the status dirty so the mutation is observed, consistent with the
	// other Clear* mutators on NodeStatus (e.g. ClearArrayNodeStatus).
	in.SetDirty()
}

// GetLastUpdatedAt returns the LastUpdatedAt timestamp of this node status;
// the returned pointer may be nil.
func (in *NodeStatus) GetLastUpdatedAt() *metav1.Time {
	return in.LastUpdatedAt
}
Expand Down Expand Up @@ -632,6 +636,7 @@ func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason st
if p == NodePhaseSucceeded || p == NodePhaseSkipped || !enableCRDebugMetadata {
// Clear most status related fields after reaching a terminal state. This keeps the CR state small to avoid
// etcd size limits. Importantly we keep Phase, StoppedAt and Error which will be needed further.
// Errors will still be needed but it will be cleaned up when possible because they can be very large.
in.Message = ""
in.QueuedAt = nil
in.StartedAt = nil
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ type NodeConfig struct {
InterruptibleFailureThreshold int32 `json:"interruptible-failure-threshold" pflag:"1,number of failures for a node to be still considered interruptible. Negative numbers are treated as complementary (ex. -1 means last attempt is non-interruptible).'"`
DefaultMaxAttempts int32 `json:"default-max-attempts" pflag:"3,Default maximum number of attempts for a node"`
IgnoreRetryCause bool `json:"ignore-retry-cause" pflag:",Ignore retry cause and count all attempts toward a node's max attempts"`
EnableCRDebugMetadata bool `json:"enable-cr-debug-metadata" pflag:",Collapse node on any terminal state, not just successful terminations. This is useful to reduce the size of workflow state in etcd."`
EnableCRDebugMetadata bool `json:"enable-cr-debug-metadata" pflag:",By default node state gets cleared after flytepropeller will no longer need it. This is useful to reduce the size of workflow state in etcd. Consider enabling this to keep this state for debugging purposes."`
}

// DefaultDeadlines contains default values for timeouts
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions flytepropeller/pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ func (c *recursiveNodeExecutor) handleDownstream(ctx context.Context, execContex
partialNodeCompletion := false
onFailurePolicy := execContext.GetOnFailurePolicy()
stateOnComplete := interfaces.NodeStatusComplete
var executableNodeStatusOnComplete v1alpha1.ExecutableNodeStatus
for _, downstreamNodeName := range downstreamNodes {
downstreamNode, ok := nl.GetNode(downstreamNodeName)
if !ok {
Expand All @@ -298,6 +299,10 @@ func (c *recursiveNodeExecutor) handleDownstream(ctx context.Context, execContex
// If the failure policy allows other nodes to continue running, do not exit the loop,
// Keep track of the last failed state in the loop since it'll be the one to return.
// TODO: If multiple nodes fail (which this mode allows), consolidate/summarize failure states in one.
if executableNodeStatusOnComplete != nil {
c.nodeExecutor.Clear(executableNodeStatusOnComplete)
}
executableNodeStatusOnComplete = nl.GetNodeExecutionStatus(ctx, downstreamNode.GetID())
stateOnComplete = state
} else {
return state, nil
Expand Down Expand Up @@ -863,6 +868,12 @@ func (c *nodeExecutor) execute(ctx context.Context, h interfaces.NodeHandler, nC
return phase, nil
}

// Clear drops the execution error recorded on the given node status. When
// enable-cr-debug-metadata is set the error is deliberately kept on the CR
// so it remains available for debugging.
func (c *nodeExecutor) Clear(executableNodeStatus v1alpha1.ExecutableNodeStatus) {
	if c.enableCRDebugMetadata {
		// Retain the error for debugging purposes.
		return
	}
	executableNodeStatus.ClearExecutionError()
}

func (c *nodeExecutor) Abort(ctx context.Context, h interfaces.NodeHandler, nCtx interfaces.NodeExecutionContext, reason string, finalTransition bool) error {
logger.Debugf(ctx, "Calling aborting & finalize")
if err := h.Abort(ctx, nCtx, reason); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/pkg/controller/nodes/interfaces/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/handler"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
)

//go:generate mockery -all -case=underscore
Expand All @@ -16,6 +17,7 @@ type NodeExecutor interface {
HandleNode(ctx context.Context, dag executors.DAGStructure, nCtx NodeExecutionContext, h NodeHandler) (NodeStatus, error)
Abort(ctx context.Context, h NodeHandler, nCtx NodeExecutionContext, reason string, finalTransition bool) error
Finalize(ctx context.Context, h NodeHandler, nCtx NodeExecutionContext) error
Clear(executableNodeStatus v1alpha1.ExecutableNodeStatus)
}

// Interface that should be implemented for a node type.
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

128 changes: 91 additions & 37 deletions flytepropeller/pkg/controller/workflow/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ type fakeRemoteWritePlugin struct {
t assert.TestingT
}

// fakeNodeExecContext is the minimal view of the node execution context that
// the test's mock matchers need: access to the underlying executable node
// (e.g. to call IsStartNode() on it).
type fakeNodeExecContext interface {
	Node() v1alpha1.ExecutableNode
}

func (f fakeRemoteWritePlugin) Handle(ctx context.Context, tCtx pluginCore.TaskExecutionContext) (pluginCore.Transition, error) {
logger.Infof(ctx, "----------------------------------------------------------------------------------------------")
logger.Infof(ctx, "Handle called for %s", tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName())
Expand Down Expand Up @@ -224,6 +228,26 @@ func createTaskExecutorErrorInCheck(t assert.TestingT) pluginCore.PluginEntry {
}
}

// CountFailedNodes returns the number of node statuses currently in
// NodePhaseFailed.
func CountFailedNodes(nodeStatuses map[v1alpha1.NodeID]*v1alpha1.NodeStatus) int {
	failed := 0
	for _, status := range nodeStatuses {
		if status.Phase != v1alpha1.NodePhaseFailed {
			continue
		}
		failed++
	}
	return failed
}

// CountNodesWithErrors returns the number of node statuses that still carry
// a non-nil execution error.
func CountNodesWithErrors(nodeStatuses map[v1alpha1.NodeID]*v1alpha1.NodeStatus) int {
	withErrors := 0
	for _, status := range nodeStatuses {
		if status.Error != nil {
			withErrors++
		}
	}
	return withErrors
}

func TestWorkflowExecutor_HandleFlyteWorkflow_Error(t *testing.T) {
ctx := context.Background()
scope := testScope.NewSubScope("12")
Expand Down Expand Up @@ -496,54 +520,84 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Failing(t *testing.T) {

h := &nodemocks.NodeHandler{}
h.OnAbortMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil)
h.OnHandleMatch(mock.Anything, mock.Anything).Return(handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(nil)), nil)

// Mock handler marks start-node successfully completed but other nodes as failed
startNodeMatcher := mock.MatchedBy(func(nodeExecContext fakeNodeExecContext) bool {
return nodeExecContext.Node().IsStartNode()
})
h.OnHandleMatch(mock.Anything, startNodeMatcher).Return(handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(nil)), nil)
h.OnHandleMatch(mock.Anything, mock.Anything).Return(handler.DoTransition(
handler.TransitionTypeEphemeral,
handler.PhaseInfoFailureErr(&core.ExecutionError{Code: "code", Message: "message", ErrorUri: "uri"}, nil)), nil,
)

h.OnFinalizeMatch(mock.Anything, mock.Anything).Return(nil)
h.OnFinalizeRequired().Return(false)

handlerFactory := &nodemocks.HandlerFactory{}
handlerFactory.OnSetupMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil)
handlerFactory.OnGetHandlerMatch(mock.Anything).Return(h, nil)

nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, adminClient, adminClient,
maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, handlerFactory, promutils.NewTestScope())
assert.NoError(t, err)
executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, eventConfig, testClusterID, promutils.NewTestScope())
assert.NoError(t, err)

assert.NoError(t, executor.Initialize(ctx))
tests := []struct {
name string
onFailurePolicy v1alpha1.WorkflowOnFailurePolicy
enableCRDebugMetadata bool
expectedRoundsToFail int
expectedNodesWithErrorsCount int
expectedFailedNodesCount int
}{
{"failImidiately", v1alpha1.WorkflowOnFailurePolicy(core.WorkflowMetadata_FAIL_IMMEDIATELY), false, 6, 1, 1},
{"failImidiately enableCRDebugMetadata", v1alpha1.WorkflowOnFailurePolicy(core.WorkflowMetadata_FAIL_IMMEDIATELY), true, 6, 1, 1},
{"failAfterExecutableNodesComplete", v1alpha1.WorkflowOnFailurePolicy(core.WorkflowMetadata_FAIL_AFTER_EXECUTABLE_NODES_COMPLETE), false, 12, 1, 2},
{"failAfterExecutableNodesComplete enableCRDebugMetadata", v1alpha1.WorkflowOnFailurePolicy(core.WorkflowMetadata_FAIL_AFTER_EXECUTABLE_NODES_COMPLETE), true, 12, 2, 2},
}

wJSON, err := yamlutils.ReadYamlFileAsJSON("testdata/benchmark_wf.yaml")
if assert.NoError(t, err) {
w := &v1alpha1.FlyteWorkflow{
RawOutputDataConfig: v1alpha1.RawOutputDataConfig{RawOutputDataConfig: &admin.RawOutputDataConfig{}},
}
if assert.NoError(t, json.Unmarshal(wJSON, w)) {
// For benchmark workflow, we will run into the first failure on round 6

roundsToFail := 8
for i := 0; i < roundsToFail; i++ {
err := executor.HandleFlyteWorkflow(ctx, w)
assert.Nil(t, err, "Round [%v]", i)
fmt.Printf("Round[%d] Workflow[%v]\n", i, w.Status.Phase.String())
walkAndPrint(w.Connections, w.Status.NodeStatus)
for _, v := range w.Status.NodeStatus {
// Reset dirty manually for tests.
v.ResetDirty()
}
fmt.Printf("\n")

if i == roundsToFail-1 {
assert.Equal(t, v1alpha1.WorkflowPhaseFailed, w.Status.Phase)
} else if i == roundsToFail-2 {
assert.Equal(t, v1alpha1.WorkflowPhaseHandlingFailureNode, w.Status.Phase)
} else {
assert.NotEqual(t, v1alpha1.WorkflowPhaseFailed, w.Status.Phase, "For Round [%v] got phase [%v]", i, w.Status.Phase.String())
assert.NoError(t, err)
for _, test := range tests {

t.Run(test.name, func(t *testing.T) {
nodeConfig := config.GetConfig().NodeConfig
nodeConfig.EnableCRDebugMetadata = test.enableCRDebugMetadata
nodeExec, err := nodes.NewExecutor(ctx, nodeConfig, store, enqueueWorkflow, eventSink, adminClient, adminClient,
maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, handlerFactory, promutils.NewTestScope())
assert.NoError(t, err)
executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, eventConfig, testClusterID, promutils.NewTestScope())
assert.NoError(t, err)
assert.NoError(t, executor.Initialize(ctx))

w := &v1alpha1.FlyteWorkflow{
RawOutputDataConfig: v1alpha1.RawOutputDataConfig{RawOutputDataConfig: &admin.RawOutputDataConfig{}},
WorkflowSpec: &v1alpha1.WorkflowSpec{OnFailurePolicy: test.onFailurePolicy},
}
if assert.NoError(t, json.Unmarshal(wJSON, w)) {

for i := 0; i < test.expectedRoundsToFail; i++ {
t.Run(fmt.Sprintf("Round[%d]", i), func(t *testing.T) {
err := executor.HandleFlyteWorkflow(ctx, w)
assert.Nil(t, err, "Round [%v]", i)
fmt.Printf("Round[%d] Workflow[%v]\n", i, w.Status.Phase.String())
walkAndPrint(w.Connections, w.Status.NodeStatus)
for _, v := range w.Status.NodeStatus {
// Reset dirty manually for tests.
v.ResetDirty()
}
fmt.Printf("\n")

if i == test.expectedRoundsToFail-1 {
assert.Equal(t, v1alpha1.WorkflowPhaseFailed, w.Status.Phase)
} else if i == test.expectedRoundsToFail-2 {
assert.Equal(t, v1alpha1.WorkflowPhaseHandlingFailureNode, w.Status.Phase)
} else {
assert.NotEqual(t, v1alpha1.WorkflowPhaseFailed, w.Status.Phase, "For Round [%v] got phase [%v]", i, w.Status.Phase.String())
}
})
}

assert.Equal(t, test.expectedFailedNodesCount, CountFailedNodes(w.Status.NodeStatus))
assert.Equal(t, test.expectedNodesWithErrorsCount, CountNodesWithErrors(w.Status.NodeStatus))
assert.Equal(t, v1alpha1.WorkflowPhaseFailed.String(), w.Status.Phase.String(), "Message: [%v]", w.Status.Message)
}

assert.Equal(t, v1alpha1.WorkflowPhaseFailed.String(), w.Status.Phase.String(), "Message: [%v]", w.Status.Message)
}
})
}
assert.True(t, recordedRunning)
assert.True(t, recordedFailing)
Expand Down
Loading