diff --git a/flytepropeller/pkg/controller/controller.go b/flytepropeller/pkg/controller/controller.go index acd7747d3f7..631d85bd2cf 100644 --- a/flytepropeller/pkg/controller/controller.go +++ b/flytepropeller/pkg/controller/controller.go @@ -82,10 +82,11 @@ type Controller struct { workflowStore workflowstore.FlyteWorkflow // recorder is an event recorder for recording Event resources to the // Kubernetes API. - recorder record.EventRecorder - metrics *metrics - leaderElector *leaderelection.LeaderElector - levelMonitor *ResourceLevelMonitor + recorder record.EventRecorder + metrics *metrics + leaderElector *leaderelection.LeaderElector + levelMonitor *ResourceLevelMonitor + executionStats *workflowstore.ExecutionStatsMonitor } // Run either as a leader -if configured- or as a standalone process. @@ -117,6 +118,7 @@ func (c *Controller) run(ctx context.Context) error { // Start the collector process c.levelMonitor.RunCollector(ctx) + c.executionStats.RunStatsMonitor(ctx) // Start the informer factories to begin populating the informer caches logger.Info(ctx, "Starting FlyteWorkflow controller") @@ -329,7 +331,6 @@ func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Inter if sCfg == nil { logger.Errorf(ctx, "Storage configuration missing.") } - store, err := storage.NewDataStore(sCfg, scope.NewSubScope("metastore")) if err != nil { return nil, errors.Wrapf(err, "Failed to create Metadata storage") @@ -445,7 +446,13 @@ func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Inter return nil, errors.Wrapf(err, "Failed to create Controller.") } - workflowExecutor, err := workflow.NewExecutor(ctx, store, controller.enqueueWorkflowForNodeUpdates, eventSink, controller.recorder, cfg.MetadataPrefix, nodeExecutor, &cfg.EventConfig, cfg.ClusterID, scope) + activeExecutions, err := workflowstore.NewExecutionStatsHolder() + if err != nil { + return nil, err + } + controller.executionStats = workflowstore.NewExecutionStatsMonitor(scope.NewSubScope("execstats"), flyteworkflowInformer.Lister(), activeExecutions) + + workflowExecutor, err := workflow.NewExecutor(ctx, store, controller.enqueueWorkflowForNodeUpdates, eventSink, controller.recorder, cfg.MetadataPrefix, nodeExecutor, &cfg.EventConfig, cfg.ClusterID, scope, activeExecutions) if err != nil { return nil, err } diff --git a/flytepropeller/pkg/controller/executors/execution_context.go b/flytepropeller/pkg/controller/executors/execution_context.go index ec182ab3cd1..84799a04004 100644 --- a/flytepropeller/pkg/controller/executors/execution_context.go +++ b/flytepropeller/pkg/controller/executors/execution_context.go @@ -33,6 +33,10 @@ type ImmutableParentInfo interface { type ControlFlow interface { CurrentParallelism() uint32 IncrementParallelism() uint32 + CurrentNodeExecutionCount() uint32 + IncrementNodeExecutionCount() uint32 + CurrentTaskExecutionCount() uint32 + IncrementTaskExecutionCount() uint32 } type ExecutionContext interface { @@ -71,16 +75,36 @@ func (p *parentExecutionInfo) CurrentAttempt() uint32 { type controlFlow struct { // We could use atomic.Uint32, but this is not required for current Propeller. 
As every round is run in a single // thread and using atomic will introduce memory barriers - v uint32 + parallelism uint32 + nodeExecutionCount uint32 + taskExecutionCount uint32 } func (c *controlFlow) CurrentParallelism() uint32 { - return c.v + return c.parallelism } func (c *controlFlow) IncrementParallelism() uint32 { - c.v = c.v + 1 - return c.v + c.parallelism = c.parallelism + 1 + return c.parallelism +} + +func (c *controlFlow) CurrentNodeExecutionCount() uint32 { + return c.nodeExecutionCount +} + +func (c *controlFlow) IncrementNodeExecutionCount() uint32 { + c.nodeExecutionCount++ + return c.nodeExecutionCount +} + +func (c *controlFlow) CurrentTaskExecutionCount() uint32 { + return c.taskExecutionCount +} + +func (c *controlFlow) IncrementTaskExecutionCount() uint32 { + c.taskExecutionCount++ + return c.taskExecutionCount } func NewExecutionContextWithTasksGetter(prevExecContext ExecutionContext, taskGetter TaskDetailsGetter) ExecutionContext { @@ -114,6 +138,8 @@ func NewParentInfo(uniqueID string, currentAttempts uint32) ImmutableParentInfo func InitializeControlFlow() ControlFlow { return &controlFlow{ - v: 0, + parallelism: 0, + nodeExecutionCount: 0, + taskExecutionCount: 0, } } diff --git a/flytepropeller/pkg/controller/executors/mocks/execution_context.go b/flytepropeller/pkg/controller/executors/mocks/execution_context.go index 1f551a7be12..cf9f4bcb499 100644 --- a/flytepropeller/pkg/controller/executors/mocks/execution_context.go +++ b/flytepropeller/pkg/controller/executors/mocks/execution_context.go @@ -20,6 +20,38 @@ type ExecutionContext struct { mock.Mock } +type ExecutionContext_CurrentNodeExecutionCount struct { + *mock.Call +} + +func (_m ExecutionContext_CurrentNodeExecutionCount) Return(_a0 uint32) *ExecutionContext_CurrentNodeExecutionCount { + return &ExecutionContext_CurrentNodeExecutionCount{Call: _m.Call.Return(_a0)} +} + +func (_m *ExecutionContext) OnCurrentNodeExecutionCount() *ExecutionContext_CurrentNodeExecutionCount { + c_call := _m.On("CurrentNodeExecutionCount") + return &ExecutionContext_CurrentNodeExecutionCount{Call: c_call} +} + +func (_m *ExecutionContext) OnCurrentNodeExecutionCountMatch(matchers ...interface{}) *ExecutionContext_CurrentNodeExecutionCount { + c_call := _m.On("CurrentNodeExecutionCount", matchers...) + return &ExecutionContext_CurrentNodeExecutionCount{Call: c_call} +} + +// CurrentNodeExecutionCount provides a mock function with given fields: +func (_m *ExecutionContext) CurrentNodeExecutionCount() uint32 { + ret := _m.Called() + + var r0 uint32 + if rf, ok := ret.Get(0).(func() uint32); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint32) + } + + return r0 +} + type ExecutionContext_CurrentParallelism struct { *mock.Call } @@ -52,6 +84,38 @@ func (_m *ExecutionContext) CurrentParallelism() uint32 { return r0 } +type ExecutionContext_CurrentTaskExecutionCount struct { + *mock.Call +} + +func (_m ExecutionContext_CurrentTaskExecutionCount) Return(_a0 uint32) *ExecutionContext_CurrentTaskExecutionCount { + return &ExecutionContext_CurrentTaskExecutionCount{Call: _m.Call.Return(_a0)} +} + +func (_m *ExecutionContext) OnCurrentTaskExecutionCount() *ExecutionContext_CurrentTaskExecutionCount { + c_call := _m.On("CurrentTaskExecutionCount") + return &ExecutionContext_CurrentTaskExecutionCount{Call: c_call} +} + +func (_m *ExecutionContext) OnCurrentTaskExecutionCountMatch(matchers ...interface{}) *ExecutionContext_CurrentTaskExecutionCount { + c_call := _m.On("CurrentTaskExecutionCount", matchers...) 
+ return &ExecutionContext_CurrentTaskExecutionCount{Call: c_call} +} + +// CurrentTaskExecutionCount provides a mock function with given fields: +func (_m *ExecutionContext) CurrentTaskExecutionCount() uint32 { + ret := _m.Called() + + var r0 uint32 + if rf, ok := ret.Get(0).(func() uint32); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint32) + } + + return r0 +} + type ExecutionContext_FindSubWorkflow struct { *mock.Call } @@ -677,6 +741,38 @@ func (_m *ExecutionContext) GetTask(id string) (v1alpha1.ExecutableTask, error) return r0, r1 } +type ExecutionContext_IncrementNodeExecutionCount struct { + *mock.Call +} + +func (_m ExecutionContext_IncrementNodeExecutionCount) Return(_a0 uint32) *ExecutionContext_IncrementNodeExecutionCount { + return &ExecutionContext_IncrementNodeExecutionCount{Call: _m.Call.Return(_a0)} +} + +func (_m *ExecutionContext) OnIncrementNodeExecutionCount() *ExecutionContext_IncrementNodeExecutionCount { + c_call := _m.On("IncrementNodeExecutionCount") + return &ExecutionContext_IncrementNodeExecutionCount{Call: c_call} +} + +func (_m *ExecutionContext) OnIncrementNodeExecutionCountMatch(matchers ...interface{}) *ExecutionContext_IncrementNodeExecutionCount { + c_call := _m.On("IncrementNodeExecutionCount", matchers...) + return &ExecutionContext_IncrementNodeExecutionCount{Call: c_call} +} + +// IncrementNodeExecutionCount provides a mock function with given fields: +func (_m *ExecutionContext) IncrementNodeExecutionCount() uint32 { + ret := _m.Called() + + var r0 uint32 + if rf, ok := ret.Get(0).(func() uint32); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint32) + } + + return r0 +} + type ExecutionContext_IncrementParallelism struct { *mock.Call } @@ -709,6 +805,38 @@ func (_m *ExecutionContext) IncrementParallelism() uint32 { return r0 } +type ExecutionContext_IncrementTaskExecutionCount struct { + *mock.Call +} + +func (_m ExecutionContext_IncrementTaskExecutionCount) Return(_a0 uint32) *ExecutionContext_IncrementTaskExecutionCount { + return &ExecutionContext_IncrementTaskExecutionCount{Call: _m.Call.Return(_a0)} +} + +func (_m *ExecutionContext) OnIncrementTaskExecutionCount() *ExecutionContext_IncrementTaskExecutionCount { + c_call := _m.On("IncrementTaskExecutionCount") + return &ExecutionContext_IncrementTaskExecutionCount{Call: c_call} +} + +func (_m *ExecutionContext) OnIncrementTaskExecutionCountMatch(matchers ...interface{}) *ExecutionContext_IncrementTaskExecutionCount { + c_call := _m.On("IncrementTaskExecutionCount", matchers...) 
+ return &ExecutionContext_IncrementTaskExecutionCount{Call: c_call} +} + +// IncrementTaskExecutionCount provides a mock function with given fields: +func (_m *ExecutionContext) IncrementTaskExecutionCount() uint32 { + ret := _m.Called() + + var r0 uint32 + if rf, ok := ret.Get(0).(func() uint32); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint32) + } + + return r0 +} + type ExecutionContext_IsInterruptible struct { *mock.Call } diff --git a/flytepropeller/pkg/controller/nodes/array/handler_test.go b/flytepropeller/pkg/controller/nodes/array/handler_test.go index f0b91217e09..b36f3a5b626 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/array/handler_test.go @@ -120,6 +120,10 @@ func createNodeExecutionContext(dataStore *storage.DataStore, eventRecorder inte }, nil, ) + executionContext.OnIncrementNodeExecutionCount().Return(1) + executionContext.OnIncrementTaskExecutionCount().Return(1) + executionContext.OnCurrentNodeExecutionCount().Return(1) + executionContext.OnCurrentTaskExecutionCount().Return(1) nCtx.OnExecutionContext().Return(executionContext) // EventsRecorder diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index cf8c62cfad5..b3b63cc4487 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -182,6 +182,15 @@ func (c *recursiveNodeExecutor) RecursiveNodeHandler(ctx context.Context, execCo nodeStatus := nl.GetNodeExecutionStatus(ctx, currentNode.GetID()) nodePhase := nodeStatus.GetPhase() + if nodePhase == v1alpha1.NodePhaseRunning && execContext != nil { + execContext.IncrementNodeExecutionCount() + if currentNode.GetKind() == v1alpha1.NodeKindTask { + execContext.IncrementTaskExecutionCount() + } + logger.Debugf(currentNodeCtx, "recursive handler - node execution count [%v], task execution count [%v], phase [%v], ", + execContext.CurrentNodeExecutionCount(), execContext.CurrentTaskExecutionCount(), nodePhase.String()) + } + if canHandleNode(nodePhase) { // TODO Follow up Pull Request, // 1. 
Rename this method to DAGTraversalHandleNode (accepts a DAGStructure along-with) the remaining arguments @@ -287,6 +296,7 @@ func (c *recursiveNodeExecutor) handleDownstream(ctx context.Context, execContex }), nil } + logger.Debugf(ctx, "downstream handler starting node id %v, ", downstreamNode.GetID()) state, err := c.RecursiveNodeHandler(ctx, execContext, dag, nl, downstreamNode) if err != nil { return interfaces.NodeStatusUndefined, err diff --git a/flytepropeller/pkg/controller/workflow/executor.go b/flytepropeller/pkg/controller/workflow/executor.go index 355681a7463..c2f7a35ebef 100644 --- a/flytepropeller/pkg/controller/workflow/executor.go +++ b/flytepropeller/pkg/controller/workflow/executor.go @@ -17,6 +17,7 @@ import ( "github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/workflow/errors" + "github.com/flyteorg/flyte/flytepropeller/pkg/controller/workflowstore" "github.com/flyteorg/flyte/flytepropeller/pkg/utils" "github.com/flyteorg/flyte/flytestdlib/logger" "github.com/flyteorg/flyte/flytestdlib/promutils" @@ -59,15 +60,16 @@ func StatusFailed(err *core.ExecutionError) Status { } type workflowExecutor struct { - enqueueWorkflow v1alpha1.EnqueueWorkflow - store *storage.DataStore - wfRecorder events.WorkflowEventRecorder - k8sRecorder record.EventRecorder - metadataPrefix storage.DataReference - nodeExecutor interfaces.Node - metrics *workflowMetrics - eventConfig *config.EventConfig - clusterID string + enqueueWorkflow v1alpha1.EnqueueWorkflow + store *storage.DataStore + wfRecorder events.WorkflowEventRecorder + k8sRecorder record.EventRecorder + metadataPrefix storage.DataReference + nodeExecutor interfaces.Node + metrics *workflowMetrics + eventConfig *config.EventConfig + clusterID string + activeExecutions *workflowstore.ExecutionStatsHolder } func (c *workflowExecutor) constructWorkflowMetadataPrefix(ctx context.Context, w *v1alpha1.FlyteWorkflow) (storage.DataReference, error) { @@ -135,6 +137,18 @@ func (c *workflowExecutor) handleReadyWorkflow(ctx context.Context, w *v1alpha1. 
return StatusRunning, nil } +func (c *workflowExecutor) updateExecutionStats(ctx context.Context, execcontext executors.ExecutionContext) { + execStats := workflowstore.SingleExecutionStats{ + ActiveNodeCount: execcontext.CurrentNodeExecutionCount(), + ActiveTaskCount: execcontext.CurrentTaskExecutionCount()} + logger.Debugf(ctx, "execution stats - execution count [%v], task execution count [%v], execution-id [%v], ", + execStats.ActiveNodeCount, execStats.ActiveTaskCount, execcontext.GetExecutionID()) + statErr := c.activeExecutions.AddOrUpdateEntry(execcontext.GetExecutionID().String(), execStats) + if statErr != nil { + logger.Errorf(ctx, "error updating active executions stats: %v", statErr) + } +} + func (c *workflowExecutor) handleRunningWorkflow(ctx context.Context, w *v1alpha1.FlyteWorkflow) (Status, error) { startNode := w.StartNode() if startNode == nil { @@ -144,10 +158,11 @@ func (c *workflowExecutor) handleRunningWorkflow(ctx context.Context, w *v1alpha Message: "Start node not found"}), nil } execcontext := executors.NewExecutionContext(w, w, w, nil, executors.InitializeControlFlow()) - state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, w, w, startNode) + state, handlerErr := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, w, w, startNode) + c.updateExecutionStats(ctx, execcontext) - if err != nil { - return StatusRunning, err + if handlerErr != nil { + return StatusRunning, handlerErr } if state.HasFailed() { logger.Infof(ctx, "Workflow has failed. Error [%s]", state.Err.String()) @@ -175,9 +190,11 @@ func (c *workflowExecutor) handleFailureNode(ctx context.Context, w *v1alpha1.Fl failureNodeStatus := w.GetExecutionStatus().GetNodeExecutionStatus(ctx, failureNode.GetID()) failureNodeLookup := executors.NewFailureNodeLookup(w, failureNode, failureNodeStatus) - state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, failureNodeLookup, failureNodeLookup, failureNode) - if err != nil { - return StatusFailureNode(execErr), err + state, handlerErr := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, failureNodeLookup, failureNodeLookup, failureNode) + c.updateExecutionStats(ctx, execcontext) + + if handlerErr != nil { + return StatusFailureNode(execErr), handlerErr } switch state.NodePhase { @@ -504,7 +521,7 @@ func (c *workflowExecutor) cleanupRunningNodes(ctx context.Context, w v1alpha1.E func NewExecutor(ctx context.Context, store *storage.DataStore, enQWorkflow v1alpha1.EnqueueWorkflow, eventSink events.EventSink, k8sEventRecorder record.EventRecorder, metadataPrefix string, nodeExecutor interfaces.Node, eventConfig *config.EventConfig, - clusterID string, scope promutils.Scope) (executors.Workflow, error) { + clusterID string, scope promutils.Scope, activeExecutions *workflowstore.ExecutionStatsHolder) (executors.Workflow, error) { basePrefix := store.GetBaseContainerFQN(ctx) if metadataPrefix != "" { var err error @@ -518,15 +535,16 @@ func NewExecutor(ctx context.Context, store *storage.DataStore, enQWorkflow v1al workflowScope := scope.NewSubScope("workflow") return &workflowExecutor{ - nodeExecutor: nodeExecutor, - store: store, - enqueueWorkflow: enQWorkflow, - wfRecorder: events.NewWorkflowEventRecorder(eventSink, workflowScope, store), - k8sRecorder: k8sEventRecorder, - metadataPrefix: basePrefix, - metrics: newMetrics(workflowScope), - eventConfig: eventConfig, - clusterID: clusterID, + nodeExecutor: nodeExecutor, + store: store, + enqueueWorkflow: enQWorkflow, + wfRecorder: events.NewWorkflowEventRecorder(eventSink, 
workflowScope, store), + k8sRecorder: k8sEventRecorder, + metadataPrefix: basePrefix, + metrics: newMetrics(workflowScope), + eventConfig: eventConfig, + clusterID: clusterID, + activeExecutions: activeExecutions, }, nil } diff --git a/flytepropeller/pkg/controller/workflow/executor_test.go b/flytepropeller/pkg/controller/workflow/executor_test.go index cc9910abc39..4050e415824 100644 --- a/flytepropeller/pkg/controller/workflow/executor_test.go +++ b/flytepropeller/pkg/controller/workflow/executor_test.go @@ -42,6 +42,7 @@ import ( "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/subworkflow/launchplan" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/fakeplugins" wfErrors "github.com/flyteorg/flyte/flytepropeller/pkg/controller/workflow/errors" + execStats "github.com/flyteorg/flyte/flytepropeller/pkg/controller/workflowstore" "github.com/flyteorg/flyte/flytestdlib/contextutils" "github.com/flyteorg/flyte/flytestdlib/logger" "github.com/flyteorg/flyte/flytestdlib/promutils" @@ -248,7 +249,10 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Error(t *testing.T) { nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, adminClient, adminClient, maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, handlerFactory, promutils.NewTestScope()) assert.NoError(t, err) - executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, eventConfig, testClusterID, promutils.NewTestScope()) + + execStatsHolder, err := execStats.NewExecutionStatsHolder() + assert.NoError(t, err) + executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, eventConfig, testClusterID, promutils.NewTestScope(), execStatsHolder) assert.NoError(t, err) assert.NoError(t, executor.Initialize(ctx)) @@ -332,7 +336,9 @@ func TestWorkflowExecutor_HandleFlyteWorkflow(t *testing.T) { maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, handlerFactory, promutils.NewTestScope()) assert.NoError(t, err) - executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, eventConfig, testClusterID, promutils.NewTestScope()) + execStatsHolder, err := execStats.NewExecutionStatsHolder() + assert.NoError(t, err) + executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, eventConfig, testClusterID, promutils.NewTestScope(), execStatsHolder) assert.NoError(t, err) assert.NoError(t, executor.Initialize(ctx)) @@ -396,7 +402,9 @@ func BenchmarkWorkflowExecutor(b *testing.B) { maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, handlerFactory, scope) assert.NoError(b, err) - executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, eventConfig, testClusterID, promutils.NewTestScope()) + execStatsHolder, err := execStats.NewExecutionStatsHolder() + assert.NoError(b, err) + executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, eventConfig, testClusterID, promutils.NewTestScope(), execStatsHolder) assert.NoError(b, err) assert.NoError(b, executor.Initialize(ctx)) @@ -507,7 +515,10 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Failing(t *testing.T) { nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, adminClient, 
adminClient, maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, handlerFactory, promutils.NewTestScope()) assert.NoError(t, err) - executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, eventConfig, testClusterID, promutils.NewTestScope()) + + execStatsHolder, err := execStats.NewExecutionStatsHolder() + assert.NoError(t, err) + executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, eventConfig, testClusterID, promutils.NewTestScope(), execStatsHolder) assert.NoError(t, err) assert.NoError(t, executor.Initialize(ctx)) @@ -609,7 +620,9 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Events(t *testing.T) { nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, adminClient, adminClient, maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, handlerFactory, promutils.NewTestScope()) assert.NoError(t, err) - executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, eventConfig, testClusterID, promutils.NewTestScope()) + execStatsHolder, err := execStats.NewExecutionStatsHolder() + assert.NoError(t, err) + executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, eventConfig, testClusterID, promutils.NewTestScope(), execStatsHolder) assert.NoError(t, err) assert.NoError(t, executor.Initialize(ctx)) @@ -684,7 +697,9 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_EventFailure(t *testing.T) { Cause: errors.New("already exists"), } } - executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, eventConfig, testClusterID, promutils.NewTestScope()) + execStatsHolder, err := execStats.NewExecutionStatsHolder() + assert.NoError(t, err) + executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, eventConfig, testClusterID, promutils.NewTestScope(), execStatsHolder) assert.NoError(t, err) w := &v1alpha1.FlyteWorkflow{} assert.NoError(t, json.Unmarshal(wJSON, w)) @@ -703,7 +718,9 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_EventFailure(t *testing.T) { Cause: errors.New("already exists"), } } - executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, eventConfig, testClusterID, promutils.NewTestScope()) + execStatsHolder, err := execStats.NewExecutionStatsHolder() + assert.NoError(t, err) + executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, eventConfig, testClusterID, promutils.NewTestScope(), execStatsHolder) assert.NoError(t, err) w := &v1alpha1.FlyteWorkflow{} assert.NoError(t, json.Unmarshal(wJSON, w)) @@ -719,7 +736,9 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_EventFailure(t *testing.T) { Cause: errors.New("generic exists"), } } - executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, eventConfig, testClusterID, promutils.NewTestScope()) + execStatsHolder, err := execStats.NewExecutionStatsHolder() + assert.NoError(t, err) + executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, eventConfig, testClusterID, promutils.NewTestScope(), execStatsHolder) assert.NoError(t, err) w := &v1alpha1.FlyteWorkflow{} assert.NoError(t, json.Unmarshal(wJSON, w)) @@ -736,7 +755,9 @@ func 
TestWorkflowExecutor_HandleFlyteWorkflow_EventFailure(t *testing.T) { Cause: errors.New("incompatible cluster"), } } - executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, eventConfig, testClusterID, promutils.NewTestScope()) + execStatsHolder, err := execStats.NewExecutionStatsHolder() + assert.NoError(t, err) + executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, eventConfig, testClusterID, promutils.NewTestScope(), execStatsHolder) assert.NoError(t, err) w := &v1alpha1.FlyteWorkflow{} assert.NoError(t, json.Unmarshal(wJSON, w)) diff --git a/flytepropeller/pkg/controller/workflowstore/execution_stats.go b/flytepropeller/pkg/controller/workflowstore/execution_stats.go new file mode 100644 index 00000000000..e36ff6c578a --- /dev/null +++ b/flytepropeller/pkg/controller/workflowstore/execution_stats.go @@ -0,0 +1,170 @@ +package workflowstore + +import ( + "context" + "fmt" + "runtime/pprof" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "k8s.io/apimachinery/pkg/labels" + + lister "github.com/flyteorg/flyte/flytepropeller/pkg/client/listers/flyteworkflow/v1alpha1" + "github.com/flyteorg/flyte/flytestdlib/contextutils" + "github.com/flyteorg/flyte/flytestdlib/logger" + "github.com/flyteorg/flyte/flytestdlib/promutils" +) + +const ( + resourceLevelExecutionStatsSyncDuration = 10 * time.Second +) + +type ExecutionStatsMonitor struct { + Scope promutils.Scope + + // Cached workflow list from the Informer + lister lister.FlyteWorkflowLister + + // Provides stats for the currently active executions + activeExecutions *ExecutionStatsHolder + + // These are currently aggregated values across all active workflows + ActiveNodeExecutions prometheus.Gauge + ActiveTaskExecutions prometheus.Gauge + ActiveWorkflowExecutions prometheus.Gauge +} + +func (e *ExecutionStatsMonitor) updateExecutionStats(ctx context.Context) { + // Convert the list of workflows to a map for faster lookups. 
+ // Note that lister will exclude any workflows which are already marked terminated as the + // controller creates the informer with a label selector that excludes them - IgnoreCompletedWorkflowsLabelSelector() + workflows, err := e.lister.List(labels.Everything()) + if err != nil { + logger.Errorf(ctx, "Failed to list workflows while removing terminated executions, %v", err) + return + } + workflowSet := make(map[string]bool) + for _, wf := range workflows { + workflowSet[wf.GetExecutionID().String()] = true + } + + err = e.activeExecutions.RemoveTerminatedExecutions(ctx, workflowSet) + if err != nil { + logger.Errorf(ctx, "Error while removing terminated executions from stats: %v ", err) + } +} + +func (e *ExecutionStatsMonitor) emitExecutionStats(ctx context.Context) { + executions, nodes, tasks, err := e.activeExecutions.AggregateActiveValues() + if err != nil { + logger.Errorf(ctx, "Error aggregating active execution stats: %v", err) + } + logger.Debugf(ctx, "Execution stats: ActiveExecutions: %d ActiveNodes: %d, ActiveTasks: %d", executions, nodes, tasks) + e.ActiveNodeExecutions.Set(float64(nodes)) + e.ActiveTaskExecutions.Set(float64(tasks)) + e.ActiveWorkflowExecutions.Set(float64(executions)) +} + +func (e *ExecutionStatsMonitor) RunStatsMonitor(ctx context.Context) { + ticker := time.NewTicker(resourceLevelExecutionStatsSyncDuration) + execStatsCtx := contextutils.WithGoroutineLabel(ctx, "execution-stats-monitor") + + go func() { + pprof.SetGoroutineLabels(execStatsCtx) + for { + select { + case <-execStatsCtx.Done(): + return + case <-ticker.C: + e.updateExecutionStats(ctx) + e.emitExecutionStats(ctx) + } + } + }() +} + +func NewExecutionStatsMonitor(scope promutils.Scope, lister lister.FlyteWorkflowLister, activeExecutions *ExecutionStatsHolder) *ExecutionStatsMonitor { + return &ExecutionStatsMonitor{ + Scope: scope, + lister: lister, + activeExecutions: activeExecutions, + ActiveNodeExecutions: scope.MustNewGauge("active_node_executions", "active node executions for propeller"), + ActiveTaskExecutions: scope.MustNewGauge("active_task_executions", "active task executions for propeller"), + ActiveWorkflowExecutions: scope.MustNewGauge("active_workflow_executions", "active workflow executions for propeller"), + } +} + +// SingleExecutionStats holds stats about a single workflow execution, such as active node and task counts. +type SingleExecutionStats struct { + ActiveNodeCount uint32 + ActiveTaskCount uint32 +} + +// ExecutionStatsHolder manages a map of execution IDs to their ExecutionStats. +type ExecutionStatsHolder struct { + mu sync.Mutex // Guards access to the map + executions map[string]SingleExecutionStats +} + +// NewExecutionStatsHolder creates a new ExecutionStatsHolder instance with an initialized map. +func NewExecutionStatsHolder() (*ExecutionStatsHolder, error) { + return &ExecutionStatsHolder{ + executions: make(map[string]SingleExecutionStats), + }, nil +} + +// AddOrUpdateEntry adds or updates an entry in the executions map. +func (esh *ExecutionStatsHolder) AddOrUpdateEntry(executionID string, executionStats SingleExecutionStats) error { + if esh == nil || esh.executions == nil { + return fmt.Errorf("ExecutionStatsHolder is not initialized") + } + + esh.mu.Lock() + defer esh.mu.Unlock() + esh.executions[executionID] = executionStats + + return nil +} + +// Returns the aggregate of all active node and task counts in the map. 
+func (esh *ExecutionStatsHolder) AggregateActiveValues() (int, uint32, uint32, error) { + if esh == nil || esh.executions == nil { + return 0, 0, 0, fmt.Errorf("ActiveExecutions is not initialized") + } + + esh.mu.Lock() + defer esh.mu.Unlock() + + var sumNodes, sumTasks uint32 = 0, 0 + for _, stats := range esh.executions { + sumNodes += stats.ActiveNodeCount + sumTasks += stats.ActiveTaskCount + } + return len(esh.executions), sumNodes, sumTasks, nil +} + +func (esh *ExecutionStatsHolder) LogAllActiveExecutions(ctx context.Context) { + esh.mu.Lock() + defer esh.mu.Unlock() + + logger.Debugf(ctx, "Current Active Executions:") + for execID, stats := range esh.executions { + logger.Debugf(ctx, "ExecutionID: %s, ActiveNodeCount: %d, ActiveTaskCount: %d\n", execID, stats.ActiveNodeCount, stats.ActiveTaskCount) + } +} + +// RemoveTerminatedExecutions removes all terminated or deleted workflows from the executions map. +// This expects a set of strings for simplified lookup in the critical section. +func (esh *ExecutionStatsHolder) RemoveTerminatedExecutions(ctx context.Context, workflows map[string]bool) error { + // Acquire the mutex and remove all terminated or deleted workflows. + esh.mu.Lock() + defer esh.mu.Unlock() + for execID := range esh.executions { + if !workflows[execID] { + logger.Debugf(ctx, "Deleting active execution entry for execId: %s", execID) + delete(esh.executions, execID) + } + } + return nil +} diff --git a/flytepropeller/pkg/controller/workflowstore/execution_stats_test.go b/flytepropeller/pkg/controller/workflowstore/execution_stats_test.go new file mode 100644 index 00000000000..783e2ba6883 --- /dev/null +++ b/flytepropeller/pkg/controller/workflowstore/execution_stats_test.go @@ -0,0 +1,119 @@ +package workflowstore + +import ( + "context" + "fmt" + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAddOrUpdateEntry(t *testing.T) { + esh, err := NewExecutionStatsHolder() + assert.NoError(t, err) + + err = esh.AddOrUpdateEntry("exec1", SingleExecutionStats{ActiveNodeCount: 5, ActiveTaskCount: 10}) + assert.NoError(t, err) + assert.Equal(t, 1, len(esh.executions)) + assert.Equal(t, uint32(5), esh.executions["exec1"].ActiveNodeCount) + assert.Equal(t, uint32(10), esh.executions["exec1"].ActiveTaskCount) +} + +func createDefaultExecutionStatsHolder() (*ExecutionStatsHolder, error) { + esh, err := NewExecutionStatsHolder() + if err != nil { + return nil, err + } + err = esh.AddOrUpdateEntry("exec1", SingleExecutionStats{ActiveNodeCount: 5, ActiveTaskCount: 10}) + if err != nil { + return nil, err + } + err = esh.AddOrUpdateEntry("exec2", SingleExecutionStats{ActiveNodeCount: 3, ActiveTaskCount: 6}) + if err != nil { + return nil, err + } + if err != nil { + return nil, err + } + return esh, nil +} + +func TestAggregateActiveValues(t *testing.T) { + esh, err := createDefaultExecutionStatsHolder() + assert.NoError(t, err) + + flows, nodes, tasks, err := esh.AggregateActiveValues() + assert.NoError(t, err) + assert.Equal(t, 2, flows) + assert.Equal(t, uint32(8), nodes) + assert.Equal(t, uint32(16), tasks) +} + +// Test removal on an empty ExecutionStatsHolder +func TestRemoveTerminatedExecutionsEmpty(t *testing.T) { + esh, err := NewExecutionStatsHolder() + assert.NoError(t, err) + + err = esh.RemoveTerminatedExecutions(context.TODO(), map[string]bool{}) + assert.NoError(t, err) + + err = esh.RemoveTerminatedExecutions(context.TODO(), map[string]bool{"exec1": true}) + assert.NoError(t, err) +} + +// Test removal of a subset of entries from 
ExecutionStatsHolder +func TestRemoveTerminatedExecutionsSubset(t *testing.T) { + esh, err := createDefaultExecutionStatsHolder() + assert.NoError(t, err) + + err = esh.RemoveTerminatedExecutions(context.TODO(), map[string]bool{"exec2": true}) + assert.NoError(t, err) + + assert.Equal(t, 1, len(esh.executions)) + assert.Equal(t, uint32(3), esh.executions["exec2"].ActiveNodeCount) + assert.Equal(t, uint32(6), esh.executions["exec2"].ActiveTaskCount) +} + +func TestConcurrentAccess(t *testing.T) { + esh, err := NewExecutionStatsHolder() + assert.NoError(t, err) + + var wg sync.WaitGroup + // Number of concurrent operations + concurrentOps := 100 + + // Concurrently add or update entries + for i := 0; i < concurrentOps; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + execID := fmt.Sprintf("exec%d", id) + err := esh.AddOrUpdateEntry(execID, SingleExecutionStats{ActiveNodeCount: uint32(id), ActiveTaskCount: uint32(id * 2)}) + assert.NoError(t, err) + }(i) + } + + // Concurrently sum active values + for i := 0; i < concurrentOps/2; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _, _, _, err := esh.AggregateActiveValues() + assert.NoError(t, err) + }() + } + + wg.Wait() + + // Remove all entries + err = esh.RemoveTerminatedExecutions(context.TODO(), map[string]bool{}) + assert.NoError(t, err) + + // After all operations, sum should be predictable as all entries should be deleted + flows, nodes, tasks, err := esh.AggregateActiveValues() + assert.NoError(t, err) + assert.Zero(t, flows) + assert.Zero(t, nodes) + assert.Zero(t, tasks) +}
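
Below is a minimal, standalone sketch (not part of the diff) of how the new ExecutionStatsHolder API introduced in execution_stats.go fits together: the workflow executor writes per-execution counts after each evaluation round via AddOrUpdateEntry, while the ExecutionStatsMonitor periodically aggregates them with AggregateActiveValues and prunes entries for workflows no longer returned by the informer via RemoveTerminatedExecutions. The execution ID and counts used here are illustrative; only functions and types added in this diff are called.

package main

import (
	"context"
	"fmt"

	"github.com/flyteorg/flyte/flytepropeller/pkg/controller/workflowstore"
)

func main() {
	ctx := context.Background()

	// Shared holder: written by the workflow executor, read by the stats monitor.
	holder, err := workflowstore.NewExecutionStatsHolder()
	if err != nil {
		panic(err)
	}

	// After an evaluation round, the executor records the counts observed for one
	// execution. The execution ID string below is illustrative.
	if err := holder.AddOrUpdateEntry("project:domain:exec1", workflowstore.SingleExecutionStats{
		ActiveNodeCount: 4,
		ActiveTaskCount: 2,
	}); err != nil {
		panic(err)
	}

	// The monitor aggregates all entries into the values backing its Prometheus gauges.
	executions, nodes, tasks, err := holder.AggregateActiveValues()
	if err != nil {
		panic(err)
	}
	fmt.Printf("active executions=%d nodes=%d tasks=%d\n", executions, nodes, tasks)

	// Entries whose execution ID is absent from the informer's current workflow set
	// are treated as terminated and removed.
	if err := holder.RemoveTerminatedExecutions(ctx, map[string]bool{}); err != nil {
		panic(err)
	}
}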