From 7dbe42984952b7d21d52853510aaf48dc34833f9 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 15 Nov 2023 21:42:59 +0000 Subject: [PATCH] Tidy --- .../pkg/apis/flyteworkflow/v1alpha1/iface.go | 1 + .../v1alpha1/mocks/ExecutableNodeStatus.go | 34 +++++++++++++++++++ .../flyteworkflow/v1alpha1/node_status.go | 9 +++-- .../pkg/controller/nodes/executor.go | 8 +---- .../pkg/controller/nodes/executor_test.go | 5 +++ 5 files changed, 45 insertions(+), 12 deletions(-) diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go index b3d744bd77..ae8fa505c8 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go @@ -369,6 +369,7 @@ type ExecutableNodeStatus interface { GetOutputDir() DataReference GetMessage() string GetExecutionError() *core.ExecutionError + PopExecutionError() *core.ExecutionError GetAttempts() uint32 GetSystemFailures() uint32 GetWorkflowNodeStatus() ExecutableWorkflowNodeStatus diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go index cb447e06fc..952e05103e 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go @@ -1157,6 +1157,40 @@ func (_m *ExecutableNodeStatus) IsDirty() bool { return r0 } +type ExecutableNodeStatus_PopExecutionError struct { + *mock.Call +} + +func (_m ExecutableNodeStatus_PopExecutionError) Return(_a0 *core.ExecutionError) *ExecutableNodeStatus_PopExecutionError { + return &ExecutableNodeStatus_PopExecutionError{Call: _m.Call.Return(_a0)} +} + +func (_m *ExecutableNodeStatus) OnPopExecutionError() *ExecutableNodeStatus_PopExecutionError { + c_call := _m.On("PopExecutionError") + return &ExecutableNodeStatus_PopExecutionError{Call: c_call} +} + +func (_m *ExecutableNodeStatus) OnPopExecutionErrorMatch(matchers ...interface{}) *ExecutableNodeStatus_PopExecutionError { + c_call := _m.On("PopExecutionError", matchers...) + return &ExecutableNodeStatus_PopExecutionError{Call: c_call} +} + +// PopExecutionError provides a mock function with given fields: +func (_m *ExecutableNodeStatus) PopExecutionError() *core.ExecutionError { + ret := _m.Called() + + var r0 *core.ExecutionError + if rf, ok := ret.Get(0).(func() *core.ExecutionError); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*core.ExecutionError) + } + } + + return r0 +} + // ResetDirty provides a mock function with given fields: func (_m *ExecutableNodeStatus) ResetDirty() { _m.Called() diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go index afbf651981..ffb95e9e80 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go @@ -477,10 +477,10 @@ func (in *NodeStatus) ClearArrayNodeStatus() { in.SetDirty() } -func (in *NodeStatus) ClearErrorMessage() { - if in.Error != nil { - in.Error.ClearMessage() - } +func (in *NodeStatus) PopExecutionError() *core.ExecutionError { + executionError := in.GetExecutionError() + in.Error = nil + return executionError } func (in *NodeStatus) GetLastUpdatedAt() *metav1.Time { @@ -640,7 +640,6 @@ func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason st // Clear most fields after reaching a terminal state. This keeps the CRD state small and avoid etcd size // limits. We keep phase and StoppedAt. StoppedAt is used to calculate transition latency between this // node and any downstream nodes and Phase is required for propeller to continue to downstream nodes. - in.ClearErrorMessage() in.QueuedAt = nil in.StartedAt = nil in.LastUpdatedAt = nil diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index 087bf0b741..199cae274a 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -235,13 +235,7 @@ func (c *recursiveNodeExecutor) RecursiveNodeHandler(ctx context.Context, execCo if err != nil { return interfaces.NodeStatusUndefined, err } - nodeError := *nodeStatus.GetExecutionError() - fmt.Println("before modifying", nodeError) - status := interfaces.NodeStatusFailed(&nodeError) - nodeStatus.ClearErrorMessage() - fmt.Println("modified error", nodeStatus.GetExecutionError()) - fmt.Println("after modifying", nodeError) - return status, nil + return interfaces.NodeStatusFailed(nodeStatus.PopExecutionError()), nil } else if nodePhase == v1alpha1.NodePhaseTimedOut { logger.Debugf(currentNodeCtx, "Node has timed out, traversing downstream.") _, err := c.handleDownstream(ctx, execContext, dag, nl, currentNode) diff --git a/flytepropeller/pkg/controller/nodes/executor_test.go b/flytepropeller/pkg/controller/nodes/executor_test.go index 222e0a05d5..316b66e01d 100644 --- a/flytepropeller/pkg/controller/nodes/executor_test.go +++ b/flytepropeller/pkg/controller/nodes/executor_test.go @@ -619,6 +619,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { mockN0Status.OnGetPhase().Return(n0Phase) mockN0Status.OnGetAttempts().Return(uint32(0)) mockN0Status.OnGetExecutionError().Return(nil) + mockN0Status.OnPopExecutionError().Return(&core.ExecutionError{Code: "code", Message: "message"}) mockN0Status.OnIsDirty().Return(false) mockN0Status.OnGetParentTaskID().Return(nil) @@ -736,6 +737,10 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { return handler.UnknownTransition, fmt.Errorf("error") }, false, false, false, core.NodeExecution_FAILED}, + {"failing->failed", v1alpha1.NodePhaseFailed, v1alpha1.NodePhaseFailed, interfaces.NodePhaseFailed, func() (handler.Transition, error) { + return handler.UnknownTransition, fmt.Errorf("error") + }, false, false, false, core.NodeExecution_FAILED}, + {"failing->failed(error)", v1alpha1.NodePhaseFailing, v1alpha1.NodePhaseFailing, interfaces.NodePhaseUndefined, func() (handler.Transition, error) { return handler.UnknownTransition, fmt.Errorf("error") }, true, true, false, core.NodeExecution_FAILING},