diff --git a/flytepropeller/pkg/controller/controller.go b/flytepropeller/pkg/controller/controller.go index de28612c54b..3942492e852 100644 --- a/flytepropeller/pkg/controller/controller.go +++ b/flytepropeller/pkg/controller/controller.go @@ -82,10 +82,11 @@ type Controller struct { workflowStore workflowstore.FlyteWorkflow // recorder is an event recorder for recording Event resources to the // Kubernetes API. - recorder record.EventRecorder - metrics *metrics - leaderElector *leaderelection.LeaderElector - levelMonitor *ResourceLevelMonitor + recorder record.EventRecorder + metrics *metrics + leaderElector *leaderelection.LeaderElector + levelMonitor *ResourceLevelMonitor + executionStats *workflowstore.ExecutionStatsMonitor } // Run either as a leader -if configured- or as a standalone process. @@ -117,6 +118,7 @@ func (c *Controller) run(ctx context.Context) error { // Start the collector process c.levelMonitor.RunCollector(ctx) + c.executionStats.RunStatsMonitor(ctx) // Start the informer factories to begin populating the informer caches logger.Info(ctx, "Starting FlyteWorkflow controller") @@ -329,7 +331,6 @@ func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Inter if sCfg == nil { logger.Errorf(ctx, "Storage configuration missing.") } - store, err := storage.NewDataStore(sCfg, scope.NewSubScope("metastore")) if err != nil { return nil, errors.Wrapf(err, "Failed to create Metadata storage") @@ -445,7 +446,13 @@ func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Inter return nil, errors.Wrapf(err, "Failed to create Controller.") } - workflowExecutor, err := workflow.NewExecutor(ctx, store, controller.enqueueWorkflowForNodeUpdates, eventSink, controller.recorder, cfg.MetadataPrefix, nodeExecutor, &cfg.EventConfig, cfg.ClusterID, scope) + activeExecutions, err := workflowstore.NewExecutionStatsHolder() + if err != nil { + return nil, err + } + controller.executionStats = workflowstore.NewExecutionStatsMonitor(scope.NewSubScope("execstats"), flyteworkflowInformer.Lister(), activeExecutions) + + workflowExecutor, err := workflow.NewExecutor(ctx, store, controller.enqueueWorkflowForNodeUpdates, eventSink, controller.recorder, cfg.MetadataPrefix, nodeExecutor, &cfg.EventConfig, cfg.ClusterID, scope, activeExecutions) if err != nil { return nil, err } diff --git a/flytepropeller/pkg/controller/executors/execution_context.go b/flytepropeller/pkg/controller/executors/execution_context.go index ec182ab3cd1..84799a04004 100644 --- a/flytepropeller/pkg/controller/executors/execution_context.go +++ b/flytepropeller/pkg/controller/executors/execution_context.go @@ -33,6 +33,10 @@ type ImmutableParentInfo interface { type ControlFlow interface { CurrentParallelism() uint32 IncrementParallelism() uint32 + CurrentNodeExecutionCount() uint32 + IncrementNodeExecutionCount() uint32 + CurrentTaskExecutionCount() uint32 + IncrementTaskExecutionCount() uint32 } type ExecutionContext interface { @@ -71,16 +75,36 @@ func (p *parentExecutionInfo) CurrentAttempt() uint32 { type controlFlow struct { // We could use atomic.Uint32, but this is not required for current Propeller. As every round is run in a single // thread and using atomic will introduce memory barriers - v uint32 + parallelism uint32 + nodeExecutionCount uint32 + taskExecutionCount uint32 } func (c *controlFlow) CurrentParallelism() uint32 { - return c.v + return c.parallelism } func (c *controlFlow) IncrementParallelism() uint32 { - c.v = c.v + 1 - return c.v + c.parallelism = c.parallelism + 1 + return c.parallelism +} + +func (c *controlFlow) CurrentNodeExecutionCount() uint32 { + return c.nodeExecutionCount +} + +func (c *controlFlow) IncrementNodeExecutionCount() uint32 { + c.nodeExecutionCount++ + return c.nodeExecutionCount +} + +func (c *controlFlow) CurrentTaskExecutionCount() uint32 { + return c.taskExecutionCount +} + +func (c *controlFlow) IncrementTaskExecutionCount() uint32 { + c.taskExecutionCount++ + return c.taskExecutionCount } func NewExecutionContextWithTasksGetter(prevExecContext ExecutionContext, taskGetter TaskDetailsGetter) ExecutionContext { @@ -114,6 +138,8 @@ func NewParentInfo(uniqueID string, currentAttempts uint32) ImmutableParentInfo func InitializeControlFlow() ControlFlow { return &controlFlow{ - v: 0, + parallelism: 0, + nodeExecutionCount: 0, + taskExecutionCount: 0, } } diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index 0c98aeeb5fc..7c9884bf839 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -182,6 +182,15 @@ func (c *recursiveNodeExecutor) RecursiveNodeHandler(ctx context.Context, execCo nodeStatus := nl.GetNodeExecutionStatus(ctx, currentNode.GetID()) nodePhase := nodeStatus.GetPhase() + if nodePhase == v1alpha1.NodePhaseRunning { + execContext.IncrementNodeExecutionCount() + if currentNode.GetKind() == v1alpha1.NodeKindTask { + execContext.IncrementTaskExecutionCount() + } + logger.Debugf(currentNodeCtx, "recursive handler - node execution count [%v], task execution count [%v], phase [%v], ", + execContext.CurrentNodeExecutionCount(), execContext.CurrentTaskExecutionCount(), nodePhase.String()) + } + if canHandleNode(nodePhase) { // TODO Follow up Pull Request, // 1. Rename this method to DAGTraversalHandleNode (accepts a DAGStructure along-with) the remaining arguments @@ -287,6 +296,7 @@ func (c *recursiveNodeExecutor) handleDownstream(ctx context.Context, execContex }), nil } + logger.Infof(ctx, "F3 starting node id %v, ", downstreamNode.GetID()) state, err := c.RecursiveNodeHandler(ctx, execContext, dag, nl, downstreamNode) if err != nil { return interfaces.NodeStatusUndefined, err diff --git a/flytepropeller/pkg/controller/workflow/executor.go b/flytepropeller/pkg/controller/workflow/executor.go index 355681a7463..9ae3850640b 100644 --- a/flytepropeller/pkg/controller/workflow/executor.go +++ b/flytepropeller/pkg/controller/workflow/executor.go @@ -17,6 +17,7 @@ import ( "github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/workflow/errors" + "github.com/flyteorg/flyte/flytepropeller/pkg/controller/workflowstore" "github.com/flyteorg/flyte/flytepropeller/pkg/utils" "github.com/flyteorg/flyte/flytestdlib/logger" "github.com/flyteorg/flyte/flytestdlib/promutils" @@ -59,15 +60,16 @@ func StatusFailed(err *core.ExecutionError) Status { } type workflowExecutor struct { - enqueueWorkflow v1alpha1.EnqueueWorkflow - store *storage.DataStore - wfRecorder events.WorkflowEventRecorder - k8sRecorder record.EventRecorder - metadataPrefix storage.DataReference - nodeExecutor interfaces.Node - metrics *workflowMetrics - eventConfig *config.EventConfig - clusterID string + enqueueWorkflow v1alpha1.EnqueueWorkflow + store *storage.DataStore + wfRecorder events.WorkflowEventRecorder + k8sRecorder record.EventRecorder + metadataPrefix storage.DataReference + nodeExecutor interfaces.Node + metrics *workflowMetrics + eventConfig *config.EventConfig + clusterID string + activeExecutions *workflowstore.ExecutionStatsHolder } func (c *workflowExecutor) constructWorkflowMetadataPrefix(ctx context.Context, w *v1alpha1.FlyteWorkflow) (storage.DataReference, error) { @@ -144,10 +146,20 @@ func (c *workflowExecutor) handleRunningWorkflow(ctx context.Context, w *v1alpha Message: "Start node not found"}), nil } execcontext := executors.NewExecutionContext(w, w, w, nil, executors.InitializeControlFlow()) - state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, w, w, startNode) + state, handlerErr := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, w, w, startNode) + + execStats := workflowstore.SingleExecutionStats{ + ActiveNodeCount: execcontext.CurrentNodeExecutionCount(), + ActiveTaskCount: execcontext.CurrentTaskExecutionCount()} + logger.Debugf(ctx, "running workflow node execution count [%v], task execution count [%v], execution-id [%v], ", + execStats.ActiveNodeCount, execStats.ActiveTaskCount, execcontext.GetExecutionID()) + statErr := c.activeExecutions.AddOrUpdateEntry(execcontext.GetExecutionID().String(), execStats) + if statErr != nil { + logger.Errorf(ctx, "error updating active executions stats: %v", handlerErr) + } - if err != nil { - return StatusRunning, err + if handlerErr != nil { + return StatusRunning, handlerErr } if state.HasFailed() { logger.Infof(ctx, "Workflow has failed. Error [%s]", state.Err.String()) @@ -175,9 +187,20 @@ func (c *workflowExecutor) handleFailureNode(ctx context.Context, w *v1alpha1.Fl failureNodeStatus := w.GetExecutionStatus().GetNodeExecutionStatus(ctx, failureNode.GetID()) failureNodeLookup := executors.NewFailureNodeLookup(w, failureNode, failureNodeStatus) - state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, failureNodeLookup, failureNodeLookup, failureNode) - if err != nil { - return StatusFailureNode(execErr), err + state, handlerErr := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, failureNodeLookup, failureNodeLookup, failureNode) + + execStats := workflowstore.SingleExecutionStats{ + ActiveNodeCount: execcontext.CurrentNodeExecutionCount(), + ActiveTaskCount: execcontext.CurrentTaskExecutionCount()} + logger.Debugf(ctx, "failure node execution count [%v], task execution count [%v], execution-id [%v], ", + execStats.ActiveNodeCount, execStats.ActiveTaskCount, execcontext.GetExecutionID()) + statErr := c.activeExecutions.AddOrUpdateEntry(execcontext.GetID(), execStats) + if statErr != nil { + logger.Errorf(ctx, "error updating active executions stats: %v", handlerErr) + } + + if handlerErr != nil { + return StatusFailureNode(execErr), handlerErr } switch state.NodePhase { @@ -504,7 +527,7 @@ func (c *workflowExecutor) cleanupRunningNodes(ctx context.Context, w v1alpha1.E func NewExecutor(ctx context.Context, store *storage.DataStore, enQWorkflow v1alpha1.EnqueueWorkflow, eventSink events.EventSink, k8sEventRecorder record.EventRecorder, metadataPrefix string, nodeExecutor interfaces.Node, eventConfig *config.EventConfig, - clusterID string, scope promutils.Scope) (executors.Workflow, error) { + clusterID string, scope promutils.Scope, activeExecutions *workflowstore.ExecutionStatsHolder) (executors.Workflow, error) { basePrefix := store.GetBaseContainerFQN(ctx) if metadataPrefix != "" { var err error @@ -518,15 +541,16 @@ func NewExecutor(ctx context.Context, store *storage.DataStore, enQWorkflow v1al workflowScope := scope.NewSubScope("workflow") return &workflowExecutor{ - nodeExecutor: nodeExecutor, - store: store, - enqueueWorkflow: enQWorkflow, - wfRecorder: events.NewWorkflowEventRecorder(eventSink, workflowScope, store), - k8sRecorder: k8sEventRecorder, - metadataPrefix: basePrefix, - metrics: newMetrics(workflowScope), - eventConfig: eventConfig, - clusterID: clusterID, + nodeExecutor: nodeExecutor, + store: store, + enqueueWorkflow: enQWorkflow, + wfRecorder: events.NewWorkflowEventRecorder(eventSink, workflowScope, store), + k8sRecorder: k8sEventRecorder, + metadataPrefix: basePrefix, + metrics: newMetrics(workflowScope), + eventConfig: eventConfig, + clusterID: clusterID, + activeExecutions: activeExecutions, }, nil } diff --git a/flytepropeller/pkg/controller/workflowstore/execution_stats.go b/flytepropeller/pkg/controller/workflowstore/execution_stats.go new file mode 100644 index 00000000000..4bf44210b98 --- /dev/null +++ b/flytepropeller/pkg/controller/workflowstore/execution_stats.go @@ -0,0 +1,168 @@ +package workflowstore + +import ( + "context" + "fmt" + "runtime/pprof" + "sync" + "time" + + lister "github.com/flyteorg/flyte/flytepropeller/pkg/client/listers/flyteworkflow/v1alpha1" + "github.com/flyteorg/flyte/flytestdlib/contextutils" + "github.com/flyteorg/flyte/flytestdlib/logger" + "github.com/flyteorg/flyte/flytestdlib/promutils" + "github.com/prometheus/client_golang/prometheus" + "k8s.io/apimachinery/pkg/labels" +) + +const ( + resourceLevelExecutionStatsSyncDuration = 20 * time.Second +) + +type ExecutionStatsMonitor struct { + Scope promutils.Scope + + // Cached workflow list from the Informer + lister lister.FlyteWorkflowLister + + // Provides stats for the currently active executions + activeExecutions *ExecutionStatsHolder + + // These are currently aggregated values across all active workflows + ActiveNodeExecutions prometheus.Gauge + ActiveTaskExecutions prometheus.Gauge + ActiveWorkflowExecutions prometheus.Gauge +} + +func (e *ExecutionStatsMonitor) updateExecutionStats(ctx context.Context) { + // Convert the list of workflows to a map for faster lookups + // (todo) Update to include only the workflows in a running state + workflows, err := e.lister.List(labels.Everything()) + if err != nil { + logger.Errorf(ctx, "Failed to list workflows while removing terminated executions, %v", err) + return + } + workflowSet := make(map[string]bool) + for _, wf := range workflows { + workflowSet[wf.GetExecutionID().String()] = true + } + + err = e.activeExecutions.RemoveTerminatedExecutions(ctx, workflowSet) + if err != nil { + logger.Errorf(ctx, "Error while removing terminated executions from stats: %v ", err) + } +} + +func (e *ExecutionStatsMonitor) emitExecutionStats(ctx context.Context) { + executions, nodes, tasks, err := e.activeExecutions.AggregateActiveValues() + if err != nil { + logger.Errorf(ctx, "Error aggregating active execution stats: %v", err) + } + logger.Debugf(ctx, "Execution stats: ActiveExecutions: %d ActiveNodes: %d, ActiveTasks: %d", executions, nodes, tasks) + e.ActiveNodeExecutions.Set(float64(nodes)) + e.ActiveTaskExecutions.Set(float64(tasks)) + e.ActiveWorkflowExecutions.Set(float64(executions)) +} + +func (e *ExecutionStatsMonitor) RunStatsMonitor(ctx context.Context) { + ticker := time.NewTicker(resourceLevelExecutionStatsSyncDuration) + execStatsCtx := contextutils.WithGoroutineLabel(ctx, "execution-stats-monitor") + + go func() { + pprof.SetGoroutineLabels(execStatsCtx) + for { + select { + case <-execStatsCtx.Done(): + return + case <-ticker.C: + e.updateExecutionStats(ctx) + e.emitExecutionStats(ctx) + } + } + }() +} + +func NewExecutionStatsMonitor(scope promutils.Scope, lister lister.FlyteWorkflowLister, activeExecutions *ExecutionStatsHolder) *ExecutionStatsMonitor { + return &ExecutionStatsMonitor{ + Scope: scope, + lister: lister, + activeExecutions: activeExecutions, + ActiveNodeExecutions: scope.MustNewGauge("active_node_executions", "active node executions for propeller"), + ActiveTaskExecutions: scope.MustNewGauge("active_task_executions", "active task executions for propeller"), + ActiveWorkflowExecutions: scope.MustNewGauge("active_workflow_executions", "active workflow executions for propeller"), + } +} + +// SingleExecutionStats holds stats about a single workflow execution, such as active node and task counts. +type SingleExecutionStats struct { + ActiveNodeCount uint32 + ActiveTaskCount uint32 +} + +// ExecutionStatsHolder manages a map of execution IDs to their ExecutionStats. +type ExecutionStatsHolder struct { + mu sync.Mutex // Guards access to the map + executions map[string]SingleExecutionStats +} + +// NewExecutionStatsHolder creates a new ExecutionStatsHolder instance with an initialized map. +func NewExecutionStatsHolder() (*ExecutionStatsHolder, error) { + return &ExecutionStatsHolder{ + executions: make(map[string]SingleExecutionStats), + }, nil +} + +// AddOrUpdateEntry adds or updates an entry in the executions map. +func (esh *ExecutionStatsHolder) AddOrUpdateEntry(executionId string, executionStats SingleExecutionStats) error { + if esh == nil || esh.executions == nil { + return fmt.Errorf("ActiveExecutions is not initialized") + } + + esh.mu.Lock() + defer esh.mu.Unlock() + esh.executions[executionId] = executionStats + + return nil +} + +// Returns the aggregate of all active node and task counts in the map. +func (esh *ExecutionStatsHolder) AggregateActiveValues() (int, uint32, uint32, error) { + if esh == nil || esh.executions == nil { + return 0, 0, 0, fmt.Errorf("ActiveExecutions is not initialized") + } + + esh.mu.Lock() + defer esh.mu.Unlock() + + var sumNodes, sumTasks uint32 = 0, 0 + for _, stats := range esh.executions { + sumNodes += stats.ActiveNodeCount + sumTasks += stats.ActiveTaskCount + } + return len(esh.executions), sumNodes, sumTasks, nil +} + +func (esh *ExecutionStatsHolder) LogAllActiveExecutions(ctx context.Context) { + esh.mu.Lock() + defer esh.mu.Unlock() + + logger.Debugf(ctx, "Current Active Executions:") + for execID, stats := range esh.executions { + logger.Debugf(ctx, "ExecutionID: %s, ActiveNodeCount: %d, ActiveTaskCount: %d\n", execID, stats.ActiveNodeCount, stats.ActiveTaskCount) + } +} + +// RemoveTerminatedExecutions removes all terminated or deleted workflows from the executions map. +// This expects a set of strings for simplified lookup in the critical section. +func (esh *ExecutionStatsHolder) RemoveTerminatedExecutions(ctx context.Context, workflows map[string]bool) error { + // Acquire the mutex and remove all teminated or deleted workflows. + esh.mu.Lock() + defer esh.mu.Unlock() + for execId := range esh.executions { + if !workflows[execId] { + logger.Debugf(ctx, "Deleting active execution entry for execId: %s", execId) + delete(esh.executions, execId) + } + } + return nil +} diff --git a/flytepropeller/pkg/controller/workflowstore/execution_stats_test.go b/flytepropeller/pkg/controller/workflowstore/execution_stats_test.go new file mode 100644 index 00000000000..4cfa0984215 --- /dev/null +++ b/flytepropeller/pkg/controller/workflowstore/execution_stats_test.go @@ -0,0 +1,75 @@ +package workflowstore + +import ( + "context" + "fmt" + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAddOrUpdateEntry(t *testing.T) { + esh, err := NewExecutionStatsHolder() + assert.NoError(t, err) + + err = esh.AddOrUpdateEntry("exec1", SingleExecutionStats{ActiveNodeCount: 5, ActiveTaskCount: 10}) + assert.NoError(t, err) + assert.Equal(t, 1, len(esh.executions)) + assert.Equal(t, uint32(5), esh.executions["exec1"].ActiveNodeCount) + assert.Equal(t, uint32(10), esh.executions["exec1"].ActiveTaskCount) +} + +func TestAggregateActiveValues(t *testing.T) { + esh, _ := NewExecutionStatsHolder() + esh.AddOrUpdateEntry("exec1", SingleExecutionStats{ActiveNodeCount: 5, ActiveTaskCount: 10}) + esh.AddOrUpdateEntry("exec2", SingleExecutionStats{ActiveNodeCount: 3, ActiveTaskCount: 6}) + + flows, nodes, tasks, err := esh.AggregateActiveValues() + assert.NoError(t, err) + assert.Equal(t, 2, flows) + assert.Equal(t, uint32(8), nodes) + assert.Equal(t, uint32(16), tasks) +} + +func TestConcurrentAccess(t *testing.T) { + esh, err := NewExecutionStatsHolder() + assert.NoError(t, err) + + var wg sync.WaitGroup + // Number of concurrent operations + concurrentOps := 100 + + // Concurrently add or update entries + for i := 0; i < concurrentOps; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + execId := fmt.Sprintf("exec%d", id) + esh.AddOrUpdateEntry(execId, SingleExecutionStats{ActiveNodeCount: uint32(id), ActiveTaskCount: uint32(id * 2)}) + }(i) + } + + // Concurrently sum active values + for i := 0; i < concurrentOps/2; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _, _, _, err := esh.AggregateActiveValues() + assert.NoError(t, err) + }() + } + + wg.Wait() + + // Remove all entries + err = esh.RemoveTerminatedExecutions(context.TODO(), map[string]bool{}) + assert.NoError(t, err) + + // After all operations, sum should be predictable as all entries should be deleted + flows, nodes, tasks, err := esh.AggregateActiveValues() + assert.NoError(t, err) + assert.Zero(t, flows) + assert.Zero(t, nodes) + assert.Zero(t, tasks) +}