From 21c031b7ee90adbacc432aa1b754fe714438c43d Mon Sep 17 00:00:00 2001 From: Max Holland Date: Mon, 13 Jan 2025 21:16:23 +0000 Subject: [PATCH] Add simulate unit test back in --- .../catabalancer/catalyst_balancer_test.go | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/balancer/catabalancer/catalyst_balancer_test.go b/balancer/catabalancer/catalyst_balancer_test.go index fe5e2c46..eefa1762 100644 --- a/balancer/catabalancer/catalyst_balancer_test.go +++ b/balancer/catabalancer/catalyst_balancer_test.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "encoding/json" + "fmt" "testing" "time" @@ -415,3 +416,41 @@ func TestStreamTimeout(t *testing.T) { nodes = selectTopNodes(c.createScoredNodes(s), "ingest", 0, 0, 1) require.Empty(t, nodes) } + +func TestSimulate(t *testing.T) { + // update these values to test the lock contention with higher numbers of nodes etc + nodeCount := 1 + streamsPerNode := 2 + expectedResponseTime := 100 * time.Millisecond + loadBalanceCallCount := 5 + + db, mock, err := sqlmock.New() + require.NoError(t, err) + c := NewBalancer("node0", time.Second, time.Second, db) + var nodes []cluster.Member + for i := 0; i < nodeCount; i++ { + nodes = append(nodes, cluster.Member{Name: fmt.Sprintf("node%d", i)}) + } + err = c.UpdateMembers(context.Background(), nodes) + require.NoError(t, err) + + var s []NodeUpdateEvent + for i := 0; i < nodeCount; i++ { + nodeStats := NodeUpdateEvent{NodeID: fmt.Sprintf("node%d", i), NodeMetrics: NodeMetrics{Timestamp: time.Now()}} + var streams []string + for k := 0; k < streamsPerNode; k++ { + streams = append(streams, fmt.Sprintf("stream%d", k)) + } + nodeStats.SetStreams(streams, nil) + s = append(s, nodeStats) + } + + for j := 0; j < loadBalanceCallCount; j++ { + setNodeMetrics(t, mock, s) + start := time.Now() + _, _, err = c.GetBestNode(context.Background(), nil, "playbackID", "0", "0", "", false) + require.NoError(t, err) + require.LessOrEqual(t, time.Since(start), expectedResponseTime) + time.Sleep(10 * time.Millisecond) + } +}