From 8a948ceedd34b6c6f462cf24679079bc636b4363 Mon Sep 17 00:00:00 2001
From: Shardool <shardool.s@gmail.com>
Date: Sun, 25 Feb 2024 22:38:05 -0800
Subject: [PATCH] Add tracking for active node and task execution counts in
 propeller

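Propeller currently exposes no aggregate view of how many nodes and tasks
are actively executing across the workflows it owns. This change:

- extends the ControlFlow interface with per-round node and task execution
  counters, incremented by the recursive node handler for nodes in the
  Running phase,
- records those counts per execution in a new ExecutionStatsHolder after
  each workflow evaluation round, and
- adds an ExecutionStatsMonitor that periodically prunes entries for
  terminated workflows (using the informer cache) and emits the
  active_workflow_executions, active_node_executions and
  active_task_executions gauges.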
---
 flytepropeller/pkg/controller/controller.go   |  19 +-
 .../controller/executors/execution_context.go |  36 +++-
 .../pkg/controller/nodes/executor.go          |  10 ++
 .../pkg/controller/workflow/executor.go       |  74 +++++---
 .../workflowstore/execution_stats.go          | 168 ++++++++++++++++++
 .../workflowstore/execution_stats_test.go     |  75 ++++++++
 6 files changed, 346 insertions(+), 36 deletions(-)
 create mode 100644 flytepropeller/pkg/controller/workflowstore/execution_stats.go
 create mode 100644 flytepropeller/pkg/controller/workflowstore/execution_stats_test.go

diff --git a/flytepropeller/pkg/controller/controller.go b/flytepropeller/pkg/controller/controller.go
index de28612c54b..3942492e852 100644
--- a/flytepropeller/pkg/controller/controller.go
+++ b/flytepropeller/pkg/controller/controller.go
@@ -82,10 +82,11 @@ type Controller struct {
 	workflowStore       workflowstore.FlyteWorkflow
 	// recorder is an event recorder for recording Event resources to the
 	// Kubernetes API.
-	recorder      record.EventRecorder
-	metrics       *metrics
-	leaderElector *leaderelection.LeaderElector
-	levelMonitor  *ResourceLevelMonitor
+	recorder       record.EventRecorder
+	metrics        *metrics
+	leaderElector  *leaderelection.LeaderElector
+	levelMonitor   *ResourceLevelMonitor
+	executionStats *workflowstore.ExecutionStatsMonitor
 }
 
 // Run either as a leader -if configured- or as a standalone process.
@@ -117,6 +118,7 @@ func (c *Controller) run(ctx context.Context) error {
 
 	// Start the collector process
 	c.levelMonitor.RunCollector(ctx)
+	c.executionStats.RunStatsMonitor(ctx)
 
 	// Start the informer factories to begin populating the informer caches
 	logger.Info(ctx, "Starting FlyteWorkflow controller")
@@ -329,7 +331,6 @@ func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Inter
 	if sCfg == nil {
 		logger.Errorf(ctx, "Storage configuration missing.")
 	}
-
 	store, err := storage.NewDataStore(sCfg, scope.NewSubScope("metastore"))
 	if err != nil {
 		return nil, errors.Wrapf(err, "Failed to create Metadata storage")
@@ -445,7 +446,13 @@ func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Inter
 		return nil, errors.Wrapf(err, "Failed to create Controller.")
 	}
 
-	workflowExecutor, err := workflow.NewExecutor(ctx, store, controller.enqueueWorkflowForNodeUpdates, eventSink, controller.recorder, cfg.MetadataPrefix, nodeExecutor, &cfg.EventConfig, cfg.ClusterID, scope)
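+	// Shared between the workflow executor, which records per-execution counts, and the
+	// stats monitor, which aggregates and prunes them.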
+	activeExecutions, err := workflowstore.NewExecutionStatsHolder()
+	if err != nil {
+		return nil, err
+	}
+	controller.executionStats = workflowstore.NewExecutionStatsMonitor(scope.NewSubScope("execstats"), flyteworkflowInformer.Lister(), activeExecutions)
+
+	workflowExecutor, err := workflow.NewExecutor(ctx, store, controller.enqueueWorkflowForNodeUpdates, eventSink, controller.recorder, cfg.MetadataPrefix, nodeExecutor, &cfg.EventConfig, cfg.ClusterID, scope, activeExecutions)
 	if err != nil {
 		return nil, err
 	}
diff --git a/flytepropeller/pkg/controller/executors/execution_context.go b/flytepropeller/pkg/controller/executors/execution_context.go
index ec182ab3cd1..84799a04004 100644
--- a/flytepropeller/pkg/controller/executors/execution_context.go
+++ b/flytepropeller/pkg/controller/executors/execution_context.go
@@ -33,6 +33,10 @@ type ImmutableParentInfo interface {
 type ControlFlow interface {
 	CurrentParallelism() uint32
 	IncrementParallelism() uint32
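+	// The node and task execution counts track how many nodes, and task nodes specifically,
+	// were observed in the Running phase during the current evaluation round.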
+	CurrentNodeExecutionCount() uint32
+	IncrementNodeExecutionCount() uint32
+	CurrentTaskExecutionCount() uint32
+	IncrementTaskExecutionCount() uint32
 }
 
 type ExecutionContext interface {
@@ -71,16 +75,36 @@ func (p *parentExecutionInfo) CurrentAttempt() uint32 {
 type controlFlow struct {
 	// We could use atomic.Uint32, but this is not required for current Propeller. As every round is run in a single
 	// thread and using atomic will introduce memory barriers
-	v uint32
+	parallelism        uint32
+	nodeExecutionCount uint32
+	taskExecutionCount uint32
 }
 
 func (c *controlFlow) CurrentParallelism() uint32 {
-	return c.v
+	return c.parallelism
 }
 
 func (c *controlFlow) IncrementParallelism() uint32 {
-	c.v = c.v + 1
-	return c.v
+	c.parallelism = c.parallelism + 1
+	return c.parallelism
+}
+
+func (c *controlFlow) CurrentNodeExecutionCount() uint32 {
+	return c.nodeExecutionCount
+}
+
+func (c *controlFlow) IncrementNodeExecutionCount() uint32 {
+	c.nodeExecutionCount++
+	return c.nodeExecutionCount
+}
+
+func (c *controlFlow) CurrentTaskExecutionCount() uint32 {
+	return c.taskExecutionCount
+}
+
+func (c *controlFlow) IncrementTaskExecutionCount() uint32 {
+	c.taskExecutionCount++
+	return c.taskExecutionCount
 }
 
 func NewExecutionContextWithTasksGetter(prevExecContext ExecutionContext, taskGetter TaskDetailsGetter) ExecutionContext {
@@ -114,6 +138,8 @@ func NewParentInfo(uniqueID string, currentAttempts uint32) ImmutableParentInfo
 
 func InitializeControlFlow() ControlFlow {
 	return &controlFlow{
-		v: 0,
+		parallelism:        0,
+		nodeExecutionCount: 0,
+		taskExecutionCount: 0,
 	}
 }
diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go
index 0c98aeeb5fc..7c9884bf839 100644
--- a/flytepropeller/pkg/controller/nodes/executor.go
+++ b/flytepropeller/pkg/controller/nodes/executor.go
@@ -182,6 +182,15 @@ func (c *recursiveNodeExecutor) RecursiveNodeHandler(ctx context.Context, execCo
 	nodeStatus := nl.GetNodeExecutionStatus(ctx, currentNode.GetID())
 	nodePhase := nodeStatus.GetPhase()
 
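+	// Count nodes (and task nodes specifically) that are in the Running phase during this
+	// evaluation round; the workflow executor records these counts for the execution once the
+	// round completes.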
+	if nodePhase == v1alpha1.NodePhaseRunning {
+		execContext.IncrementNodeExecutionCount()
+		if currentNode.GetKind() == v1alpha1.NodeKindTask {
+			execContext.IncrementTaskExecutionCount()
+		}
+		logger.Debugf(currentNodeCtx, "recursive handler - node execution count [%v], task execution count [%v], phase [%v]",
+			execContext.CurrentNodeExecutionCount(), execContext.CurrentTaskExecutionCount(), nodePhase.String())
+	}
+
 	if canHandleNode(nodePhase) {
 		// TODO Follow up Pull Request,
 		// 1. Rename this method to DAGTraversalHandleNode (accepts a DAGStructure along-with) the remaining arguments
@@ -287,6 +296,7 @@ func (c *recursiveNodeExecutor) handleDownstream(ctx context.Context, execContex
 			}), nil
 		}
 
+		logger.Debugf(ctx, "Recursively handling downstream node [%v]", downstreamNode.GetID())
 		state, err := c.RecursiveNodeHandler(ctx, execContext, dag, nl, downstreamNode)
 		if err != nil {
 			return interfaces.NodeStatusUndefined, err
diff --git a/flytepropeller/pkg/controller/workflow/executor.go b/flytepropeller/pkg/controller/workflow/executor.go
index 355681a7463..9ae3850640b 100644
--- a/flytepropeller/pkg/controller/workflow/executor.go
+++ b/flytepropeller/pkg/controller/workflow/executor.go
@@ -17,6 +17,7 @@ import (
 	"github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors"
 	"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces"
 	"github.com/flyteorg/flyte/flytepropeller/pkg/controller/workflow/errors"
+	"github.com/flyteorg/flyte/flytepropeller/pkg/controller/workflowstore"
 	"github.com/flyteorg/flyte/flytepropeller/pkg/utils"
 	"github.com/flyteorg/flyte/flytestdlib/logger"
 	"github.com/flyteorg/flyte/flytestdlib/promutils"
@@ -59,15 +60,16 @@ func StatusFailed(err *core.ExecutionError) Status {
 }
 
 type workflowExecutor struct {
-	enqueueWorkflow v1alpha1.EnqueueWorkflow
-	store           *storage.DataStore
-	wfRecorder      events.WorkflowEventRecorder
-	k8sRecorder     record.EventRecorder
-	metadataPrefix  storage.DataReference
-	nodeExecutor    interfaces.Node
-	metrics         *workflowMetrics
-	eventConfig     *config.EventConfig
-	clusterID       string
+	enqueueWorkflow  v1alpha1.EnqueueWorkflow
+	store            *storage.DataStore
+	wfRecorder       events.WorkflowEventRecorder
+	k8sRecorder      record.EventRecorder
+	metadataPrefix   storage.DataReference
+	nodeExecutor     interfaces.Node
+	metrics          *workflowMetrics
+	eventConfig      *config.EventConfig
+	clusterID        string
+	activeExecutions *workflowstore.ExecutionStatsHolder
 }
 
 func (c *workflowExecutor) constructWorkflowMetadataPrefix(ctx context.Context, w *v1alpha1.FlyteWorkflow) (storage.DataReference, error) {
@@ -144,10 +146,20 @@ func (c *workflowExecutor) handleRunningWorkflow(ctx context.Context, w *v1alpha
 			Message: "Start node not found"}), nil
 	}
 	execcontext := executors.NewExecutionContext(w, w, w, nil, executors.InitializeControlFlow())
-	state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, w, w, startNode)
+	state, handlerErr := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, w, w, startNode)
+
+	execStats := workflowstore.SingleExecutionStats{
+		ActiveNodeCount: execcontext.CurrentNodeExecutionCount(),
+		ActiveTaskCount: execcontext.CurrentTaskExecutionCount()}
+	logger.Debugf(ctx, "running workflow node execution count [%v], task execution count [%v], execution-id [%v]",
+		execStats.ActiveNodeCount, execStats.ActiveTaskCount, execcontext.GetExecutionID())
+	statErr := c.activeExecutions.AddOrUpdateEntry(execcontext.GetExecutionID().String(), execStats)
+	if statErr != nil {
+		logger.Errorf(ctx, "error updating active executions stats: %v", statErr)
+	}
 
-	if err != nil {
-		return StatusRunning, err
+	if handlerErr != nil {
+		return StatusRunning, handlerErr
 	}
 	if state.HasFailed() {
 		logger.Infof(ctx, "Workflow has failed. Error [%s]", state.Err.String())
@@ -175,9 +187,20 @@ func (c *workflowExecutor) handleFailureNode(ctx context.Context, w *v1alpha1.Fl
 
 	failureNodeStatus := w.GetExecutionStatus().GetNodeExecutionStatus(ctx, failureNode.GetID())
 	failureNodeLookup := executors.NewFailureNodeLookup(w, failureNode, failureNodeStatus)
-	state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, failureNodeLookup, failureNodeLookup, failureNode)
-	if err != nil {
-		return StatusFailureNode(execErr), err
+	state, handlerErr := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, failureNodeLookup, failureNodeLookup, failureNode)
+
+	execStats := workflowstore.SingleExecutionStats{
+		ActiveNodeCount: execcontext.CurrentNodeExecutionCount(),
+		ActiveTaskCount: execcontext.CurrentTaskExecutionCount()}
+	logger.Debugf(ctx, "failure node execution count [%v], task execution count [%v], execution-id [%v]",
+		execStats.ActiveNodeCount, execStats.ActiveTaskCount, execcontext.GetExecutionID())
+	statErr := c.activeExecutions.AddOrUpdateEntry(execcontext.GetExecutionID().String(), execStats)
+	if statErr != nil {
+		logger.Errorf(ctx, "error updating active executions stats: %v", statErr)
+	}
+
+	if handlerErr != nil {
+		return StatusFailureNode(execErr), handlerErr
 	}
 
 	switch state.NodePhase {
@@ -504,7 +527,7 @@ func (c *workflowExecutor) cleanupRunningNodes(ctx context.Context, w v1alpha1.E
 
 func NewExecutor(ctx context.Context, store *storage.DataStore, enQWorkflow v1alpha1.EnqueueWorkflow, eventSink events.EventSink,
 	k8sEventRecorder record.EventRecorder, metadataPrefix string, nodeExecutor interfaces.Node, eventConfig *config.EventConfig,
-	clusterID string, scope promutils.Scope) (executors.Workflow, error) {
+	clusterID string, scope promutils.Scope, activeExecutions *workflowstore.ExecutionStatsHolder) (executors.Workflow, error) {
 	basePrefix := store.GetBaseContainerFQN(ctx)
 	if metadataPrefix != "" {
 		var err error
@@ -518,15 +541,16 @@ func NewExecutor(ctx context.Context, store *storage.DataStore, enQWorkflow v1al
 	workflowScope := scope.NewSubScope("workflow")
 
 	return &workflowExecutor{
-		nodeExecutor:    nodeExecutor,
-		store:           store,
-		enqueueWorkflow: enQWorkflow,
-		wfRecorder:      events.NewWorkflowEventRecorder(eventSink, workflowScope, store),
-		k8sRecorder:     k8sEventRecorder,
-		metadataPrefix:  basePrefix,
-		metrics:         newMetrics(workflowScope),
-		eventConfig:     eventConfig,
-		clusterID:       clusterID,
+		nodeExecutor:     nodeExecutor,
+		store:            store,
+		enqueueWorkflow:  enQWorkflow,
+		wfRecorder:       events.NewWorkflowEventRecorder(eventSink, workflowScope, store),
+		k8sRecorder:      k8sEventRecorder,
+		metadataPrefix:   basePrefix,
+		metrics:          newMetrics(workflowScope),
+		eventConfig:      eventConfig,
+		clusterID:        clusterID,
+		activeExecutions: activeExecutions,
 	}, nil
 }
 
diff --git a/flytepropeller/pkg/controller/workflowstore/execution_stats.go b/flytepropeller/pkg/controller/workflowstore/execution_stats.go
new file mode 100644
index 00000000000..4bf44210b98
--- /dev/null
+++ b/flytepropeller/pkg/controller/workflowstore/execution_stats.go
@@ -0,0 +1,168 @@
+package workflowstore
+
+import (
+	"context"
+	"fmt"
+	"runtime/pprof"
+	"sync"
+	"time"
+
+	lister "github.com/flyteorg/flyte/flytepropeller/pkg/client/listers/flyteworkflow/v1alpha1"
+	"github.com/flyteorg/flyte/flytestdlib/contextutils"
+	"github.com/flyteorg/flyte/flytestdlib/logger"
+	"github.com/flyteorg/flyte/flytestdlib/promutils"
+	"github.com/prometheus/client_golang/prometheus"
+	"k8s.io/apimachinery/pkg/labels"
+)
+
+const (
+	resourceLevelExecutionStatsSyncDuration = 20 * time.Second
+)
+
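+// ExecutionStatsMonitor periodically reconciles the per-execution stats recorded by the
+// workflow executor against the informer cache and publishes the aggregated values as
+// prometheus gauges.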
+type ExecutionStatsMonitor struct {
+	Scope promutils.Scope
+
+	// Cached workflow list from the Informer
+	lister lister.FlyteWorkflowLister
+
+	// Provides stats for the currently active executions
+	activeExecutions *ExecutionStatsHolder
+
+	// These are currently aggregated values across all active workflows
+	ActiveNodeExecutions     prometheus.Gauge
+	ActiveTaskExecutions     prometheus.Gauge
+	ActiveWorkflowExecutions prometheus.Gauge
+}
+
+func (e *ExecutionStatsMonitor) updateExecutionStats(ctx context.Context) {
+	// Convert the list of workflows to a map for faster lookups
+	// TODO: Update to include only the workflows in a running state
+	workflows, err := e.lister.List(labels.Everything())
+	if err != nil {
+		logger.Errorf(ctx, "Failed to list workflows while removing terminated executions, %v", err)
+		return
+	}
+	workflowSet := make(map[string]bool)
+	for _, wf := range workflows {
+		workflowSet[wf.GetExecutionID().String()] = true
+	}
+
+	err = e.activeExecutions.RemoveTerminatedExecutions(ctx, workflowSet)
+	if err != nil {
+		logger.Errorf(ctx, "Error while removing terminated executions from stats: %v ", err)
+	}
+}
+
+func (e *ExecutionStatsMonitor) emitExecutionStats(ctx context.Context) {
+	executions, nodes, tasks, err := e.activeExecutions.AggregateActiveValues()
+	if err != nil {
+		logger.Errorf(ctx, "Error aggregating active execution stats: %v", err)
+		return
+	}
+	logger.Debugf(ctx, "Execution stats: ActiveExecutions: %d ActiveNodes: %d, ActiveTasks: %d", executions, nodes, tasks)
+	e.ActiveNodeExecutions.Set(float64(nodes))
+	e.ActiveTaskExecutions.Set(float64(tasks))
+	e.ActiveWorkflowExecutions.Set(float64(executions))
+}
+
+func (e *ExecutionStatsMonitor) RunStatsMonitor(ctx context.Context) {
+	ticker := time.NewTicker(resourceLevelExecutionStatsSyncDuration)
+	execStatsCtx := contextutils.WithGoroutineLabel(ctx, "execution-stats-monitor")
+
+	go func() {
+		defer ticker.Stop()
+		pprof.SetGoroutineLabels(execStatsCtx)
+		for {
+			select {
+			case <-execStatsCtx.Done():
+				return
+			case <-ticker.C:
+				e.updateExecutionStats(execStatsCtx)
+				e.emitExecutionStats(execStatsCtx)
+			}
+		}
+	}()
+}
+
+func NewExecutionStatsMonitor(scope promutils.Scope, lister lister.FlyteWorkflowLister, activeExecutions *ExecutionStatsHolder) *ExecutionStatsMonitor {
+	return &ExecutionStatsMonitor{
+		Scope:                    scope,
+		lister:                   lister,
+		activeExecutions:         activeExecutions,
+		ActiveNodeExecutions:     scope.MustNewGauge("active_node_executions", "active node executions for propeller"),
+		ActiveTaskExecutions:     scope.MustNewGauge("active_task_executions", "active task executions for propeller"),
+		ActiveWorkflowExecutions: scope.MustNewGauge("active_workflow_executions", "active workflow executions for propeller"),
+	}
+}
+
+// SingleExecutionStats holds stats about a single workflow execution, such as active node and task counts.
+type SingleExecutionStats struct {
+	ActiveNodeCount uint32
+	ActiveTaskCount uint32
+}
+
+// ExecutionStatsHolder manages a map of execution IDs to their ExecutionStats.
+type ExecutionStatsHolder struct {
+	mu         sync.Mutex // Guards access to the map
+	executions map[string]SingleExecutionStats
+}
+
+// NewExecutionStatsHolder creates a new ExecutionStatsHolder instance with an initialized map.
+func NewExecutionStatsHolder() (*ExecutionStatsHolder, error) {
+	return &ExecutionStatsHolder{
+		executions: make(map[string]SingleExecutionStats),
+	}, nil
+}
+
+// AddOrUpdateEntry adds or updates an entry in the executions map.
+func (esh *ExecutionStatsHolder) AddOrUpdateEntry(executionId string, executionStats SingleExecutionStats) error {
+	if esh == nil || esh.executions == nil {
+		return fmt.Errorf("ExecutionStatsHolder is not initialized")
+	}
+
+	esh.mu.Lock()
+	defer esh.mu.Unlock()
+	esh.executions[executionId] = executionStats
+
+	return nil
+}
+
+// AggregateActiveValues returns the number of tracked executions along with the aggregate active node and task counts.
+func (esh *ExecutionStatsHolder) AggregateActiveValues() (int, uint32, uint32, error) {
+	if esh == nil || esh.executions == nil {
+		return 0, 0, 0, fmt.Errorf("ExecutionStatsHolder is not initialized")
+	}
+
+	esh.mu.Lock()
+	defer esh.mu.Unlock()
+
+	var sumNodes, sumTasks uint32 = 0, 0
+	for _, stats := range esh.executions {
+		sumNodes += stats.ActiveNodeCount
+		sumTasks += stats.ActiveTaskCount
+	}
+	return len(esh.executions), sumNodes, sumTasks, nil
+}
+
+func (esh *ExecutionStatsHolder) LogAllActiveExecutions(ctx context.Context) {
+	esh.mu.Lock()
+	defer esh.mu.Unlock()
+
+	logger.Debugf(ctx, "Current Active Executions:")
+	for execID, stats := range esh.executions {
+		logger.Debugf(ctx, "ExecutionID: %s, ActiveNodeCount: %d, ActiveTaskCount: %d", execID, stats.ActiveNodeCount, stats.ActiveTaskCount)
+	}
+}
+
+// RemoveTerminatedExecutions removes all terminated or deleted workflows from the executions map.
+// This expects a set of strings for simplified lookup in the critical section.
+func (esh *ExecutionStatsHolder) RemoveTerminatedExecutions(ctx context.Context, workflows map[string]bool) error {
+	// Acquire the mutex and remove all terminated or deleted workflows.
+	esh.mu.Lock()
+	defer esh.mu.Unlock()
+	for execId := range esh.executions {
+		if !workflows[execId] {
+			logger.Debugf(ctx, "Deleting active execution entry for execId: %s", execId)
+			delete(esh.executions, execId)
+		}
+	}
+	return nil
+}
diff --git a/flytepropeller/pkg/controller/workflowstore/execution_stats_test.go b/flytepropeller/pkg/controller/workflowstore/execution_stats_test.go
new file mode 100644
index 00000000000..4cfa0984215
--- /dev/null
+++ b/flytepropeller/pkg/controller/workflowstore/execution_stats_test.go
@@ -0,0 +1,75 @@
+package workflowstore
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestAddOrUpdateEntry(t *testing.T) {
+	esh, err := NewExecutionStatsHolder()
+	assert.NoError(t, err)
+
+	err = esh.AddOrUpdateEntry("exec1", SingleExecutionStats{ActiveNodeCount: 5, ActiveTaskCount: 10})
+	assert.NoError(t, err)
+	assert.Equal(t, 1, len(esh.executions))
+	assert.Equal(t, uint32(5), esh.executions["exec1"].ActiveNodeCount)
+	assert.Equal(t, uint32(10), esh.executions["exec1"].ActiveTaskCount)
+}
+
+func TestAggregateActiveValues(t *testing.T) {
+	esh, _ := NewExecutionStatsHolder()
+	esh.AddOrUpdateEntry("exec1", SingleExecutionStats{ActiveNodeCount: 5, ActiveTaskCount: 10})
+	esh.AddOrUpdateEntry("exec2", SingleExecutionStats{ActiveNodeCount: 3, ActiveTaskCount: 6})
+
+	flows, nodes, tasks, err := esh.AggregateActiveValues()
+	assert.NoError(t, err)
+	assert.Equal(t, 2, flows)
+	assert.Equal(t, uint32(8), nodes)
+	assert.Equal(t, uint32(16), tasks)
+}
+
+func TestConcurrentAccess(t *testing.T) {
+	esh, err := NewExecutionStatsHolder()
+	assert.NoError(t, err)
+
+	var wg sync.WaitGroup
+	// Number of concurrent operations
+	concurrentOps := 100
+
+	// Concurrently add or update entries
+	for i := 0; i < concurrentOps; i++ {
+		wg.Add(1)
+		go func(id int) {
+			defer wg.Done()
+			execId := fmt.Sprintf("exec%d", id)
+			esh.AddOrUpdateEntry(execId, SingleExecutionStats{ActiveNodeCount: uint32(id), ActiveTaskCount: uint32(id * 2)})
+		}(i)
+	}
+
+	// Concurrently sum active values
+	for i := 0; i < concurrentOps/2; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			_, _, _, err := esh.AggregateActiveValues()
+			assert.NoError(t, err)
+		}()
+	}
+
+	wg.Wait()
+
+	// Remove all entries
+	err = esh.RemoveTerminatedExecutions(context.TODO(), map[string]bool{})
+	assert.NoError(t, err)
+
+	// After all operations, sum should be predictable as all entries should be deleted
+	flows, nodes, tasks, err := esh.AggregateActiveValues()
+	assert.NoError(t, err)
+	assert.Zero(t, flows)
+	assert.Zero(t, nodes)
+	assert.Zero(t, tasks)
+}
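+
+// TestRemoveTerminatedExecutions_KeepsLive is a minimal sketch of the pruning behavior:
+// entries whose execution IDs appear in the supplied set are retained, all others are
+// dropped. The execution IDs used here are illustrative.
+func TestRemoveTerminatedExecutions_KeepsLive(t *testing.T) {
+	esh, err := NewExecutionStatsHolder()
+	assert.NoError(t, err)
+
+	assert.NoError(t, esh.AddOrUpdateEntry("live", SingleExecutionStats{ActiveNodeCount: 1, ActiveTaskCount: 1}))
+	assert.NoError(t, esh.AddOrUpdateEntry("terminated", SingleExecutionStats{ActiveNodeCount: 2, ActiveTaskCount: 2}))
+
+	// Only "live" is still present in the workflow set reported by the informer cache.
+	assert.NoError(t, esh.RemoveTerminatedExecutions(context.TODO(), map[string]bool{"live": true}))
+
+	flows, nodes, tasks, err := esh.AggregateActiveValues()
+	assert.NoError(t, err)
+	assert.Equal(t, 1, flows)
+	assert.Equal(t, uint32(1), nodes)
+	assert.Equal(t, uint32(1), tasks)
+}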