From b4c58be03d310b6e2acc396ce41651f8c259cc95 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Mon, 13 Nov 2023 21:08:29 +0000 Subject: [PATCH 01/19] `ClearSubNodeStatus` on failure Signed-off-by: Thomas Newton --- flytepropeller/pkg/controller/nodes/executor.go | 1 + 1 file changed, 1 insertion(+) diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index 23062a8cb3..3c443a5f4b 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -1324,6 +1324,7 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur if err := c.Abort(ctx, h, nCtx, "node failing", false); err != nil { return interfaces.NodeStatusUndefined, err } + nodeStatus.ClearSubNodeStatus() nodeStatus.UpdatePhase(v1alpha1.NodePhaseFailed, metav1.Now(), nodeStatus.GetMessage(), nodeStatus.GetExecutionError()) c.metrics.FailureDuration.Observe(ctx, nodeStatus.GetStartedAt().Time, nodeStatus.GetStoppedAt().Time) if nCtx.NodeExecutionMetadata().IsInterruptible() { From a4569bef84723a4ec2f2a3c30e592d9f5be58b26 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Tue, 14 Nov 2023 16:46:27 +0000 Subject: [PATCH 02/19] More aggressive collapsing Signed-off-by: Thomas Newton --- .../flyteworkflow/v1alpha1/node_status.go | 25 ++++++++----------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go index 9fb891d01a..188487a614 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go @@ -607,6 +607,7 @@ func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason st } n := occurredAt + in.LastUpdatedAt = &n if occurredAt.IsZero() { n = metav1.Now() } @@ -630,30 +631,24 @@ func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt 
metav1.Time, reason st if in.StoppedAt == nil { in.StoppedAt = &n } - if in.StartedAt == nil { - in.StartedAt = &n - } - if in.LastAttemptStartedAt == nil { - in.LastAttemptStartedAt = &n - } - } - in.LastUpdatedAt = &n - - // For cases in which the node is either Succeeded or Skipped we clear most fields from the status - // except for StoppedAt and Phase. StoppedAt is used to calculate transition latency between this node and - // any downstream nodes and Phase is required for propeller to continue to downstream nodes. - if p == NodePhaseSucceeded || p == NodePhaseSkipped { - in.Message = "" + // Clear most fields after reaching a terminal state. This keeps the CRD state small and avoid etcd size + // limits. We keep phase and StoppedAt. StoppedAt is used to calculate transition latency between this + // node and any downstream nodes and Phase is required for propeller to continue to downstream nodes. in.QueuedAt = nil in.StartedAt = nil + in.LastUpdatedAt = nil in.LastAttemptStartedAt = nil in.DynamicNodeStatus = nil in.BranchStatus = nil in.SubNodeStatus = nil in.TaskNodeStatus = nil in.WorkflowNodeStatus = nil - in.LastUpdatedAt = nil } + // For cases in which the node is either Succeeded or Skipped we clear the message. Potentially this will be useful + // for other failed states. 
+ in.Message = "" + // if p == NodePhaseSucceeded || p == NodePhaseSkipped { + // } in.SetDirty() } From eccf702a78b24de56c62f7456d5cb83366724c45 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Tue, 14 Nov 2023 17:02:02 +0000 Subject: [PATCH 03/19] Tidy Signed-off-by: Thomas Newton --- flytepropeller/pkg/controller/nodes/executor.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index 3c443a5f4b..e4512eb2ce 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -1324,7 +1324,6 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur if err := c.Abort(ctx, h, nCtx, "node failing", false); err != nil { return interfaces.NodeStatusUndefined, err } - nodeStatus.ClearSubNodeStatus() nodeStatus.UpdatePhase(v1alpha1.NodePhaseFailed, metav1.Now(), nodeStatus.GetMessage(), nodeStatus.GetExecutionError()) c.metrics.FailureDuration.Observe(ctx, nodeStatus.GetStartedAt().Time, nodeStatus.GetStoppedAt().Time) if nCtx.NodeExecutionMetadata().IsInterruptible() { @@ -1339,7 +1338,6 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur return interfaces.NodeStatusUndefined, err } - nodeStatus.ClearSubNodeStatus() nodeStatus.UpdatePhase(v1alpha1.NodePhaseTimedOut, metav1.Now(), nodeStatus.GetMessage(), nodeStatus.GetExecutionError()) c.metrics.TimedOutFailure.Inc(ctx) if nCtx.NodeExecutionMetadata().IsInterruptible() { @@ -1364,7 +1362,6 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur stopped = &t } c.metrics.SuccessDuration.Observe(ctx, started.Time, stopped.Time) - nodeStatus.ClearSubNodeStatus() nodeStatus.UpdatePhase(v1alpha1.NodePhaseSucceeded, t, "completed successfully", nil) if nCtx.NodeExecutionMetadata().IsInterruptible() { c.metrics.InterruptibleNodesTerminated.Inc(ctx) From dd6cd7b72e383a725ce8d49633fa3bcddf278440 
Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Tue, 14 Nov 2023 23:11:42 +0000 Subject: [PATCH 04/19] Fix panic Signed-off-by: Thomas Newton --- flytepropeller/pkg/controller/nodes/executor.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index e4512eb2ce..b596eb08c9 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -1324,8 +1324,9 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur if err := c.Abort(ctx, h, nCtx, "node failing", false); err != nil { return interfaces.NodeStatusUndefined, err } + startedAt := nodeStatus.GetStartedAt().Time nodeStatus.UpdatePhase(v1alpha1.NodePhaseFailed, metav1.Now(), nodeStatus.GetMessage(), nodeStatus.GetExecutionError()) - c.metrics.FailureDuration.Observe(ctx, nodeStatus.GetStartedAt().Time, nodeStatus.GetStoppedAt().Time) + c.metrics.FailureDuration.Observe(ctx, startedAt, nodeStatus.GetStoppedAt().Time) if nCtx.NodeExecutionMetadata().IsInterruptible() { c.metrics.InterruptibleNodesTerminated.Inc(ctx) } From 8e0f748ce913f4f7131fdfce989b812c36342da4 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Thu, 16 Nov 2023 13:39:22 +0000 Subject: [PATCH 05/19] Tidy Signed-off-by: Thomas Newton --- .../pkg/apis/flyteworkflow/v1alpha1/node_status.go | 13 +++---------- flytepropeller/pkg/controller/nodes/executor.go | 2 +- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go index 188487a614..ffdff4f6e7 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go @@ -626,14 +626,12 @@ func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason st in.LastAttemptStartedAt = &n } } else 
if IsPhaseTerminal(p) { - // If we are in terminal phase then we will clear out all our fields as they are not required anymore - // Only thing required is stopped at and lastupdatedat time if in.StoppedAt == nil { in.StoppedAt = &n } - // Clear most fields after reaching a terminal state. This keeps the CRD state small and avoid etcd size - // limits. We keep phase and StoppedAt. StoppedAt is used to calculate transition latency between this - // node and any downstream nodes and Phase is required for propeller to continue to downstream nodes. + // Clear most status related fields after reaching a terminal state. This keeps the CRD state small to avoid + // etcd size limits. Importantly we keep Phase, StoppedAt and Error which will be needed further. + in.Message = "" in.QueuedAt = nil in.StartedAt = nil in.LastUpdatedAt = nil @@ -644,11 +642,6 @@ func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason st in.TaskNodeStatus = nil in.WorkflowNodeStatus = nil } - // For cases in which the node is either Succeeded or Skipped we clear the message. Potentially this will be useful - // for other failed states. 
- in.Message = "" - // if p == NodePhaseSucceeded || p == NodePhaseSkipped { - // } in.SetDirty() } diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index b596eb08c9..3af658fb40 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -1278,7 +1278,7 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx inter func (c *nodeExecutor) handleRetryableFailure(ctx context.Context, nCtx interfaces.NodeExecutionContext, h interfaces.NodeHandler) (interfaces.NodeStatus, error) { nodeStatus := nCtx.NodeStatus() logger.Debugf(ctx, "node failed with retryable failure, aborting and finalizing, message: %s", nodeStatus.GetMessage()) - if err := c.Abort(ctx, h, nCtx, nodeStatus.GetMessage(), false); err != nil { + if err := c.Abort(ctx, h, nCtx, nodeStatus.GetExecutionError().GetMessage(), false); err != nil { return interfaces.NodeStatusUndefined, err } From 2fc33e72d2c399d3e14a428e6dc01e28ecee1a65 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Thu, 16 Nov 2023 16:52:50 +0000 Subject: [PATCH 06/19] Handle possibility of nil startedAt time Signed-off-by: Thomas Newton --- flytepropeller/pkg/controller/nodes/executor.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index 3af658fb40..e934df88af 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -1324,9 +1324,14 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur if err := c.Abort(ctx, h, nCtx, "node failing", false); err != nil { return interfaces.NodeStatusUndefined, err } - startedAt := nodeStatus.GetStartedAt().Time - nodeStatus.UpdatePhase(v1alpha1.NodePhaseFailed, metav1.Now(), nodeStatus.GetMessage(), nodeStatus.GetExecutionError()) - 
c.metrics.FailureDuration.Observe(ctx, startedAt, nodeStatus.GetStoppedAt().Time) + t := metav1.Now() + + startedAt := nodeStatus.GetStartedAt() + if startedAt == nil { + startedAt = &t + } + nodeStatus.UpdatePhase(v1alpha1.NodePhaseFailed, t, nodeStatus.GetMessage(), nodeStatus.GetExecutionError()) + c.metrics.FailureDuration.Observe(ctx, startedAt.Time, nodeStatus.GetStoppedAt().Time) if nCtx.NodeExecutionMetadata().IsInterruptible() { c.metrics.InterruptibleNodesTerminated.Inc(ctx) } From fb549c43d72a6b433eb0f13539bebaabd2e7e0d8 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Thu, 16 Nov 2023 16:53:01 +0000 Subject: [PATCH 07/19] Update test assertions Signed-off-by: Thomas Newton --- .../flyteworkflow/v1alpha1/node_status_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go index 45c299687a..17fffb11cd 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go @@ -317,13 +317,13 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { err := &core.ExecutionError{} ns.UpdatePhase(p, n, msg, err) - assert.Equal(t, *ns.LastUpdatedAt, n) + assert.Nil(t, ns.LastUpdatedAt) assert.Nil(t, ns.QueuedAt) - assert.Equal(t, *ns.LastAttemptStartedAt, n) - assert.Equal(t, *ns.StartedAt, n) + assert.Nil(t, ns.LastAttemptStartedAt) + assert.Nil(t, ns.StartedAt) assert.Equal(t, *ns.StoppedAt, n) assert.Equal(t, p, ns.Phase) - assert.Equal(t, msg, ns.Message) + assert.Equal(t, ns.Message, "") assert.Equal(t, ns.Error.ExecutionError, err) }) @@ -334,13 +334,13 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { err := &core.ExecutionError{} ns.UpdatePhase(p, n, msg, err) - assert.Equal(t, *ns.LastUpdatedAt, n) + assert.Nil(t, ns.LastUpdatedAt) assert.Nil(t, ns.QueuedAt) - assert.Equal(t, *ns.LastAttemptStartedAt, n) - 
assert.Equal(t, *ns.StartedAt, n) + assert.Nil(t, ns.LastAttemptStartedAt) + assert.Nil(t, ns.StartedAt) assert.Equal(t, *ns.StoppedAt, n) assert.Equal(t, p, ns.Phase) - assert.Equal(t, msg, ns.Message) + assert.Equal(t, ns.Message, "") assert.Equal(t, ns.Error.ExecutionError, err) }) From def9c50546ea25b6760c88b019875953e7e6233f Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 13 Dec 2023 16:05:42 +0000 Subject: [PATCH 08/19] Config flag attempt 1 Signed-off-by: Thomas Newton --- .../pkg/apis/flyteworkflow/v1alpha1/iface.go | 2 +- .../flyteworkflow/v1alpha1/node_status.go | 28 ++++++++++--------- .../v1alpha1/node_status_test.go | 20 ++++++------- .../pkg/controller/config/config.go | 2 ++ .../pkg/controller/nodes/executor.go | 10 ++++--- 5 files changed, 34 insertions(+), 28 deletions(-) diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go index b3d744bd77..68c2121dd6 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go @@ -318,7 +318,7 @@ type MutableNodeStatus interface { SetOutputDir(d DataReference) SetParentNodeID(n *NodeID) SetParentTaskID(t *core.TaskExecutionIdentifier) - UpdatePhase(phase NodePhase, occurredAt metav1.Time, reason string, err *core.ExecutionError) + UpdatePhase(phase NodePhase, occurredAt metav1.Time, reason string, clearStateOnTermination bool, err *core.ExecutionError) IncrementAttempts() uint32 IncrementSystemFailures() uint32 SetCached() diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go index ffdff4f6e7..432350fbe3 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go @@ -594,7 +594,7 @@ func (in *NodeStatus) GetOrCreateArrayNodeStatus() MutableArrayNodeStatus { return in.ArrayNodeStatus } -func (in 
*NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason string, err *core.ExecutionError) { +func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason string, clearStateOnTermination bool, err *core.ExecutionError) { if in.Phase == p && in.Message == reason { // We will not update the phase multiple times. This prevents the comparison from returning false positive return @@ -629,18 +629,20 @@ func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason st if in.StoppedAt == nil { in.StoppedAt = &n } - // Clear most status related fields after reaching a terminal state. This keeps the CRD state small to avoid - // etcd size limits. Importantly we keep Phase, StoppedAt and Error which will be needed further. - in.Message = "" - in.QueuedAt = nil - in.StartedAt = nil - in.LastUpdatedAt = nil - in.LastAttemptStartedAt = nil - in.DynamicNodeStatus = nil - in.BranchStatus = nil - in.SubNodeStatus = nil - in.TaskNodeStatus = nil - in.WorkflowNodeStatus = nil + if p == NodePhaseSucceeded || p == NodePhaseSkipped || clearStateOnTermination { + // Clear most status related fields after reaching a terminal state. This keeps the CRD state small to avoid + // etcd size limits. Importantly we keep Phase, StoppedAt and Error which will be needed further. 
+ in.Message = "" + in.QueuedAt = nil + in.StartedAt = nil + in.LastUpdatedAt = nil + in.LastAttemptStartedAt = nil + in.DynamicNodeStatus = nil + in.BranchStatus = nil + in.SubNodeStatus = nil + in.TaskNodeStatus = nil + in.WorkflowNodeStatus = nil + } } in.SetDirty() } diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go index 17fffb11cd..d0c7ddfe54 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go @@ -266,7 +266,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { Message: queued, } msg := queued - ns.UpdatePhase(p, n, msg, nil) + ns.UpdatePhase(p, n, msg, false, nil) assert.Nil(t, ns.QueuedAt) }) @@ -274,7 +274,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { p := NodePhaseQueued ns := NodeStatus{} msg := queued - ns.UpdatePhase(p, metav1.NewTime(time.Time{}), msg, nil) + ns.UpdatePhase(p, metav1.NewTime(time.Time{}), msg, false, nil) assert.NotNil(t, ns.QueuedAt) }) @@ -282,7 +282,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { ns := NodeStatus{} p := NodePhaseQueued msg := queued - ns.UpdatePhase(p, n, msg, nil) + ns.UpdatePhase(p, n, msg, false, nil) assert.Equal(t, *ns.LastUpdatedAt, n) assert.Equal(t, *ns.QueuedAt, n) @@ -298,7 +298,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { ns := NodeStatus{} p := NodePhaseRunning msg := "running" - ns.UpdatePhase(p, n, msg, nil) + ns.UpdatePhase(p, n, msg, false, nil) assert.Equal(t, *ns.LastUpdatedAt, n) assert.Nil(t, ns.QueuedAt) @@ -315,7 +315,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { p := NodePhaseFailed msg := "failed" err := &core.ExecutionError{} - ns.UpdatePhase(p, n, msg, err) + ns.UpdatePhase(p, n, msg, false, err) assert.Nil(t, ns.LastUpdatedAt) assert.Nil(t, ns.QueuedAt) @@ -332,7 +332,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { p := NodePhaseTimedOut msg := "tm" err := 
&core.ExecutionError{} - ns.UpdatePhase(p, n, msg, err) + ns.UpdatePhase(p, n, msg, false, err) assert.Nil(t, ns.LastUpdatedAt) assert.Nil(t, ns.QueuedAt) @@ -349,7 +349,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { ns := NodeStatus{} p := NodePhaseSucceeded msg := success - ns.UpdatePhase(p, n, msg, nil) + ns.UpdatePhase(p, n, msg, false, nil) assert.Nil(t, ns.LastUpdatedAt) assert.Nil(t, ns.QueuedAt) @@ -365,7 +365,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { ns := NodeStatus{} p := NodePhaseSucceeded msg := success - ns.UpdatePhase(p, n, msg, nil) + ns.UpdatePhase(p, n, msg, false, nil) assert.Nil(t, ns.LastUpdatedAt) assert.Nil(t, ns.QueuedAt) @@ -391,7 +391,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { } p := NodePhaseSucceeded msg := success - ns.UpdatePhase(p, n, msg, nil) + ns.UpdatePhase(p, n, msg, false, nil) assert.Nil(t, ns.LastUpdatedAt) assert.Nil(t, ns.QueuedAt) @@ -423,7 +423,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { n2 := metav1.NewTime(time.Now()) p := NodePhaseRunning msg := "running" - ns.UpdatePhase(p, n2, msg, nil) + ns.UpdatePhase(p, n2, msg, false, nil) assert.Equal(t, *ns.LastUpdatedAt, n2) assert.Equal(t, *ns.QueuedAt, n) diff --git a/flytepropeller/pkg/controller/config/config.go b/flytepropeller/pkg/controller/config/config.go index 10a83cd2fc..bdc3b1cfac 100644 --- a/flytepropeller/pkg/controller/config/config.go +++ b/flytepropeller/pkg/controller/config/config.go @@ -98,6 +98,7 @@ var ( InterruptibleFailureThreshold: -1, DefaultMaxAttempts: 1, IgnoreRetryCause: false, + ClearStateOnTermination: false, }, MaxStreakLength: 8, // Turbo mode is enabled by default ProfilerPort: config.Port{ @@ -211,6 +212,7 @@ type NodeConfig struct { InterruptibleFailureThreshold int32 `json:"interruptible-failure-threshold" pflag:"1,number of failures for a node to be still considered interruptible. Negative numbers are treated as complementary (ex. 
-1 means last attempt is non-interruptible).'"` DefaultMaxAttempts int32 `json:"default-max-attempts" pflag:"3,Default maximum number of attempts for a node"` IgnoreRetryCause bool `json:"ignore-retry-cause" pflag:",Ignore retry cause and count all attempts toward a node's max attempts"` + ClearStateOnTermination bool `json:"clear-state-on-termination" pflag:",Collapse node on any terminal state"` } // DefaultDeadlines contains default values for timeouts diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index e934df88af..bec146b062 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -475,6 +475,7 @@ func (c *recursiveNodeExecutor) WithNodeExecutionContextBuilder(nCtxBuilder inte type nodeExecutor struct { catalog catalog.Client clusterID string + clearStateOnTermination bool defaultActiveDeadline time.Duration defaultDataSandbox storage.DataReference defaultExecutionDeadline time.Duration @@ -1285,7 +1286,7 @@ func (c *nodeExecutor) handleRetryableFailure(ctx context.Context, nCtx interfac // NOTE: It is important to increment attempts only after abort has been called. Increment attempt mutates the state // Attempt is used throughout the system to determine the idempotent resource version. 
nodeStatus.IncrementAttempts() - nodeStatus.UpdatePhase(v1alpha1.NodePhaseRunning, metav1.Now(), "retrying", nil) + nodeStatus.UpdatePhase(v1alpha1.NodePhaseRunning, metav1.Now(), "retrying", c.clearStateOnTermination, nil) // We are going to retry in the next round, so we should clear all current state nodeStatus.ClearSubNodeStatus() nodeStatus.ClearTaskStatus() @@ -1330,7 +1331,7 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur if startedAt == nil { startedAt = &t } - nodeStatus.UpdatePhase(v1alpha1.NodePhaseFailed, t, nodeStatus.GetMessage(), nodeStatus.GetExecutionError()) + nodeStatus.UpdatePhase(v1alpha1.NodePhaseFailed, t, nodeStatus.GetMessage(), c.clearStateOnTermination, nodeStatus.GetExecutionError()) c.metrics.FailureDuration.Observe(ctx, startedAt.Time, nodeStatus.GetStoppedAt().Time) if nCtx.NodeExecutionMetadata().IsInterruptible() { c.metrics.InterruptibleNodesTerminated.Inc(ctx) @@ -1344,7 +1345,7 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur return interfaces.NodeStatusUndefined, err } - nodeStatus.UpdatePhase(v1alpha1.NodePhaseTimedOut, metav1.Now(), nodeStatus.GetMessage(), nodeStatus.GetExecutionError()) + nodeStatus.UpdatePhase(v1alpha1.NodePhaseTimedOut, metav1.Now(), nodeStatus.GetMessage(), c.clearStateOnTermination, nodeStatus.GetExecutionError()) c.metrics.TimedOutFailure.Inc(ctx) if nCtx.NodeExecutionMetadata().IsInterruptible() { c.metrics.InterruptibleNodesTerminated.Inc(ctx) @@ -1368,7 +1369,7 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur stopped = &t } c.metrics.SuccessDuration.Observe(ctx, started.Time, stopped.Time) - nodeStatus.UpdatePhase(v1alpha1.NodePhaseSucceeded, t, "completed successfully", nil) + nodeStatus.UpdatePhase(v1alpha1.NodePhaseSucceeded, t, "completed successfully", c.clearStateOnTermination, nil) if nCtx.NodeExecutionMetadata().IsInterruptible() { c.metrics.InterruptibleNodesTerminated.Inc(ctx) } @@ 
-1435,6 +1436,7 @@ func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *stora nodeExecutor := &nodeExecutor{ catalog: catalogClient, clusterID: clusterID, + clearStateOnTermination: nodeConfig.ClearStateOnTermination, defaultActiveDeadline: nodeConfig.DefaultDeadlines.DefaultNodeActiveDeadline.Duration, defaultDataSandbox: defaultRawOutputPrefix, defaultExecutionDeadline: nodeConfig.DefaultDeadlines.DefaultNodeExecutionDeadline.Duration, From a285f72de171642e339b777a187818c012786209 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 13 Dec 2023 16:39:09 +0000 Subject: [PATCH 09/19] Update more calls to UpdatePhase Signed-off-by: Thomas Newton --- flytepropeller/pkg/controller/nodes/branch/evaluator.go | 2 +- flytepropeller/pkg/controller/nodes/executor.go | 4 ++-- flytepropeller/pkg/controller/nodes/transformers.go | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/branch/evaluator.go b/flytepropeller/pkg/controller/nodes/branch/evaluator.go index 44dc8711e8..c62ed828df 100644 --- a/flytepropeller/pkg/controller/nodes/branch/evaluator.go +++ b/flytepropeller/pkg/controller/nodes/branch/evaluator.go @@ -129,7 +129,7 @@ func DecideBranch(ctx context.Context, nl executors.NodeLookup, nodeID v1alpha1. 
} nStatus := nl.GetNodeExecutionStatus(ctx, n.GetID()) logger.Infof(ctx, "Branch Setting Node[%v] status to Skipped!", skippedNodeID) - nStatus.UpdatePhase(v1alpha1.NodePhaseSkipped, v1.Now(), "Branch evaluated to false", nil) + nStatus.UpdatePhase(v1alpha1.NodePhaseSkipped, v1.Now(), "Branch evaluated to false", false, nil) } if selectedNodeID == nil { diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index bec146b062..f03d7cd27e 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -1006,7 +1006,7 @@ func (c *nodeExecutor) handleNotYetStartedNode(ctx context.Context, dag executor logger.Warningf(ctx, "Failed to record nodeEvent, error [%s]", err.Error()) return interfaces.NodeStatusUndefined, errors.Wrapf(errors.EventRecordingFailed, nCtx.NodeID(), err, "failed to record node event") } - UpdateNodeStatus(np, p, nCtx.NodeStateReader(), nodeStatus) + UpdateNodeStatus(np, p, nCtx.NodeStateReader(), nodeStatus, c.clearStateOnTermination) c.RecordTransitionLatency(ctx, dag, nCtx.ContextualNodeLookup(), nCtx.Node(), nodeStatus) } @@ -1272,7 +1272,7 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx inter } } - UpdateNodeStatus(np, p, nCtx.NodeStateReader(), nodeStatus) + UpdateNodeStatus(np, p, nCtx.NodeStateReader(), nodeStatus, c.clearStateOnTermination) return finalStatus, nil } diff --git a/flytepropeller/pkg/controller/nodes/transformers.go b/flytepropeller/pkg/controller/nodes/transformers.go index 8c0db1e57a..0b6bfb5f2b 100644 --- a/flytepropeller/pkg/controller/nodes/transformers.go +++ b/flytepropeller/pkg/controller/nodes/transformers.go @@ -228,10 +228,10 @@ func ToK8sTime(t time.Time) v1.Time { return v1.Time{Time: t} } -func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n interfaces.NodeStateReader, s v1alpha1.ExecutableNodeStatus) { +func UpdateNodeStatus(np v1alpha1.NodePhase, p 
handler.PhaseInfo, n interfaces.NodeStateReader, s v1alpha1.ExecutableNodeStatus clearStateOnTermination, bool) { // We update the phase and / or reason only if they are not already updated if np != s.GetPhase() || p.GetReason() != s.GetMessage() { - s.UpdatePhase(np, ToK8sTime(p.GetOccurredAt()), p.GetReason(), p.GetErr()) + s.UpdatePhase(np, ToK8sTime(p.GetOccurredAt()), p.GetReason(), clearStateOnTermination, p.GetErr()) } // Update TaskStatus if n.HasTaskNodeState() { From e475a78cd2a75db5fb7a5d21770dc7930f030a1a Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 13 Dec 2023 16:48:20 +0000 Subject: [PATCH 10/19] Update generated code Signed-off-by: Thomas Newton --- .../v1alpha1/mocks/ExecutableNodeStatus.go | 6 +++--- .../v1alpha1/mocks/MutableNodeStatus.go | 6 +++--- .../pkg/controller/config/config_flags.go | 1 + .../pkg/controller/config/config_flags_test.go | 14 ++++++++++++++ 4 files changed, 21 insertions(+), 6 deletions(-) diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go index cb447e06fc..82ae289e7d 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go @@ -1187,9 +1187,9 @@ func (_m *ExecutableNodeStatus) SetParentTaskID(t *core.TaskExecutionIdentifier) _m.Called(t) } -// UpdatePhase provides a mock function with given fields: phase, occurredAt, reason, err -func (_m *ExecutableNodeStatus) UpdatePhase(phase v1alpha1.NodePhase, occurredAt v1.Time, reason string, err *core.ExecutionError) { - _m.Called(phase, occurredAt, reason, err) +// UpdatePhase provides a mock function with given fields: phase, occurredAt, reason, clearStateOnTermination, err +func (_m *ExecutableNodeStatus) UpdatePhase(phase v1alpha1.NodePhase, occurredAt v1.Time, reason string, clearStateOnTermination bool, err *core.ExecutionError) { + 
_m.Called(phase, occurredAt, reason, clearStateOnTermination, err) } // VisitNodeStatuses provides a mock function with given fields: visitor diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableNodeStatus.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableNodeStatus.go index b8c97f6be7..01cb14560d 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableNodeStatus.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableNodeStatus.go @@ -587,7 +587,7 @@ func (_m *MutableNodeStatus) SetParentTaskID(t *core.TaskExecutionIdentifier) { _m.Called(t) } -// UpdatePhase provides a mock function with given fields: phase, occurredAt, reason, err -func (_m *MutableNodeStatus) UpdatePhase(phase v1alpha1.NodePhase, occurredAt v1.Time, reason string, err *core.ExecutionError) { - _m.Called(phase, occurredAt, reason, err) +// UpdatePhase provides a mock function with given fields: phase, occurredAt, reason, clearStateOnTermination, err +func (_m *MutableNodeStatus) UpdatePhase(phase v1alpha1.NodePhase, occurredAt v1.Time, reason string, clearStateOnTermination bool, err *core.ExecutionError) { + _m.Called(phase, occurredAt, reason, clearStateOnTermination, err) } diff --git a/flytepropeller/pkg/controller/config/config_flags.go b/flytepropeller/pkg/controller/config/config_flags.go index 8e9c71bcdb..dc55f9b3ad 100755 --- a/flytepropeller/pkg/controller/config/config_flags.go +++ b/flytepropeller/pkg/controller/config/config_flags.go @@ -96,6 +96,7 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.Int32(fmt.Sprintf("%v%v", prefix, "node-config.interruptible-failure-threshold"), defaultConfig.NodeConfig.InterruptibleFailureThreshold, "number of failures for a node to be still considered interruptible. Negative numbers are treated as complementary (ex. 
-1 means last attempt is non-interruptible).'") cmdFlags.Int32(fmt.Sprintf("%v%v", prefix, "node-config.default-max-attempts"), defaultConfig.NodeConfig.DefaultMaxAttempts, "Default maximum number of attempts for a node") cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "node-config.ignore-retry-cause"), defaultConfig.NodeConfig.IgnoreRetryCause, "Ignore retry cause and count all attempts toward a node's max attempts") + cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "node-config.clear-state-on-termination"), defaultConfig.NodeConfig.ClearStateOnTermination, "Collapse node on any terminal state") cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "max-streak-length"), defaultConfig.MaxStreakLength, "Maximum number of consecutive rounds that one propeller worker can use for one workflow - >1 => turbo-mode is enabled.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "event-config.raw-output-policy"), defaultConfig.EventConfig.RawOutputPolicy, "How output data should be passed along in execution events.") cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "event-config.fallback-to-output-reference"), defaultConfig.EventConfig.FallbackToOutputReference, "Whether output data should be sent by reference when it is too large to be sent inline in execution events.") diff --git a/flytepropeller/pkg/controller/config/config_flags_test.go b/flytepropeller/pkg/controller/config/config_flags_test.go index f48d01ebea..5039299751 100755 --- a/flytepropeller/pkg/controller/config/config_flags_test.go +++ b/flytepropeller/pkg/controller/config/config_flags_test.go @@ -743,6 +743,20 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_node-config.clear-state-on-termination", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("node-config.clear-state-on-termination", testValue) + if vBool, err := cmdFlags.GetBool("node-config.clear-state-on-termination"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vBool), 
&actual.NodeConfig.ClearStateOnTermination) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) t.Run("Test_max-streak-length", func(t *testing.T) { t.Run("Override", func(t *testing.T) { From 09786cb11f5db1786e1c8dec5bd7ffac7f013a14 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 13 Dec 2023 17:32:22 +0000 Subject: [PATCH 11/19] Update tests Signed-off-by: Thomas Newton --- .../v1alpha1/node_status_test.go | 294 ++++++++++-------- 1 file changed, 165 insertions(+), 129 deletions(-) diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go index d0c7ddfe54..268cad6330 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go @@ -259,63 +259,178 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { n := metav1.NewTime(time.Now()) const queued = "queued" - t.Run("identical-phase", func(t *testing.T) { - p := NodePhaseQueued - ns := NodeStatus{ - Phase: p, - Message: queued, - } - msg := queued - ns.UpdatePhase(p, n, msg, false, nil) - assert.Nil(t, ns.QueuedAt) - }) + const success = "success" + for _, clearStateOnAnyTermination := range []bool{false, true} { + t.Run("identical-phase", func(t *testing.T) { + p := NodePhaseQueued + ns := NodeStatus{ + Phase: p, + Message: queued, + } + msg := queued + ns.UpdatePhase(p, n, msg, clearStateOnAnyTermination, nil) + assert.Nil(t, ns.QueuedAt) + }) - t.Run("zero", func(t *testing.T) { - p := NodePhaseQueued - ns := NodeStatus{} - msg := queued - ns.UpdatePhase(p, metav1.NewTime(time.Time{}), msg, false, nil) - assert.NotNil(t, ns.QueuedAt) - }) + t.Run("zero", func(t *testing.T) { + p := NodePhaseQueued + ns := NodeStatus{} + msg := queued + ns.UpdatePhase(p, metav1.NewTime(time.Time{}), msg, clearStateOnAnyTermination, nil) + assert.NotNil(t, ns.QueuedAt) + }) - t.Run("non-terminal", func(t *testing.T) { - ns := NodeStatus{} - 
p := NodePhaseQueued - msg := queued - ns.UpdatePhase(p, n, msg, false, nil) + t.Run("non-terminal", func(t *testing.T) { + ns := NodeStatus{} + p := NodePhaseQueued + msg := queued + ns.UpdatePhase(p, n, msg, clearStateOnAnyTermination, nil) + + assert.Equal(t, *ns.LastUpdatedAt, n) + assert.Equal(t, *ns.QueuedAt, n) + assert.Nil(t, ns.LastAttemptStartedAt) + assert.Nil(t, ns.StartedAt) + assert.Nil(t, ns.StoppedAt) + assert.Equal(t, p, ns.Phase) + assert.Equal(t, msg, ns.Message) + assert.Nil(t, ns.Error) + }) - assert.Equal(t, *ns.LastUpdatedAt, n) - assert.Equal(t, *ns.QueuedAt, n) - assert.Nil(t, ns.LastAttemptStartedAt) - assert.Nil(t, ns.StartedAt) - assert.Nil(t, ns.StoppedAt) - assert.Equal(t, p, ns.Phase) - assert.Equal(t, msg, ns.Message) - assert.Nil(t, ns.Error) - }) + t.Run("non-terminal-running", func(t *testing.T) { + ns := NodeStatus{} + p := NodePhaseRunning + msg := "running" + ns.UpdatePhase(p, n, msg, false, nil) + + assert.Equal(t, *ns.LastUpdatedAt, n) + assert.Nil(t, ns.QueuedAt) + assert.Equal(t, *ns.LastAttemptStartedAt, n) + assert.Equal(t, *ns.StartedAt, n) + assert.Nil(t, ns.StoppedAt) + assert.Equal(t, p, ns.Phase) + assert.Equal(t, msg, ns.Message) + assert.Nil(t, ns.Error) + }) + + t.Run("terminal-success", func(t *testing.T) { + ns := NodeStatus{} + p := NodePhaseSucceeded + msg := success + ns.UpdatePhase(p, n, msg, false, nil) + + assert.Nil(t, ns.LastUpdatedAt) + assert.Nil(t, ns.QueuedAt) + assert.Nil(t, ns.LastAttemptStartedAt) + assert.Nil(t, ns.StartedAt) + assert.Equal(t, *ns.StoppedAt, n) + assert.Equal(t, p, ns.Phase) + assert.Empty(t, ns.Message) + assert.Nil(t, ns.Error) + }) - t.Run("non-terminal-running", func(t *testing.T) { + t.Run("terminal-skipped", func(t *testing.T) { + ns := NodeStatus{} + p := NodePhaseSucceeded + msg := success + ns.UpdatePhase(p, n, msg, false, nil) + + assert.Nil(t, ns.LastUpdatedAt) + assert.Nil(t, ns.QueuedAt) + assert.Nil(t, ns.LastAttemptStartedAt) + assert.Nil(t, ns.StartedAt) + 
assert.Equal(t, *ns.StoppedAt, n) + assert.Equal(t, p, ns.Phase) + assert.Empty(t, ns.Message) + assert.Nil(t, ns.Error) + }) + + t.Run("terminal-success-preset", func(t *testing.T) { + ns := NodeStatus{ + QueuedAt: &n, + StartedAt: &n, + LastUpdatedAt: &n, + LastAttemptStartedAt: &n, + WorkflowNodeStatus: &WorkflowNodeStatus{}, + BranchStatus: &BranchNodeStatus{}, + DynamicNodeStatus: &DynamicNodeStatus{}, + TaskNodeStatus: &TaskNodeStatus{}, + SubNodeStatus: map[NodeID]*NodeStatus{}, + } + p := NodePhaseSucceeded + msg := success + ns.UpdatePhase(p, n, msg, false, nil) + + assert.Nil(t, ns.LastUpdatedAt) + assert.Nil(t, ns.QueuedAt) + assert.Nil(t, ns.LastAttemptStartedAt) + assert.Nil(t, ns.StartedAt) + assert.Equal(t, *ns.StoppedAt, n) + assert.Equal(t, p, ns.Phase) + assert.Empty(t, ns.Message) + assert.Nil(t, ns.Error) + assert.Nil(t, ns.SubNodeStatus) + assert.Nil(t, ns.DynamicNodeStatus) + assert.Nil(t, ns.WorkflowNodeStatus) + assert.Nil(t, ns.BranchStatus) + assert.Nil(t, ns.TaskNodeStatus) + }) + + t.Run("non-terminal-preset", func(t *testing.T) { + ns := NodeStatus{ + QueuedAt: &n, + StartedAt: &n, + LastUpdatedAt: &n, + LastAttemptStartedAt: &n, + WorkflowNodeStatus: &WorkflowNodeStatus{}, + BranchStatus: &BranchNodeStatus{}, + DynamicNodeStatus: &DynamicNodeStatus{}, + TaskNodeStatus: &TaskNodeStatus{}, + SubNodeStatus: map[NodeID]*NodeStatus{}, + } + n2 := metav1.NewTime(time.Now()) + p := NodePhaseRunning + msg := "running" + ns.UpdatePhase(p, n2, msg, false, nil) + + assert.Equal(t, *ns.LastUpdatedAt, n2) + assert.Equal(t, *ns.QueuedAt, n) + assert.Equal(t, *ns.LastAttemptStartedAt, n) + assert.Equal(t, *ns.StartedAt, n) + assert.Nil(t, ns.StoppedAt) + assert.Equal(t, p, ns.Phase) + assert.Equal(t, msg, ns.Message) + assert.Nil(t, ns.Error) + assert.NotNil(t, ns.SubNodeStatus) + assert.NotNil(t, ns.DynamicNodeStatus) + assert.NotNil(t, ns.WorkflowNodeStatus) + assert.NotNil(t, ns.BranchStatus) + assert.NotNil(t, ns.TaskNodeStatus) + }) + } + + 
t.Run("terminal-fail", func(t *testing.T) { ns := NodeStatus{} - p := NodePhaseRunning - msg := "running" - ns.UpdatePhase(p, n, msg, false, nil) + p := NodePhaseFailed + msg := "failed" + err := &core.ExecutionError{} + ns.UpdatePhase(p, n, msg, false, err) assert.Equal(t, *ns.LastUpdatedAt, n) assert.Nil(t, ns.QueuedAt) assert.Equal(t, *ns.LastAttemptStartedAt, n) assert.Equal(t, *ns.StartedAt, n) - assert.Nil(t, ns.StoppedAt) + assert.Equal(t, *ns.StoppedAt, n) assert.Equal(t, p, ns.Phase) assert.Equal(t, msg, ns.Message) - assert.Nil(t, ns.Error) + assert.Equal(t, ns.Error.ExecutionError, err) }) - t.Run("terminal-fail", func(t *testing.T) { + t.Run("terminal-fail-clear-state-on-any-termination", func(t *testing.T) { ns := NodeStatus{} p := NodePhaseFailed msg := "failed" err := &core.ExecutionError{} - ns.UpdatePhase(p, n, msg, false, err) + ns.UpdatePhase(p, n, msg, true, err) assert.Nil(t, ns.LastUpdatedAt) assert.Nil(t, ns.QueuedAt) @@ -334,64 +449,22 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { err := &core.ExecutionError{} ns.UpdatePhase(p, n, msg, false, err) - assert.Nil(t, ns.LastUpdatedAt) + assert.Equal(t, *ns.LastUpdatedAt, n) assert.Nil(t, ns.QueuedAt) - assert.Nil(t, ns.LastAttemptStartedAt) - assert.Nil(t, ns.StartedAt) + assert.Equal(t, *ns.LastAttemptStartedAt, n) + assert.Equal(t, *ns.StartedAt, n) assert.Equal(t, *ns.StoppedAt, n) assert.Equal(t, p, ns.Phase) - assert.Equal(t, ns.Message, "") + assert.Equal(t, msg, ns.Message) assert.Equal(t, ns.Error.ExecutionError, err) }) - const success = "success" - t.Run("terminal-success", func(t *testing.T) { - ns := NodeStatus{} - p := NodePhaseSucceeded - msg := success - ns.UpdatePhase(p, n, msg, false, nil) - - assert.Nil(t, ns.LastUpdatedAt) - assert.Nil(t, ns.QueuedAt) - assert.Nil(t, ns.LastAttemptStartedAt) - assert.Nil(t, ns.StartedAt) - assert.Equal(t, *ns.StoppedAt, n) - assert.Equal(t, p, ns.Phase) - assert.Empty(t, ns.Message) - assert.Nil(t, ns.Error) - }) - - 
t.Run("terminal-skipped", func(t *testing.T) { + t.Run("terminal-timeout-clear-state-on-any-termination", func(t *testing.T) { ns := NodeStatus{} - p := NodePhaseSucceeded - msg := success - ns.UpdatePhase(p, n, msg, false, nil) - - assert.Nil(t, ns.LastUpdatedAt) - assert.Nil(t, ns.QueuedAt) - assert.Nil(t, ns.LastAttemptStartedAt) - assert.Nil(t, ns.StartedAt) - assert.Equal(t, *ns.StoppedAt, n) - assert.Equal(t, p, ns.Phase) - assert.Empty(t, ns.Message) - assert.Nil(t, ns.Error) - }) - - t.Run("terminal-success-preset", func(t *testing.T) { - ns := NodeStatus{ - QueuedAt: &n, - StartedAt: &n, - LastUpdatedAt: &n, - LastAttemptStartedAt: &n, - WorkflowNodeStatus: &WorkflowNodeStatus{}, - BranchStatus: &BranchNodeStatus{}, - DynamicNodeStatus: &DynamicNodeStatus{}, - TaskNodeStatus: &TaskNodeStatus{}, - SubNodeStatus: map[NodeID]*NodeStatus{}, - } - p := NodePhaseSucceeded - msg := success - ns.UpdatePhase(p, n, msg, false, nil) + p := NodePhaseTimedOut + msg := "tm" + err := &core.ExecutionError{} + ns.UpdatePhase(p, n, msg, true, err) assert.Nil(t, ns.LastUpdatedAt) assert.Nil(t, ns.QueuedAt) @@ -399,44 +472,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { assert.Nil(t, ns.StartedAt) assert.Equal(t, *ns.StoppedAt, n) assert.Equal(t, p, ns.Phase) - assert.Empty(t, ns.Message) - assert.Nil(t, ns.Error) - assert.Nil(t, ns.SubNodeStatus) - assert.Nil(t, ns.DynamicNodeStatus) - assert.Nil(t, ns.WorkflowNodeStatus) - assert.Nil(t, ns.BranchStatus) - assert.Nil(t, ns.TaskNodeStatus) - }) - - t.Run("non-terminal-preset", func(t *testing.T) { - ns := NodeStatus{ - QueuedAt: &n, - StartedAt: &n, - LastUpdatedAt: &n, - LastAttemptStartedAt: &n, - WorkflowNodeStatus: &WorkflowNodeStatus{}, - BranchStatus: &BranchNodeStatus{}, - DynamicNodeStatus: &DynamicNodeStatus{}, - TaskNodeStatus: &TaskNodeStatus{}, - SubNodeStatus: map[NodeID]*NodeStatus{}, - } - n2 := metav1.NewTime(time.Now()) - p := NodePhaseRunning - msg := "running" - ns.UpdatePhase(p, n2, msg, false, nil) - 
- assert.Equal(t, *ns.LastUpdatedAt, n2) - assert.Equal(t, *ns.QueuedAt, n) - assert.Equal(t, *ns.LastAttemptStartedAt, n) - assert.Equal(t, *ns.StartedAt, n) - assert.Nil(t, ns.StoppedAt) - assert.Equal(t, p, ns.Phase) - assert.Equal(t, msg, ns.Message) - assert.Nil(t, ns.Error) - assert.NotNil(t, ns.SubNodeStatus) - assert.NotNil(t, ns.DynamicNodeStatus) - assert.NotNil(t, ns.WorkflowNodeStatus) - assert.NotNil(t, ns.BranchStatus) - assert.NotNil(t, ns.TaskNodeStatus) + assert.Equal(t, ns.Message, "") + assert.Equal(t, ns.Error.ExecutionError, err) }) } From 2c0b4d4e5d35b9f03367860d6d037a8e7ca7ab5d Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 13 Dec 2023 18:06:11 +0000 Subject: [PATCH 12/19] Fix more tests Signed-off-by: Thomas Newton --- flytepropeller/pkg/controller/nodes/executor.go | 2 +- flytepropeller/pkg/controller/nodes/executor_test.go | 5 +++-- flytepropeller/pkg/controller/nodes/transformers.go | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index f03d7cd27e..6d8b42a3a0 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -1279,7 +1279,7 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx inter func (c *nodeExecutor) handleRetryableFailure(ctx context.Context, nCtx interfaces.NodeExecutionContext, h interfaces.NodeHandler) (interfaces.NodeStatus, error) { nodeStatus := nCtx.NodeStatus() logger.Debugf(ctx, "node failed with retryable failure, aborting and finalizing, message: %s", nodeStatus.GetMessage()) - if err := c.Abort(ctx, h, nCtx, nodeStatus.GetExecutionError().GetMessage(), false); err != nil { + if err := c.Abort(ctx, h, nCtx, nodeStatus.GetMessage(), false); err != nil { return interfaces.NodeStatusUndefined, err } diff --git a/flytepropeller/pkg/controller/nodes/executor_test.go 
b/flytepropeller/pkg/controller/nodes/executor_test.go index 222e0a05d5..adf6dee61b 100644 --- a/flytepropeller/pkg/controller/nodes/executor_test.go +++ b/flytepropeller/pkg/controller/nodes/executor_test.go @@ -582,7 +582,8 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { mockN2Status.OnGetStoppedAt().Return(nil) var ee *core.ExecutionError - mockN2Status.On("UpdatePhase", expectedN2Phase, mock.Anything, mock.AnythingOfType("string"), ee) + // TODO: Add tests case with clearStateOnTermination=true + mockN2Status.On("UpdatePhase", expectedN2Phase, mock.Anything, mock.AnythingOfType("string"), false, ee) mockN2Status.OnIsDirty().Return(false) mockN2Status.OnGetTaskNodeStatus().Return(nil) mockN2Status.On("ClearDynamicNodeStatus").Return(nil) @@ -1329,7 +1330,7 @@ func TestNodeExecutor_RecursiveNodeHandler_BranchNode(t *testing.T) { if test.phaseUpdateExpected { var ee *core.ExecutionError - branchTakeNodeStatus.On("UpdatePhase", v1alpha1.NodePhaseQueued, mock.Anything, mock.Anything, ee).Return() + branchTakeNodeStatus.On("UpdatePhase", v1alpha1.NodePhaseQueued, mock.Anything, mock.Anything, false, ee).Return() } leafDag := executors.NewLeafNodeDAGStructure(branchTakenNodeID, parentBranchNodeID) diff --git a/flytepropeller/pkg/controller/nodes/transformers.go b/flytepropeller/pkg/controller/nodes/transformers.go index 0b6bfb5f2b..5b6631918e 100644 --- a/flytepropeller/pkg/controller/nodes/transformers.go +++ b/flytepropeller/pkg/controller/nodes/transformers.go @@ -228,7 +228,7 @@ func ToK8sTime(t time.Time) v1.Time { return v1.Time{Time: t} } -func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n interfaces.NodeStateReader, s v1alpha1.ExecutableNodeStatus clearStateOnTermination, bool) { +func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n interfaces.NodeStateReader, s v1alpha1.ExecutableNodeStatus, clearStateOnTermination bool) { // We update the phase and / or reason only if they are not already updated if np != 
s.GetPhase() || p.GetReason() != s.GetMessage() { s.UpdatePhase(np, ToK8sTime(p.GetOccurredAt()), p.GetReason(), clearStateOnTermination, p.GetErr()) From 6cc9a9da443eb061480b8e4abae13e53e8cdae24 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 13 Dec 2023 18:24:45 +0000 Subject: [PATCH 13/19] Fix tests Signed-off-by: Thomas Newton --- .../pkg/apis/flyteworkflow/v1alpha1/node_status.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go index 432350fbe3..291d8509a7 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go @@ -642,6 +642,13 @@ func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason st in.SubNodeStatus = nil in.TaskNodeStatus = nil in.WorkflowNodeStatus = nil + } else { + if in.StartedAt == nil { + in.StartedAt = &n + } + if in.LastAttemptStartedAt == nil { + in.LastAttemptStartedAt = &n + } } } in.SetDirty() From 2efb329f0bb9da5f90dbc99bd17f91ae4d307e5b Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 13 Dec 2023 18:37:55 +0000 Subject: [PATCH 14/19] Rename clear-state-on-termination -> clear-state-on-any-termination Signed-off-by: Thomas Newton --- .../pkg/apis/flyteworkflow/v1alpha1/iface.go | 2 +- .../v1alpha1/mocks/ExecutableNodeStatus.go | 6 +++--- .../v1alpha1/mocks/MutableNodeStatus.go | 6 +++--- .../apis/flyteworkflow/v1alpha1/node_status.go | 4 ++-- flytepropeller/pkg/controller/config/config.go | 4 ++-- .../pkg/controller/config/config_flags.go | 2 +- .../pkg/controller/config/config_flags_test.go | 8 ++++---- flytepropeller/pkg/controller/nodes/executor.go | 16 ++++++++-------- .../pkg/controller/nodes/executor_test.go | 2 +- .../pkg/controller/nodes/transformers.go | 4 ++-- 10 files changed, 27 insertions(+), 27 deletions(-) diff --git 
a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go index 68c2121dd6..f1a3344426 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go @@ -318,7 +318,7 @@ type MutableNodeStatus interface { SetOutputDir(d DataReference) SetParentNodeID(n *NodeID) SetParentTaskID(t *core.TaskExecutionIdentifier) - UpdatePhase(phase NodePhase, occurredAt metav1.Time, reason string, clearStateOnTermination bool, err *core.ExecutionError) + UpdatePhase(phase NodePhase, occurredAt metav1.Time, reason string, clearStateOnAnyTermination bool, err *core.ExecutionError) IncrementAttempts() uint32 IncrementSystemFailures() uint32 SetCached() diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go index 82ae289e7d..e6d2309df1 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go @@ -1187,9 +1187,9 @@ func (_m *ExecutableNodeStatus) SetParentTaskID(t *core.TaskExecutionIdentifier) _m.Called(t) } -// UpdatePhase provides a mock function with given fields: phase, occurredAt, reason, clearStateOnTermination, err -func (_m *ExecutableNodeStatus) UpdatePhase(phase v1alpha1.NodePhase, occurredAt v1.Time, reason string, clearStateOnTermination bool, err *core.ExecutionError) { - _m.Called(phase, occurredAt, reason, clearStateOnTermination, err) +// UpdatePhase provides a mock function with given fields: phase, occurredAt, reason, clearStateOnAnyTermination, err +func (_m *ExecutableNodeStatus) UpdatePhase(phase v1alpha1.NodePhase, occurredAt v1.Time, reason string, clearStateOnAnyTermination bool, err *core.ExecutionError) { + _m.Called(phase, occurredAt, reason, clearStateOnAnyTermination, err) } // VisitNodeStatuses provides a mock 
function with given fields: visitor diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableNodeStatus.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableNodeStatus.go index 01cb14560d..d6bdd0510e 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableNodeStatus.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableNodeStatus.go @@ -587,7 +587,7 @@ func (_m *MutableNodeStatus) SetParentTaskID(t *core.TaskExecutionIdentifier) { _m.Called(t) } -// UpdatePhase provides a mock function with given fields: phase, occurredAt, reason, clearStateOnTermination, err -func (_m *MutableNodeStatus) UpdatePhase(phase v1alpha1.NodePhase, occurredAt v1.Time, reason string, clearStateOnTermination bool, err *core.ExecutionError) { - _m.Called(phase, occurredAt, reason, clearStateOnTermination, err) +// UpdatePhase provides a mock function with given fields: phase, occurredAt, reason, clearStateOnAnyTermination, err +func (_m *MutableNodeStatus) UpdatePhase(phase v1alpha1.NodePhase, occurredAt v1.Time, reason string, clearStateOnAnyTermination bool, err *core.ExecutionError) { + _m.Called(phase, occurredAt, reason, clearStateOnAnyTermination, err) } diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go index 291d8509a7..e4fc8bc41a 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go @@ -594,7 +594,7 @@ func (in *NodeStatus) GetOrCreateArrayNodeStatus() MutableArrayNodeStatus { return in.ArrayNodeStatus } -func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason string, clearStateOnTermination bool, err *core.ExecutionError) { +func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason string, clearStateOnAnyTermination bool, err *core.ExecutionError) { if in.Phase == p && in.Message == reason { // We will not 
update the phase multiple times. This prevents the comparison from returning false positive return @@ -629,7 +629,7 @@ func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason st if in.StoppedAt == nil { in.StoppedAt = &n } - if p == NodePhaseSucceeded || p == NodePhaseSkipped || clearStateOnTermination { + if p == NodePhaseSucceeded || p == NodePhaseSkipped || clearStateOnAnyTermination { // Clear most status related fields after reaching a terminal state. This keeps the CRD state small to avoid // etcd size limits. Importantly we keep Phase, StoppedAt and Error which will be needed further. in.Message = "" diff --git a/flytepropeller/pkg/controller/config/config.go b/flytepropeller/pkg/controller/config/config.go index bdc3b1cfac..9dce351b59 100644 --- a/flytepropeller/pkg/controller/config/config.go +++ b/flytepropeller/pkg/controller/config/config.go @@ -98,7 +98,7 @@ var ( InterruptibleFailureThreshold: -1, DefaultMaxAttempts: 1, IgnoreRetryCause: false, - ClearStateOnTermination: false, + ClearStateOnAnyTermination: false, }, MaxStreakLength: 8, // Turbo mode is enabled by default ProfilerPort: config.Port{ @@ -212,7 +212,7 @@ type NodeConfig struct { InterruptibleFailureThreshold int32 `json:"interruptible-failure-threshold" pflag:"1,number of failures for a node to be still considered interruptible. Negative numbers are treated as complementary (ex. 
-1 means last attempt is non-interruptible).'"` DefaultMaxAttempts int32 `json:"default-max-attempts" pflag:"3,Default maximum number of attempts for a node"` IgnoreRetryCause bool `json:"ignore-retry-cause" pflag:",Ignore retry cause and count all attempts toward a node's max attempts"` - ClearStateOnTermination bool `json:"clear-state-on-termination" pflag:",Collapse node on any terminal state"` + ClearStateOnAnyTermination bool `json:"clear-state-on-any-termination" pflag:",Collapse node on any terminal state"` } // DefaultDeadlines contains default values for timeouts diff --git a/flytepropeller/pkg/controller/config/config_flags.go b/flytepropeller/pkg/controller/config/config_flags.go index dc55f9b3ad..e8b03cf376 100755 --- a/flytepropeller/pkg/controller/config/config_flags.go +++ b/flytepropeller/pkg/controller/config/config_flags.go @@ -96,7 +96,7 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.Int32(fmt.Sprintf("%v%v", prefix, "node-config.interruptible-failure-threshold"), defaultConfig.NodeConfig.InterruptibleFailureThreshold, "number of failures for a node to be still considered interruptible. Negative numbers are treated as complementary (ex. 
-1 means last attempt is non-interruptible).'") cmdFlags.Int32(fmt.Sprintf("%v%v", prefix, "node-config.default-max-attempts"), defaultConfig.NodeConfig.DefaultMaxAttempts, "Default maximum number of attempts for a node") cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "node-config.ignore-retry-cause"), defaultConfig.NodeConfig.IgnoreRetryCause, "Ignore retry cause and count all attempts toward a node's max attempts") - cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "node-config.clear-state-on-termination"), defaultConfig.NodeConfig.ClearStateOnTermination, "Collapse node on any terminal state") + cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "node-config.clear-state-on-any-termination"), defaultConfig.NodeConfig.ClearStateOnAnyTermination, "Collapse node on any terminal state") cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "max-streak-length"), defaultConfig.MaxStreakLength, "Maximum number of consecutive rounds that one propeller worker can use for one workflow - >1 => turbo-mode is enabled.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "event-config.raw-output-policy"), defaultConfig.EventConfig.RawOutputPolicy, "How output data should be passed along in execution events.") cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "event-config.fallback-to-output-reference"), defaultConfig.EventConfig.FallbackToOutputReference, "Whether output data should be sent by reference when it is too large to be sent inline in execution events.") diff --git a/flytepropeller/pkg/controller/config/config_flags_test.go b/flytepropeller/pkg/controller/config/config_flags_test.go index 5039299751..d083ce0222 100755 --- a/flytepropeller/pkg/controller/config/config_flags_test.go +++ b/flytepropeller/pkg/controller/config/config_flags_test.go @@ -743,14 +743,14 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) - t.Run("Test_node-config.clear-state-on-termination", func(t *testing.T) { + t.Run("Test_node-config.clear-state-on-any-termination", func(t *testing.T) { t.Run("Override", func(t *testing.T) { testValue 
:= "1" - cmdFlags.Set("node-config.clear-state-on-termination", testValue) - if vBool, err := cmdFlags.GetBool("node-config.clear-state-on-termination"); err == nil { - testDecodeJson_Config(t, fmt.Sprintf("%v", vBool), &actual.NodeConfig.ClearStateOnTermination) + cmdFlags.Set("node-config.clear-state-on-any-termination", testValue) + if vBool, err := cmdFlags.GetBool("node-config.clear-state-on-any-termination"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vBool), &actual.NodeConfig.ClearStateOnAnyTermination) } else { assert.FailNow(t, err.Error()) diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index 6d8b42a3a0..f897ea8693 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -475,7 +475,7 @@ func (c *recursiveNodeExecutor) WithNodeExecutionContextBuilder(nCtxBuilder inte type nodeExecutor struct { catalog catalog.Client clusterID string - clearStateOnTermination bool + clearStateOnAnyTermination bool defaultActiveDeadline time.Duration defaultDataSandbox storage.DataReference defaultExecutionDeadline time.Duration @@ -1006,7 +1006,7 @@ func (c *nodeExecutor) handleNotYetStartedNode(ctx context.Context, dag executor logger.Warningf(ctx, "Failed to record nodeEvent, error [%s]", err.Error()) return interfaces.NodeStatusUndefined, errors.Wrapf(errors.EventRecordingFailed, nCtx.NodeID(), err, "failed to record node event") } - UpdateNodeStatus(np, p, nCtx.NodeStateReader(), nodeStatus, c.clearStateOnTermination) + UpdateNodeStatus(np, p, nCtx.NodeStateReader(), nodeStatus, c.clearStateOnAnyTermination) c.RecordTransitionLatency(ctx, dag, nCtx.ContextualNodeLookup(), nCtx.Node(), nodeStatus) } @@ -1272,7 +1272,7 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx inter } } - UpdateNodeStatus(np, p, nCtx.NodeStateReader(), nodeStatus, c.clearStateOnTermination) + UpdateNodeStatus(np, p, 
nCtx.NodeStateReader(), nodeStatus, c.clearStateOnAnyTermination) return finalStatus, nil } @@ -1286,7 +1286,7 @@ func (c *nodeExecutor) handleRetryableFailure(ctx context.Context, nCtx interfac // NOTE: It is important to increment attempts only after abort has been called. Increment attempt mutates the state // Attempt is used throughout the system to determine the idempotent resource version. nodeStatus.IncrementAttempts() - nodeStatus.UpdatePhase(v1alpha1.NodePhaseRunning, metav1.Now(), "retrying", c.clearStateOnTermination, nil) + nodeStatus.UpdatePhase(v1alpha1.NodePhaseRunning, metav1.Now(), "retrying", c.clearStateOnAnyTermination, nil) // We are going to retry in the next round, so we should clear all current state nodeStatus.ClearSubNodeStatus() nodeStatus.ClearTaskStatus() @@ -1331,7 +1331,7 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur if startedAt == nil { startedAt = &t } - nodeStatus.UpdatePhase(v1alpha1.NodePhaseFailed, t, nodeStatus.GetMessage(), c.clearStateOnTermination, nodeStatus.GetExecutionError()) + nodeStatus.UpdatePhase(v1alpha1.NodePhaseFailed, t, nodeStatus.GetMessage(), c.clearStateOnAnyTermination, nodeStatus.GetExecutionError()) c.metrics.FailureDuration.Observe(ctx, startedAt.Time, nodeStatus.GetStoppedAt().Time) if nCtx.NodeExecutionMetadata().IsInterruptible() { c.metrics.InterruptibleNodesTerminated.Inc(ctx) @@ -1345,7 +1345,7 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur return interfaces.NodeStatusUndefined, err } - nodeStatus.UpdatePhase(v1alpha1.NodePhaseTimedOut, metav1.Now(), nodeStatus.GetMessage(), c.clearStateOnTermination, nodeStatus.GetExecutionError()) + nodeStatus.UpdatePhase(v1alpha1.NodePhaseTimedOut, metav1.Now(), nodeStatus.GetMessage(), c.clearStateOnAnyTermination, nodeStatus.GetExecutionError()) c.metrics.TimedOutFailure.Inc(ctx) if nCtx.NodeExecutionMetadata().IsInterruptible() { c.metrics.InterruptibleNodesTerminated.Inc(ctx) @@ 
-1369,7 +1369,7 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur stopped = &t } c.metrics.SuccessDuration.Observe(ctx, started.Time, stopped.Time) - nodeStatus.UpdatePhase(v1alpha1.NodePhaseSucceeded, t, "completed successfully", c.clearStateOnTermination, nil) + nodeStatus.UpdatePhase(v1alpha1.NodePhaseSucceeded, t, "completed successfully", c.clearStateOnAnyTermination, nil) if nCtx.NodeExecutionMetadata().IsInterruptible() { c.metrics.InterruptibleNodesTerminated.Inc(ctx) } @@ -1436,7 +1436,7 @@ func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *stora nodeExecutor := &nodeExecutor{ catalog: catalogClient, clusterID: clusterID, - clearStateOnTermination: nodeConfig.ClearStateOnTermination, + clearStateOnAnyTermination: nodeConfig.ClearStateOnAnyTermination, defaultActiveDeadline: nodeConfig.DefaultDeadlines.DefaultNodeActiveDeadline.Duration, defaultDataSandbox: defaultRawOutputPrefix, defaultExecutionDeadline: nodeConfig.DefaultDeadlines.DefaultNodeExecutionDeadline.Duration, diff --git a/flytepropeller/pkg/controller/nodes/executor_test.go b/flytepropeller/pkg/controller/nodes/executor_test.go index adf6dee61b..648f4e339b 100644 --- a/flytepropeller/pkg/controller/nodes/executor_test.go +++ b/flytepropeller/pkg/controller/nodes/executor_test.go @@ -582,7 +582,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { mockN2Status.OnGetStoppedAt().Return(nil) var ee *core.ExecutionError - // TODO: Add tests case with clearStateOnTermination=true + // TODO: Add tests case with clearStateOnAnyTermination=true mockN2Status.On("UpdatePhase", expectedN2Phase, mock.Anything, mock.AnythingOfType("string"), false, ee) mockN2Status.OnIsDirty().Return(false) mockN2Status.OnGetTaskNodeStatus().Return(nil) diff --git a/flytepropeller/pkg/controller/nodes/transformers.go b/flytepropeller/pkg/controller/nodes/transformers.go index 5b6631918e..d4e8922c56 100644 --- 
a/flytepropeller/pkg/controller/nodes/transformers.go +++ b/flytepropeller/pkg/controller/nodes/transformers.go @@ -228,10 +228,10 @@ func ToK8sTime(t time.Time) v1.Time { return v1.Time{Time: t} } -func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n interfaces.NodeStateReader, s v1alpha1.ExecutableNodeStatus, clearStateOnTermination bool) { +func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n interfaces.NodeStateReader, s v1alpha1.ExecutableNodeStatus, clearStateOnAnyTermination bool) { // We update the phase and / or reason only if they are not already updated if np != s.GetPhase() || p.GetReason() != s.GetMessage() { - s.UpdatePhase(np, ToK8sTime(p.GetOccurredAt()), p.GetReason(), clearStateOnTermination, p.GetErr()) + s.UpdatePhase(np, ToK8sTime(p.GetOccurredAt()), p.GetReason(), clearStateOnAnyTermination, p.GetErr()) } // Update TaskStatus if n.HasTaskNodeState() { From 625ab3d93679e47bcc4a67a6f059e8a5e72e5ee3 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 13 Dec 2023 18:38:02 +0000 Subject: [PATCH 15/19] Add a comment Signed-off-by: Thomas Newton --- flytepropeller/pkg/controller/nodes/branch/evaluator.go | 1 + 1 file changed, 1 insertion(+) diff --git a/flytepropeller/pkg/controller/nodes/branch/evaluator.go b/flytepropeller/pkg/controller/nodes/branch/evaluator.go index c62ed828df..9d4aede5e9 100644 --- a/flytepropeller/pkg/controller/nodes/branch/evaluator.go +++ b/flytepropeller/pkg/controller/nodes/branch/evaluator.go @@ -129,6 +129,7 @@ func DecideBranch(ctx context.Context, nl executors.NodeLookup, nodeID v1alpha1. } nStatus := nl.GetNodeExecutionStatus(ctx, n.GetID()) logger.Infof(ctx, "Branch Setting Node[%v] status to Skipped!", skippedNodeID) + // TODO: What is the impact of always setting clearStateOnAnyTermination=false here? 
nStatus.UpdatePhase(v1alpha1.NodePhaseSkipped, v1.Now(), "Branch evaluated to false", false, nil) } From 42fb04933c68c614e4ce245619e4575e9c65eacf Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Thu, 14 Dec 2023 13:07:38 +0000 Subject: [PATCH 16/19] Missing test coverage - add test case `non-terminal-timing-out` Signed-off-by: Thomas Newton --- .../v1alpha1/node_status_test.go | 26 +++++++++++++++---- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go index 268cad6330..63d8b056e7 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go @@ -300,7 +300,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { ns := NodeStatus{} p := NodePhaseRunning msg := "running" - ns.UpdatePhase(p, n, msg, false, nil) + ns.UpdatePhase(p, n, msg, clearStateOnAnyTermination, nil) assert.Equal(t, *ns.LastUpdatedAt, n) assert.Nil(t, ns.QueuedAt) @@ -312,11 +312,27 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { assert.Nil(t, ns.Error) }) + t.Run("non-terminal-timing-out", func(t *testing.T) { + ns := NodeStatus{} + p := NodePhaseTimingOut + msg := "timing-out" + ns.UpdatePhase(p, n, msg, clearStateOnAnyTermination, nil) + + assert.Equal(t, *ns.LastUpdatedAt, n) + assert.Nil(t, ns.QueuedAt) + assert.Nil(t, ns.LastAttemptStartedAt) + assert.Nil(t, ns.StartedAt) + assert.Nil(t, ns.StoppedAt) + assert.Equal(t, p, ns.Phase) + assert.Equal(t, msg, ns.Message) + assert.Nil(t, ns.Error) + }) + t.Run("terminal-success", func(t *testing.T) { ns := NodeStatus{} p := NodePhaseSucceeded msg := success - ns.UpdatePhase(p, n, msg, false, nil) + ns.UpdatePhase(p, n, msg, clearStateOnAnyTermination, nil) assert.Nil(t, ns.LastUpdatedAt) assert.Nil(t, ns.QueuedAt) @@ -332,7 +348,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { ns := NodeStatus{} p := 
NodePhaseSucceeded msg := success - ns.UpdatePhase(p, n, msg, false, nil) + ns.UpdatePhase(p, n, msg, clearStateOnAnyTermination, nil) assert.Nil(t, ns.LastUpdatedAt) assert.Nil(t, ns.QueuedAt) @@ -358,7 +374,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { } p := NodePhaseSucceeded msg := success - ns.UpdatePhase(p, n, msg, false, nil) + ns.UpdatePhase(p, n, msg, clearStateOnAnyTermination, nil) assert.Nil(t, ns.LastUpdatedAt) assert.Nil(t, ns.QueuedAt) @@ -390,7 +406,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { n2 := metav1.NewTime(time.Now()) p := NodePhaseRunning msg := "running" - ns.UpdatePhase(p, n2, msg, false, nil) + ns.UpdatePhase(p, n2, msg, clearStateOnAnyTermination, nil) assert.Equal(t, *ns.LastUpdatedAt, n2) assert.Equal(t, *ns.QueuedAt, n) From f3d9f7b8ab137de1c34f2f83b979bdc494eef28e Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Fri, 15 Dec 2023 14:39:58 +0000 Subject: [PATCH 17/19] Add test coverage for `TestNodeExecutor_RecursiveNodeHandler_Recurse` with `clearStateOnAnyTermination=true` Signed-off-by: Thomas Newton --- .../pkg/controller/nodes/executor_test.go | 35 +++++++++++-------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/executor_test.go b/flytepropeller/pkg/controller/nodes/executor_test.go index 648f4e339b..70ab7fbc88 100644 --- a/flytepropeller/pkg/controller/nodes/executor_test.go +++ b/flytepropeller/pkg/controller/nodes/executor_test.go @@ -565,7 +565,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { }, } - setupNodePhase := func(n0Phase, n2Phase, expectedN2Phase v1alpha1.NodePhase) (*mocks.ExecutableWorkflow, *mocks.ExecutableNodeStatus) { + setupNodePhase := func(n0Phase, n2Phase, expectedN2Phase v1alpha1.NodePhase, expectedClearStateOnAnyTermination bool) (*mocks.ExecutableWorkflow, *mocks.ExecutableNodeStatus) { taskID := "id" taskID0 := "id1" // Setup @@ -582,8 +582,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) 
{ mockN2Status.OnGetStoppedAt().Return(nil) var ee *core.ExecutionError - // TODO: Add tests case with clearStateOnAnyTermination=true - mockN2Status.On("UpdatePhase", expectedN2Phase, mock.Anything, mock.AnythingOfType("string"), false, ee) + mockN2Status.On("UpdatePhase", expectedN2Phase, mock.Anything, mock.AnythingOfType("string"), expectedClearStateOnAnyTermination, ee) mockN2Status.OnIsDirty().Return(false) mockN2Status.OnGetTaskNodeStatus().Return(nil) mockN2Status.On("ClearDynamicNodeStatus").Return(nil) @@ -660,17 +659,21 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { } tests := []struct { - name string - currentNodePhase v1alpha1.NodePhase - parentNodePhase v1alpha1.NodePhase - expectedNodePhase v1alpha1.NodePhase - expectedPhase interfaces.NodePhase - expectedError bool - updateCalled bool + name string + currentNodePhase v1alpha1.NodePhase + parentNodePhase v1alpha1.NodePhase + clearStateOnAnyTermination bool + expectedNodePhase v1alpha1.NodePhase + expectedPhase interfaces.NodePhase + expectedError bool + updateCalled bool }{ - {"notYetStarted->skipped", v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseFailed, v1alpha1.NodePhaseSkipped, interfaces.NodePhaseFailed, false, false}, - {"notYetStarted->skipped", v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseSkipped, v1alpha1.NodePhaseSkipped, interfaces.NodePhaseSuccess, false, true}, - {"notYetStarted->queued", v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseSucceeded, v1alpha1.NodePhaseQueued, interfaces.NodePhasePending, false, true}, + {"notYetStarted->skipped", v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseFailed, false, v1alpha1.NodePhaseSkipped, interfaces.NodePhaseFailed, false, false}, + {"notYetStarted->skipped", v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseSkipped, false, v1alpha1.NodePhaseSkipped, interfaces.NodePhaseSuccess, false, true}, + {"notYetStarted->queued", v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseSucceeded, false, 
v1alpha1.NodePhaseQueued, interfaces.NodePhasePending, false, true}, + {"notYetStarted->skipped clearStateOnAnyTermination", v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseFailed, true, v1alpha1.NodePhaseSkipped, interfaces.NodePhaseFailed, false, false}, + {"notYetStarted->skipped clearStateOnAnyTermination", v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseSkipped, true, v1alpha1.NodePhaseSkipped, interfaces.NodePhaseSuccess, false, true}, + {"notYetStarted->queued clearStateOnAnyTermination", v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseSucceeded, true, v1alpha1.NodePhaseQueued, interfaces.NodePhasePending, false, true}, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { @@ -685,12 +688,14 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { h.OnFinalizeRequired().Return(false) hf.OnGetHandler(v1alpha1.NodeKindTask).Return(h, nil) - mockWf, _ := setupNodePhase(test.parentNodePhase, test.currentNodePhase, test.expectedNodePhase) + mockWf, _ := setupNodePhase(test.parentNodePhase, test.currentNodePhase, test.expectedNodePhase, test.clearStateOnAnyTermination) startNode := mockWf.StartNode() store := createInmemoryDataStore(t, promutils.NewTestScope()) adminClient := launchplan.NewFailFastLaunchPlanExecutor() - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, adminClient, + nodeConfig := config.GetConfig().NodeConfig + nodeConfig.ClearStateOnAnyTermination = test.clearStateOnAnyTermination + execIface, err := NewExecutor(ctx, nodeConfig, store, enQWf, mockEventSink, adminClient, adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*recursiveNodeExecutor) From 453a69f6769b604f16e06ca80afb29a2ee388444 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Fri, 15 Dec 2023 14:45:25 +0000 Subject: [PATCH 18/19] Address another TODO 
Signed-off-by: Thomas Newton --- flytepropeller/pkg/controller/nodes/branch/evaluator.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flytepropeller/pkg/controller/nodes/branch/evaluator.go b/flytepropeller/pkg/controller/nodes/branch/evaluator.go index 9d4aede5e9..ee58570d97 100644 --- a/flytepropeller/pkg/controller/nodes/branch/evaluator.go +++ b/flytepropeller/pkg/controller/nodes/branch/evaluator.go @@ -129,7 +129,8 @@ func DecideBranch(ctx context.Context, nl executors.NodeLookup, nodeID v1alpha1. } nStatus := nl.GetNodeExecutionStatus(ctx, n.GetID()) logger.Infof(ctx, "Branch Setting Node[%v] status to Skipped!", skippedNodeID) - // TODO: What is the impact of always setting clearStateOnAnyTermination=false here? + // We hard-code clearStateOnAnyTermination=false because it has no effect when setting phase to + // NodePhaseSkipped. This saves us passing the config all the way down from the nodeExecutor. nStatus.UpdatePhase(v1alpha1.NodePhaseSkipped, v1.Now(), "Branch evaluated to false", false, nil) } From fb9bd90a8143e18bcfdbb800ca27a291ed008ba1 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Fri, 15 Dec 2023 14:55:28 +0000 Subject: [PATCH 19/19] Add comment Signed-off-by: Thomas Newton --- flytepropeller/pkg/controller/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytepropeller/pkg/controller/config/config.go b/flytepropeller/pkg/controller/config/config.go index 9dce351b59..dc4d135e73 100644 --- a/flytepropeller/pkg/controller/config/config.go +++ b/flytepropeller/pkg/controller/config/config.go @@ -212,7 +212,7 @@ type NodeConfig struct { InterruptibleFailureThreshold int32 `json:"interruptible-failure-threshold" pflag:"1,number of failures for a node to be still considered interruptible. Negative numbers are treated as complementary (ex.
-1 means last attempt is non-interruptible).'"` DefaultMaxAttempts int32 `json:"default-max-attempts" pflag:"3,Default maximum number of attempts for a node"` IgnoreRetryCause bool `json:"ignore-retry-cause" pflag:",Ignore retry cause and count all attempts toward a node's max attempts"` - ClearStateOnAnyTermination bool `json:"clear-state-on-any-termination" pflag:",Collapse node on any terminal state"` + ClearStateOnAnyTermination bool `json:"clear-state-on-any-termination" pflag:",Collapse node on any terminal state, not just successful terminations. This is useful to reduce the size of workflow state in etcd."` } // DefaultDeadlines contains default values for timeouts