Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tracking for active node and task execution counts in propeller #4986

Merged
merged 5 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,11 @@
workflowStore workflowstore.FlyteWorkflow
// recorder is an event recorder for recording Event resources to the
// Kubernetes API.
recorder record.EventRecorder
metrics *metrics
leaderElector *leaderelection.LeaderElector
levelMonitor *ResourceLevelMonitor
recorder record.EventRecorder
metrics *metrics
leaderElector *leaderelection.LeaderElector
levelMonitor *ResourceLevelMonitor
executionStats *workflowstore.ExecutionStatsMonitor
}

// Run either as a leader -if configured- or as a standalone process.
Expand Down Expand Up @@ -117,6 +118,7 @@

// Start the collector process
c.levelMonitor.RunCollector(ctx)
c.executionStats.RunStatsMonitor(ctx)

Check warning on line 121 in flytepropeller/pkg/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/controller.go#L121

Added line #L121 was not covered by tests

// Start the informer factories to begin populating the informer caches
logger.Info(ctx, "Starting FlyteWorkflow controller")
Expand Down Expand Up @@ -329,7 +331,6 @@
if sCfg == nil {
logger.Errorf(ctx, "Storage configuration missing.")
}

store, err := storage.NewDataStore(sCfg, scope.NewSubScope("metastore"))
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Metadata storage")
Expand Down Expand Up @@ -445,7 +446,13 @@
return nil, errors.Wrapf(err, "Failed to create Controller.")
}

workflowExecutor, err := workflow.NewExecutor(ctx, store, controller.enqueueWorkflowForNodeUpdates, eventSink, controller.recorder, cfg.MetadataPrefix, nodeExecutor, &cfg.EventConfig, cfg.ClusterID, scope)
activeExecutions, err := workflowstore.NewExecutionStatsHolder()
if err != nil {
return nil, err
}
controller.executionStats = workflowstore.NewExecutionStatsMonitor(scope.NewSubScope("execstats"), flyteworkflowInformer.Lister(), activeExecutions)

workflowExecutor, err := workflow.NewExecutor(ctx, store, controller.enqueueWorkflowForNodeUpdates, eventSink, controller.recorder, cfg.MetadataPrefix, nodeExecutor, &cfg.EventConfig, cfg.ClusterID, scope, activeExecutions)

Check warning on line 455 in flytepropeller/pkg/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/controller.go#L449-L455

Added lines #L449 - L455 were not covered by tests
if err != nil {
return nil, err
}
Expand Down
36 changes: 31 additions & 5 deletions flytepropeller/pkg/controller/executors/execution_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
type ControlFlow interface {
CurrentParallelism() uint32
IncrementParallelism() uint32
CurrentNodeExecutionCount() uint32
IncrementNodeExecutionCount() uint32
CurrentTaskExecutionCount() uint32
IncrementTaskExecutionCount() uint32
}

type ExecutionContext interface {
Expand Down Expand Up @@ -71,16 +75,36 @@
type controlFlow struct {
// We could use atomic.Uint32, but this is not required for current Propeller. As every round is run in a single
// thread and using atomic will introduce memory barriers
v uint32
parallelism uint32
nodeExecutionCount uint32
taskExecutionCount uint32
}

func (c *controlFlow) CurrentParallelism() uint32 {
return c.v
return c.parallelism
}

func (c *controlFlow) IncrementParallelism() uint32 {
c.v = c.v + 1
return c.v
c.parallelism = c.parallelism + 1
return c.parallelism
}

func (c *controlFlow) CurrentNodeExecutionCount() uint32 {
return c.nodeExecutionCount

Check warning on line 93 in flytepropeller/pkg/controller/executors/execution_context.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/executors/execution_context.go#L92-L93

Added lines #L92 - L93 were not covered by tests
}

func (c *controlFlow) IncrementNodeExecutionCount() uint32 {
c.nodeExecutionCount++
return c.nodeExecutionCount

Check warning on line 98 in flytepropeller/pkg/controller/executors/execution_context.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/executors/execution_context.go#L96-L98

Added lines #L96 - L98 were not covered by tests
}

func (c *controlFlow) CurrentTaskExecutionCount() uint32 {
return c.taskExecutionCount

Check warning on line 102 in flytepropeller/pkg/controller/executors/execution_context.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/executors/execution_context.go#L101-L102

Added lines #L101 - L102 were not covered by tests
}

func (c *controlFlow) IncrementTaskExecutionCount() uint32 {
c.taskExecutionCount++
return c.taskExecutionCount

Check warning on line 107 in flytepropeller/pkg/controller/executors/execution_context.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/executors/execution_context.go#L105-L107

Added lines #L105 - L107 were not covered by tests
}

func NewExecutionContextWithTasksGetter(prevExecContext ExecutionContext, taskGetter TaskDetailsGetter) ExecutionContext {
Expand Down Expand Up @@ -114,6 +138,8 @@

func InitializeControlFlow() ControlFlow {
return &controlFlow{
v: 0,
parallelism: 0,
nodeExecutionCount: 0,
taskExecutionCount: 0,
}
}
128 changes: 128 additions & 0 deletions flytepropeller/pkg/controller/executors/mocks/execution_context.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions flytepropeller/pkg/controller/nodes/array/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ func createNodeExecutionContext(dataStore *storage.DataStore, eventRecorder inte
},
nil,
)
executionContext.OnIncrementNodeExecutionCount().Return(1)
executionContext.OnIncrementTaskExecutionCount().Return(1)
executionContext.OnCurrentNodeExecutionCount().Return(1)
executionContext.OnCurrentTaskExecutionCount().Return(1)
nCtx.OnExecutionContext().Return(executionContext)

// EventsRecorder
Expand Down
10 changes: 10 additions & 0 deletions flytepropeller/pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,15 @@ func (c *recursiveNodeExecutor) RecursiveNodeHandler(ctx context.Context, execCo
nodeStatus := nl.GetNodeExecutionStatus(ctx, currentNode.GetID())
nodePhase := nodeStatus.GetPhase()

if nodePhase == v1alpha1.NodePhaseRunning && execContext != nil {
execContext.IncrementNodeExecutionCount()
if currentNode.GetKind() == v1alpha1.NodeKindTask {
execContext.IncrementTaskExecutionCount()
}
logger.Debugf(currentNodeCtx, "recursive handler - node execution count [%v], task execution count [%v], phase [%v], ",
execContext.CurrentNodeExecutionCount(), execContext.CurrentTaskExecutionCount(), nodePhase.String())
}

if canHandleNode(nodePhase) {
// TODO Follow up Pull Request,
// 1. Rename this method to DAGTraversalHandleNode (accepts a DAGStructure along-with) the remaining arguments
Expand Down Expand Up @@ -287,6 +296,7 @@ func (c *recursiveNodeExecutor) handleDownstream(ctx context.Context, execContex
}), nil
}

logger.Debugf(ctx, "downstream handler starting node id %v, ", downstreamNode.GetID())
state, err := c.RecursiveNodeHandler(ctx, execContext, dag, nl, downstreamNode)
if err != nil {
return interfaces.NodeStatusUndefined, err
Expand Down
Loading
Loading