diff --git a/flytepropeller/pkg/controller/workflowstore/execution_stats.go b/flytepropeller/pkg/controller/workflowstore/execution_stats.go index 7012167eb6..d7e0d5c903 100644 --- a/flytepropeller/pkg/controller/workflowstore/execution_stats.go +++ b/flytepropeller/pkg/controller/workflowstore/execution_stats.go @@ -114,14 +114,14 @@ func NewExecutionStatsHolder() (*ExecutionStatsHolder, error) { } // AddOrUpdateEntry adds or updates an entry in the executions map. -func (esh *ExecutionStatsHolder) AddOrUpdateEntry(executionId string, executionStats SingleExecutionStats) error { +func (esh *ExecutionStatsHolder) AddOrUpdateEntry(executionID string, executionStats SingleExecutionStats) error { if esh == nil || esh.executions == nil { return fmt.Errorf("ExecutionStatsHolder is not initialized") } esh.mu.Lock() defer esh.mu.Unlock() - esh.executions[executionId] = executionStats + esh.executions[executionID] = executionStats return nil } @@ -159,10 +159,10 @@ func (esh *ExecutionStatsHolder) RemoveTerminatedExecutions(ctx context.Context, // Acquire the mutex and remove all terminated or deleted workflows. esh.mu.Lock() defer esh.mu.Unlock() - for execId := range esh.executions { - if !workflows[execId] { - logger.Debugf(ctx, "Deleting active execution entry for execId: %s", execId) - delete(esh.executions, execId) + for execID := range esh.executions { + if !workflows[execID] { + logger.Debugf(ctx, "Deleting active execution entry for execId: %s", execID) + delete(esh.executions, execID) } } return nil diff --git a/flytepropeller/pkg/controller/workflowstore/execution_stats_test.go b/flytepropeller/pkg/controller/workflowstore/execution_stats_test.go index cb61b7e86c..4dfff5985e 100644 --- a/flytepropeller/pkg/controller/workflowstore/execution_stats_test.go +++ b/flytepropeller/pkg/controller/workflowstore/execution_stats_test.go @@ -20,10 +20,25 @@ func TestAddOrUpdateEntry(t *testing.T) { assert.Equal(t, uint32(10), esh.executions["exec1"].ActiveTaskCount) } -func TestAggregateActiveValues(t *testing.T) { - esh, _ := NewExecutionStatsHolder() - esh.AddOrUpdateEntry("exec1", SingleExecutionStats{ActiveNodeCount: 5, ActiveTaskCount: 10}) +func createDefaultExecutionStatsHolder() (*ExecutionStatsHolder, error) { + esh, err := NewExecutionStatsHolder() + if err != nil { + return nil, err + } + err = esh.AddOrUpdateEntry("exec1", SingleExecutionStats{ActiveNodeCount: 5, ActiveTaskCount: 10}) + if err != nil { + return nil, err + } esh.AddOrUpdateEntry("exec2", SingleExecutionStats{ActiveNodeCount: 3, ActiveTaskCount: 6}) + if err != nil { + return nil, err + } + return esh, nil +} + +func TestAggregateActiveValues(t *testing.T) { + esh, err := createDefaultExecutionStatsHolder() + assert.NoError(t, err) flows, nodes, tasks, err := esh.AggregateActiveValues() assert.NoError(t, err) @@ -46,12 +61,9 @@ func TestRemoveTerminatedExecutionsEmpty(t *testing.T) { // Test removal of a subset of entries from ExcutionStatsHolder func TestRemoveTerminatedExecutionsSubset(t *testing.T) { - esh, err := NewExecutionStatsHolder() + esh, err := createDefaultExecutionStatsHolder() assert.NoError(t, err) - esh.AddOrUpdateEntry("exec1", SingleExecutionStats{ActiveNodeCount: 5, ActiveTaskCount: 10}) - esh.AddOrUpdateEntry("exec2", SingleExecutionStats{ActiveNodeCount: 3, ActiveTaskCount: 6}) - err = esh.RemoveTerminatedExecutions(context.TODO(), map[string]bool{"exec2": true}) assert.NoError(t, err) @@ -73,8 +85,8 @@ func TestConcurrentAccess(t *testing.T) { wg.Add(1) go func(id int) { defer wg.Done() - execId := fmt.Sprintf("exec%d", id) - esh.AddOrUpdateEntry(execId, SingleExecutionStats{ActiveNodeCount: uint32(id), ActiveTaskCount: uint32(id * 2)}) + execID := fmt.Sprintf("exec%d", id) + esh.AddOrUpdateEntry(execID, SingleExecutionStats{ActiveNodeCount: uint32(id), ActiveTaskCount: uint32(id * 2)}) }(i) }