Skip to content

Commit

Permalink
Rename the config option to enable-cr-debug-metadata
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Newton <[email protected]>
  • Loading branch information
Tom-Newton committed Jan 12, 2024
1 parent fb9bd90 commit 64d7fa2
Show file tree
Hide file tree
Showing 12 changed files with 57 additions and 57 deletions.
2 changes: 1 addition & 1 deletion flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ type MutableNodeStatus interface {
SetOutputDir(d DataReference)
SetParentNodeID(n *NodeID)
SetParentTaskID(t *core.TaskExecutionIdentifier)
UpdatePhase(phase NodePhase, occurredAt metav1.Time, reason string, clearStateOnAnyTermination bool, err *core.ExecutionError)
UpdatePhase(phase NodePhase, occurredAt metav1.Time, reason string, enableCRDebugMetadata bool, err *core.ExecutionError)
IncrementAttempts() uint32
IncrementSystemFailures() uint32
SetCached()
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ func (in *NodeStatus) GetOrCreateArrayNodeStatus() MutableArrayNodeStatus {
return in.ArrayNodeStatus
}

func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason string, clearStateOnAnyTermination bool, err *core.ExecutionError) {
func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason string, enableCRDebugMetadata bool, err *core.ExecutionError) {
if in.Phase == p && in.Message == reason {
// We will not update the phase multiple times. This prevents the comparison from returning false positive
return
Expand Down Expand Up @@ -629,8 +629,8 @@ func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason st
if in.StoppedAt == nil {
in.StoppedAt = &n
}
if p == NodePhaseSucceeded || p == NodePhaseSkipped || clearStateOnAnyTermination {
// Clear most status related fields after reaching a terminal state. This keeps the CRD state small to avoid
if p == NodePhaseSucceeded || p == NodePhaseSkipped || !enableCRDebugMetadata {
// Clear most status related fields after reaching a terminal state. This keeps the CR state small to avoid
// etcd size limits. Importantly we keep Phase, StoppedAt and Error which will be needed further.
in.Message = ""
in.QueuedAt = nil
Expand Down
28 changes: 14 additions & 14 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,31 +260,31 @@ func TestNodeStatus_UpdatePhase(t *testing.T) {

const queued = "queued"
const success = "success"
for _, clearStateOnAnyTermination := range []bool{false, true} {
for _, enableCRDebugMetadata := range []bool{false, true} {
t.Run("identical-phase", func(t *testing.T) {
p := NodePhaseQueued
ns := NodeStatus{
Phase: p,
Message: queued,
}
msg := queued
ns.UpdatePhase(p, n, msg, clearStateOnAnyTermination, nil)
ns.UpdatePhase(p, n, msg, enableCRDebugMetadata, nil)
assert.Nil(t, ns.QueuedAt)
})

t.Run("zero", func(t *testing.T) {
p := NodePhaseQueued
ns := NodeStatus{}
msg := queued
ns.UpdatePhase(p, metav1.NewTime(time.Time{}), msg, clearStateOnAnyTermination, nil)
ns.UpdatePhase(p, metav1.NewTime(time.Time{}), msg, enableCRDebugMetadata, nil)
assert.NotNil(t, ns.QueuedAt)
})

t.Run("non-terminal", func(t *testing.T) {
ns := NodeStatus{}
p := NodePhaseQueued
msg := queued
ns.UpdatePhase(p, n, msg, clearStateOnAnyTermination, nil)
ns.UpdatePhase(p, n, msg, enableCRDebugMetadata, nil)

assert.Equal(t, *ns.LastUpdatedAt, n)
assert.Equal(t, *ns.QueuedAt, n)
Expand All @@ -300,7 +300,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) {
ns := NodeStatus{}
p := NodePhaseRunning
msg := "running"
ns.UpdatePhase(p, n, msg, clearStateOnAnyTermination, nil)
ns.UpdatePhase(p, n, msg, enableCRDebugMetadata, nil)

assert.Equal(t, *ns.LastUpdatedAt, n)
assert.Nil(t, ns.QueuedAt)
Expand All @@ -316,7 +316,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) {
ns := NodeStatus{}
p := NodePhaseTimingOut
msg := "timing-out"
ns.UpdatePhase(p, n, msg, clearStateOnAnyTermination, nil)
ns.UpdatePhase(p, n, msg, enableCRDebugMetadata, nil)

assert.Equal(t, *ns.LastUpdatedAt, n)
assert.Nil(t, ns.QueuedAt)
Expand All @@ -332,7 +332,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) {
ns := NodeStatus{}
p := NodePhaseSucceeded
msg := success
ns.UpdatePhase(p, n, msg, clearStateOnAnyTermination, nil)
ns.UpdatePhase(p, n, msg, enableCRDebugMetadata, nil)

assert.Nil(t, ns.LastUpdatedAt)
assert.Nil(t, ns.QueuedAt)
Expand All @@ -348,7 +348,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) {
ns := NodeStatus{}
p := NodePhaseSucceeded
msg := success
ns.UpdatePhase(p, n, msg, clearStateOnAnyTermination, nil)
ns.UpdatePhase(p, n, msg, enableCRDebugMetadata, nil)

assert.Nil(t, ns.LastUpdatedAt)
assert.Nil(t, ns.QueuedAt)
Expand All @@ -374,7 +374,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) {
}
p := NodePhaseSucceeded
msg := success
ns.UpdatePhase(p, n, msg, clearStateOnAnyTermination, nil)
ns.UpdatePhase(p, n, msg, enableCRDebugMetadata, nil)

assert.Nil(t, ns.LastUpdatedAt)
assert.Nil(t, ns.QueuedAt)
Expand Down Expand Up @@ -406,7 +406,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) {
n2 := metav1.NewTime(time.Now())
p := NodePhaseRunning
msg := "running"
ns.UpdatePhase(p, n2, msg, clearStateOnAnyTermination, nil)
ns.UpdatePhase(p, n2, msg, enableCRDebugMetadata, nil)

assert.Equal(t, *ns.LastUpdatedAt, n2)
assert.Equal(t, *ns.QueuedAt, n)
Expand All @@ -429,7 +429,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) {
p := NodePhaseFailed
msg := "failed"
err := &core.ExecutionError{}
ns.UpdatePhase(p, n, msg, false, err)
ns.UpdatePhase(p, n, msg, true, err)

assert.Equal(t, *ns.LastUpdatedAt, n)
assert.Nil(t, ns.QueuedAt)
Expand All @@ -446,7 +446,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) {
p := NodePhaseFailed
msg := "failed"
err := &core.ExecutionError{}
ns.UpdatePhase(p, n, msg, true, err)
ns.UpdatePhase(p, n, msg, false, err)

assert.Nil(t, ns.LastUpdatedAt)
assert.Nil(t, ns.QueuedAt)
Expand All @@ -463,7 +463,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) {
p := NodePhaseTimedOut
msg := "tm"
err := &core.ExecutionError{}
ns.UpdatePhase(p, n, msg, false, err)
ns.UpdatePhase(p, n, msg, true, err)

assert.Equal(t, *ns.LastUpdatedAt, n)
assert.Nil(t, ns.QueuedAt)
Expand All @@ -480,7 +480,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) {
p := NodePhaseTimedOut
msg := "tm"
err := &core.ExecutionError{}
ns.UpdatePhase(p, n, msg, true, err)
ns.UpdatePhase(p, n, msg, false, err)

assert.Nil(t, ns.LastUpdatedAt)
assert.Nil(t, ns.QueuedAt)
Expand Down
4 changes: 2 additions & 2 deletions flytepropeller/pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ var (
InterruptibleFailureThreshold: -1,
DefaultMaxAttempts: 1,
IgnoreRetryCause: false,
ClearStateOnAnyTermination: false,
EnableCRDebugMetadata: false,
},
MaxStreakLength: 8, // Turbo mode is enabled by default
ProfilerPort: config.Port{
Expand Down Expand Up @@ -212,7 +212,7 @@ type NodeConfig struct {
InterruptibleFailureThreshold int32 `json:"interruptible-failure-threshold" pflag:"1,number of failures for a node to be still considered interruptible. Negative numbers are treated as complementary (ex. -1 means last attempt is non-interruptible).'"`
DefaultMaxAttempts int32 `json:"default-max-attempts" pflag:"3,Default maximum number of attempts for a node"`
IgnoreRetryCause bool `json:"ignore-retry-cause" pflag:",Ignore retry cause and count all attempts toward a node's max attempts"`
ClearStateOnAnyTermination bool `json:"clear-state-on-any-termination" pflag:",Collapse node on any terminal state, not just successful terminations. This is useful to reduce the size of workflow state in etcd."`
EnableCRDebugMetadata bool `json:"enable-cr-debug-metadata" pflag:",Collapse node on any terminal state, not just successful terminations. This is useful to reduce the size of workflow state in etcd."`
}

// DefaultDeadlines contains default values for timeouts
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions flytepropeller/pkg/controller/config/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions flytepropeller/pkg/controller/nodes/branch/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ func DecideBranch(ctx context.Context, nl executors.NodeLookup, nodeID v1alpha1.
}
nStatus := nl.GetNodeExecutionStatus(ctx, n.GetID())
logger.Infof(ctx, "Branch Setting Node[%v] status to Skipped!", skippedNodeID)
// We hard code clearStateOnAnyTermination=false because it has no effect when setting phase to
// NodePhaseSkipped. This saves us passing the config all the way down from the nodeExecutor
nStatus.UpdatePhase(v1alpha1.NodePhaseSkipped, v1.Now(), "Branch evaluated to false", false, nil)
// We hard code enableCRDebugMetadata=true because it has no effect when setting phase to
// NodePhaseSkipped. This saves us passing the config all the way down from the nodeExecutor.
nStatus.UpdatePhase(v1alpha1.NodePhaseSkipped, v1.Now(), "Branch evaluated to false", true, nil)
}

if selectedNodeID == nil {
Expand Down
16 changes: 8 additions & 8 deletions flytepropeller/pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func (c *recursiveNodeExecutor) WithNodeExecutionContextBuilder(nCtxBuilder inte
type nodeExecutor struct {
catalog catalog.Client
clusterID string
clearStateOnAnyTermination bool
enableCRDebugMetadata bool
defaultActiveDeadline time.Duration
defaultDataSandbox storage.DataReference
defaultExecutionDeadline time.Duration
Expand Down Expand Up @@ -1006,7 +1006,7 @@ func (c *nodeExecutor) handleNotYetStartedNode(ctx context.Context, dag executor
logger.Warningf(ctx, "Failed to record nodeEvent, error [%s]", err.Error())
return interfaces.NodeStatusUndefined, errors.Wrapf(errors.EventRecordingFailed, nCtx.NodeID(), err, "failed to record node event")
}
UpdateNodeStatus(np, p, nCtx.NodeStateReader(), nodeStatus, c.clearStateOnAnyTermination)
UpdateNodeStatus(np, p, nCtx.NodeStateReader(), nodeStatus, c.enableCRDebugMetadata)
c.RecordTransitionLatency(ctx, dag, nCtx.ContextualNodeLookup(), nCtx.Node(), nodeStatus)
}

Expand Down Expand Up @@ -1272,7 +1272,7 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx inter
}
}

UpdateNodeStatus(np, p, nCtx.NodeStateReader(), nodeStatus, c.clearStateOnAnyTermination)
UpdateNodeStatus(np, p, nCtx.NodeStateReader(), nodeStatus, c.enableCRDebugMetadata)
return finalStatus, nil
}

Expand All @@ -1286,7 +1286,7 @@ func (c *nodeExecutor) handleRetryableFailure(ctx context.Context, nCtx interfac
// NOTE: It is important to increment attempts only after abort has been called. Increment attempt mutates the state
// Attempt is used throughout the system to determine the idempotent resource version.
nodeStatus.IncrementAttempts()
nodeStatus.UpdatePhase(v1alpha1.NodePhaseRunning, metav1.Now(), "retrying", c.clearStateOnAnyTermination, nil)
nodeStatus.UpdatePhase(v1alpha1.NodePhaseRunning, metav1.Now(), "retrying", c.enableCRDebugMetadata, nil)
// We are going to retry in the next round, so we should clear all current state
nodeStatus.ClearSubNodeStatus()
nodeStatus.ClearTaskStatus()
Expand Down Expand Up @@ -1331,7 +1331,7 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur
if startedAt == nil {
startedAt = &t
}
nodeStatus.UpdatePhase(v1alpha1.NodePhaseFailed, t, nodeStatus.GetMessage(), c.clearStateOnAnyTermination, nodeStatus.GetExecutionError())
nodeStatus.UpdatePhase(v1alpha1.NodePhaseFailed, t, nodeStatus.GetMessage(), c.enableCRDebugMetadata, nodeStatus.GetExecutionError())
c.metrics.FailureDuration.Observe(ctx, startedAt.Time, nodeStatus.GetStoppedAt().Time)
if nCtx.NodeExecutionMetadata().IsInterruptible() {
c.metrics.InterruptibleNodesTerminated.Inc(ctx)
Expand All @@ -1345,7 +1345,7 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur
return interfaces.NodeStatusUndefined, err
}

nodeStatus.UpdatePhase(v1alpha1.NodePhaseTimedOut, metav1.Now(), nodeStatus.GetMessage(), c.clearStateOnAnyTermination, nodeStatus.GetExecutionError())
nodeStatus.UpdatePhase(v1alpha1.NodePhaseTimedOut, metav1.Now(), nodeStatus.GetMessage(), c.enableCRDebugMetadata, nodeStatus.GetExecutionError())
c.metrics.TimedOutFailure.Inc(ctx)
if nCtx.NodeExecutionMetadata().IsInterruptible() {
c.metrics.InterruptibleNodesTerminated.Inc(ctx)
Expand All @@ -1369,7 +1369,7 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur
stopped = &t
}
c.metrics.SuccessDuration.Observe(ctx, started.Time, stopped.Time)
nodeStatus.UpdatePhase(v1alpha1.NodePhaseSucceeded, t, "completed successfully", c.clearStateOnAnyTermination, nil)
nodeStatus.UpdatePhase(v1alpha1.NodePhaseSucceeded, t, "completed successfully", c.enableCRDebugMetadata, nil)
if nCtx.NodeExecutionMetadata().IsInterruptible() {
c.metrics.InterruptibleNodesTerminated.Inc(ctx)
}
Expand Down Expand Up @@ -1436,7 +1436,7 @@ func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *stora
nodeExecutor := &nodeExecutor{
catalog: catalogClient,
clusterID: clusterID,
clearStateOnAnyTermination: nodeConfig.ClearStateOnAnyTermination,
enableCRDebugMetadata: nodeConfig.EnableCRDebugMetadata,
defaultActiveDeadline: nodeConfig.DefaultDeadlines.DefaultNodeActiveDeadline.Duration,
defaultDataSandbox: defaultRawOutputPrefix,
defaultExecutionDeadline: nodeConfig.DefaultDeadlines.DefaultNodeExecutionDeadline.Duration,
Expand Down
Loading

0 comments on commit 64d7fa2

Please sign in to comment.