Add tracking for active node and task execution counts in propeller
sshardool committed Feb 29, 2024
1 parent e1d5a7a commit 8a948ce
Showing 6 changed files with 346 additions and 36 deletions.
19 changes: 13 additions & 6 deletions flytepropeller/pkg/controller/controller.go
@@ -82,10 +82,11 @@ type Controller struct {
workflowStore workflowstore.FlyteWorkflow
// recorder is an event recorder for recording Event resources to the
// Kubernetes API.
recorder record.EventRecorder
metrics *metrics
leaderElector *leaderelection.LeaderElector
levelMonitor *ResourceLevelMonitor
recorder record.EventRecorder
metrics *metrics
leaderElector *leaderelection.LeaderElector
levelMonitor *ResourceLevelMonitor
executionStats *workflowstore.ExecutionStatsMonitor
}

// Run either as a leader, if configured, or as a standalone process.
@@ -117,6 +118,7 @@ func (c *Controller) run(ctx context.Context) error {

// Start the collector process
c.levelMonitor.RunCollector(ctx)
c.executionStats.RunStatsMonitor(ctx)

// Start the informer factories to begin populating the informer caches
logger.Info(ctx, "Starting FlyteWorkflow controller")
@@ -329,7 +331,6 @@ func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Inter
if sCfg == nil {
logger.Errorf(ctx, "Storage configuration missing.")
}

store, err := storage.NewDataStore(sCfg, scope.NewSubScope("metastore"))
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Metadata storage")
@@ -445,7 +446,13 @@ func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Inter
return nil, errors.Wrapf(err, "Failed to create Controller.")
}

workflowExecutor, err := workflow.NewExecutor(ctx, store, controller.enqueueWorkflowForNodeUpdates, eventSink, controller.recorder, cfg.MetadataPrefix, nodeExecutor, &cfg.EventConfig, cfg.ClusterID, scope)
activeExecutions, err := workflowstore.NewExecutionStatsHolder()
if err != nil {
return nil, err
}
controller.executionStats = workflowstore.NewExecutionStatsMonitor(scope.NewSubScope("execstats"), flyteworkflowInformer.Lister(), activeExecutions)

workflowExecutor, err := workflow.NewExecutor(ctx, store, controller.enqueueWorkflowForNodeUpdates, eventSink, controller.recorder, cfg.MetadataPrefix, nodeExecutor, &cfg.EventConfig, cfg.ClusterID, scope, activeExecutions)
if err != nil {
return nil, err
}
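The ExecutionStatsHolder and ExecutionStatsMonitor wired up here live in this commit's workflowstore files, which are not rendered in this view. The sketch below is a reconstruction from their call sites only: the type and method names (NewExecutionStatsHolder, AddOrUpdateEntry, RunStatsMonitor, SingleExecutionStats) appear in this diff, while the mutex-guarded map, the ticker interval, and the omitted constructor taking a metrics scope and informer lister are assumptions.

// Sketch only: names are taken from their usage in this diff; all internals are assumed.
package workflowstore

import (
	"context"
	"sync"
	"time"
)

// SingleExecutionStats holds the counts recorded for one workflow execution
// during its most recent evaluation round.
type SingleExecutionStats struct {
	ActiveNodeCount uint32
	ActiveTaskCount uint32
}

// ExecutionStatsHolder maps an execution ID to its latest stats.
type ExecutionStatsHolder struct {
	mu    sync.Mutex
	stats map[string]SingleExecutionStats
}

func NewExecutionStatsHolder() (*ExecutionStatsHolder, error) {
	return &ExecutionStatsHolder{stats: map[string]SingleExecutionStats{}}, nil
}

// AddOrUpdateEntry overwrites the entry for executionID with this round's counts.
func (e *ExecutionStatsHolder) AddOrUpdateEntry(executionID string, stats SingleExecutionStats) error {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.stats[executionID] = stats
	return nil
}

// ExecutionStatsMonitor periodically publishes aggregate counts as metrics.
// The constructor taking a metrics scope and informer lister is omitted here.
type ExecutionStatsMonitor struct {
	holder *ExecutionStatsHolder
}

// RunStatsMonitor starts the background loop; the interval is an assumption.
func (e *ExecutionStatsMonitor) RunStatsMonitor(ctx context.Context) {
	go func() {
		ticker := time.NewTicker(10 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// emit gauges and prune stale executions here (details unknown)
			}
		}
	}()
}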
36 changes: 31 additions & 5 deletions flytepropeller/pkg/controller/executors/execution_context.go
@@ -33,6 +33,10 @@ type ImmutableParentInfo interface {
type ControlFlow interface {
CurrentParallelism() uint32
IncrementParallelism() uint32
CurrentNodeExecutionCount() uint32
IncrementNodeExecutionCount() uint32
CurrentTaskExecutionCount() uint32
IncrementTaskExecutionCount() uint32
}

type ExecutionContext interface {
@@ -71,16 +75,36 @@ func (p *parentExecutionInfo) CurrentAttempt() uint32 {
type controlFlow struct {
// We could use atomic.Uint32, but this is not required for current Propeller: every round runs in a
// single thread, and using atomics would introduce unnecessary memory barriers.
v uint32
parallelism uint32
nodeExecutionCount uint32
taskExecutionCount uint32
}

func (c *controlFlow) CurrentParallelism() uint32 {
return c.v
return c.parallelism
}

func (c *controlFlow) IncrementParallelism() uint32 {
c.v = c.v + 1
return c.v
c.parallelism = c.parallelism + 1
return c.parallelism
}

func (c *controlFlow) CurrentNodeExecutionCount() uint32 {
return c.nodeExecutionCount
}

func (c *controlFlow) IncrementNodeExecutionCount() uint32 {
c.nodeExecutionCount++
return c.nodeExecutionCount
}

func (c *controlFlow) CurrentTaskExecutionCount() uint32 {
return c.taskExecutionCount
}

func (c *controlFlow) IncrementTaskExecutionCount() uint32 {
c.taskExecutionCount++
return c.taskExecutionCount
}

func NewExecutionContextWithTasksGetter(prevExecContext ExecutionContext, taskGetter TaskDetailsGetter) ExecutionContext {
@@ -114,6 +138,8 @@ func NewParentInfo(uniqueID string, currentAttempts uint32) ImmutableParentInfo

func InitializeControlFlow() ControlFlow {
return &controlFlow{
v: 0,
parallelism: 0,
nodeExecutionCount: 0,
taskExecutionCount: 0,
}
}
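A minimal standalone illustration of the new counters (hypothetical usage; in Propeller itself the increments happen inside RecursiveNodeHandler, shown in the next file):

package main

import (
	"fmt"

	"github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors"
)

func main() {
	cf := executors.InitializeControlFlow()
	cf.IncrementNodeExecutionCount() // a running node was observed
	cf.IncrementTaskExecutionCount() // ...and it is a task node
	fmt.Println(cf.CurrentNodeExecutionCount(), cf.CurrentTaskExecutionCount()) // 1 1
}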
10 changes: 10 additions & 0 deletions flytepropeller/pkg/controller/nodes/executor.go
@@ -182,6 +182,15 @@ func (c *recursiveNodeExecutor) RecursiveNodeHandler(ctx context.Context, execCo
nodeStatus := nl.GetNodeExecutionStatus(ctx, currentNode.GetID())
nodePhase := nodeStatus.GetPhase()

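// Count nodes observed in the Running phase this round, and task-kind nodes
// separately; the totals are recorded per execution in workflow/executor.go.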
if nodePhase == v1alpha1.NodePhaseRunning {
execContext.IncrementNodeExecutionCount()
if currentNode.GetKind() == v1alpha1.NodeKindTask {
execContext.IncrementTaskExecutionCount()
}
logger.Debugf(currentNodeCtx, "recursive handler - node execution count [%v], task execution count [%v], phase [%v]",
execContext.CurrentNodeExecutionCount(), execContext.CurrentTaskExecutionCount(), nodePhase.String())
}

if canHandleNode(nodePhase) {
// TODO Follow up Pull Request,
// 1. Rename this method to DAGTraversalHandleNode (accepts a DAGStructure along with the remaining arguments)
@@ -287,6 +296,7 @@ func (c *recursiveNodeExecutor) handleDownstream(ctx context.Context, execContex
}), nil
}

logger.Infof(ctx, "F3 starting node id %v, ", downstreamNode.GetID())
state, err := c.RecursiveNodeHandler(ctx, execContext, dag, nl, downstreamNode)
if err != nil {
return interfaces.NodeStatusUndefined, err
74 changes: 49 additions & 25 deletions flytepropeller/pkg/controller/workflow/executor.go
@@ -17,6 +17,7 @@ import (
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/workflow/errors"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/workflowstore"
"github.com/flyteorg/flyte/flytepropeller/pkg/utils"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
@@ -59,15 +60,16 @@ func StatusFailed(err *core.ExecutionError) Status {
}

type workflowExecutor struct {
enqueueWorkflow v1alpha1.EnqueueWorkflow
store *storage.DataStore
wfRecorder events.WorkflowEventRecorder
k8sRecorder record.EventRecorder
metadataPrefix storage.DataReference
nodeExecutor interfaces.Node
metrics *workflowMetrics
eventConfig *config.EventConfig
clusterID string
enqueueWorkflow v1alpha1.EnqueueWorkflow
store *storage.DataStore
wfRecorder events.WorkflowEventRecorder
k8sRecorder record.EventRecorder
metadataPrefix storage.DataReference
nodeExecutor interfaces.Node
metrics *workflowMetrics
eventConfig *config.EventConfig
clusterID string
activeExecutions *workflowstore.ExecutionStatsHolder
}

func (c *workflowExecutor) constructWorkflowMetadataPrefix(ctx context.Context, w *v1alpha1.FlyteWorkflow) (storage.DataReference, error) {
@@ -144,10 +146,20 @@ func (c *workflowExecutor) handleRunningWorkflow(ctx context.Context, w *v1alpha
Message: "Start node not found"}), nil
}
execcontext := executors.NewExecutionContext(w, w, w, nil, executors.InitializeControlFlow())
state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, w, w, startNode)
state, handlerErr := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, w, w, startNode)

execStats := workflowstore.SingleExecutionStats{
ActiveNodeCount: execcontext.CurrentNodeExecutionCount(),
ActiveTaskCount: execcontext.CurrentTaskExecutionCount(),
}
logger.Debugf(ctx, "running workflow node execution count [%v], task execution count [%v], execution-id [%v]",
execStats.ActiveNodeCount, execStats.ActiveTaskCount, execcontext.GetExecutionID())
statErr := c.activeExecutions.AddOrUpdateEntry(execcontext.GetExecutionID().String(), execStats)
if statErr != nil {
logger.Errorf(ctx, "error updating active executions stats: %v", statErr)
}

if err != nil {
return StatusRunning, err
if handlerErr != nil {
return StatusRunning, handlerErr
}
if state.HasFailed() {
logger.Infof(ctx, "Workflow has failed. Error [%s]", state.Err.String())
@@ -175,9 +187,20 @@ func (c *workflowExecutor) handleFailureNode(ctx context.Context, w *v1alpha1.Fl

failureNodeStatus := w.GetExecutionStatus().GetNodeExecutionStatus(ctx, failureNode.GetID())
failureNodeLookup := executors.NewFailureNodeLookup(w, failureNode, failureNodeStatus)
state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, failureNodeLookup, failureNodeLookup, failureNode)
if err != nil {
return StatusFailureNode(execErr), err
state, handlerErr := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, failureNodeLookup, failureNodeLookup, failureNode)

execStats := workflowstore.SingleExecutionStats{
ActiveNodeCount: execcontext.CurrentNodeExecutionCount(),
ActiveTaskCount: execcontext.CurrentTaskExecutionCount(),
}
logger.Debugf(ctx, "failure node execution count [%v], task execution count [%v], execution-id [%v]",
execStats.ActiveNodeCount, execStats.ActiveTaskCount, execcontext.GetExecutionID())
statErr := c.activeExecutions.AddOrUpdateEntry(execcontext.GetID(), execStats)
if statErr != nil {
logger.Errorf(ctx, "error updating active executions stats: %v", statErr)
}

if handlerErr != nil {
return StatusFailureNode(execErr), handlerErr
}

switch state.NodePhase {
@@ -504,7 +527,7 @@ func (c *workflowExecutor) cleanupRunningNodes(ctx context.Context, w v1alpha1.E

func NewExecutor(ctx context.Context, store *storage.DataStore, enQWorkflow v1alpha1.EnqueueWorkflow, eventSink events.EventSink,
k8sEventRecorder record.EventRecorder, metadataPrefix string, nodeExecutor interfaces.Node, eventConfig *config.EventConfig,
clusterID string, scope promutils.Scope) (executors.Workflow, error) {
clusterID string, scope promutils.Scope, activeExecutions *workflowstore.ExecutionStatsHolder) (executors.Workflow, error) {
basePrefix := store.GetBaseContainerFQN(ctx)
if metadataPrefix != "" {
var err error
@@ -518,15 +541,16 @@ workflowScope := scope.NewSubScope("workflow")
workflowScope := scope.NewSubScope("workflow")

return &workflowExecutor{
nodeExecutor: nodeExecutor,
store: store,
enqueueWorkflow: enQWorkflow,
wfRecorder: events.NewWorkflowEventRecorder(eventSink, workflowScope, store),
k8sRecorder: k8sEventRecorder,
metadataPrefix: basePrefix,
metrics: newMetrics(workflowScope),
eventConfig: eventConfig,
clusterID: clusterID,
nodeExecutor: nodeExecutor,
store: store,
enqueueWorkflow: enQWorkflow,
wfRecorder: events.NewWorkflowEventRecorder(eventSink, workflowScope, store),
k8sRecorder: k8sEventRecorder,
metadataPrefix: basePrefix,
metrics: newMetrics(workflowScope),
eventConfig: eventConfig,
clusterID: clusterID,
activeExecutions: activeExecutions,
}, nil
}
