diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go index 874a070fdfc1a..97f65ed08c0f5 100644 --- a/internal/querycoordv2/balance/score_based_balancer.go +++ b/internal/querycoordv2/balance/score_based_balancer.go @@ -148,17 +148,22 @@ func (b *ScoreBasedBalancer) convertToNodeItems(collectionID int64, nodeIDs []in } func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int { + delegatorOverloadFactor := params.Params.QueryCoordCfg.DelegatorMemoryOverloadFactor.GetAsFloat() + nodeRowCount := 0 + nodeCollectionRowCount := make(map[int64]int) // calculate global sealed segment row count globalSegments := b.dist.SegmentDistManager.GetByNode(nodeID) for _, s := range globalSegments { nodeRowCount += int(s.GetNumOfRows()) + nodeCollectionRowCount[s.CollectionID] += int(s.GetNumOfRows()) } // calculate global growing segment row count views := b.dist.GetLeaderView(nodeID) for _, view := range views { - nodeRowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat()) + nodeRowCount += int(float64(view.NumOfGrowingRows)) + nodeRowCount += int(float64(nodeCollectionRowCount[view.CollectionID]) * delegatorOverloadFactor) } // calculate executing task cost in scheduler @@ -174,7 +179,8 @@ func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int { // calculate collection growing segment row count collectionViews := b.dist.LeaderViewManager.GetByCollectionAndNode(collectionID, nodeID) for _, view := range collectionViews { - collectionRowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat()) + collectionRowCount += int(float64(view.NumOfGrowingRows)) + collectionRowCount += int(float64(collectionRowCount) * delegatorOverloadFactor) } // calculate executing task cost in scheduler diff --git a/internal/querycoordv2/balance/score_based_balancer_test.go 
b/internal/querycoordv2/balance/score_based_balancer_test.go index a1913ee519099..57daa16a1ac30 100644 --- a/internal/querycoordv2/balance/score_based_balancer_test.go +++ b/internal/querycoordv2/balance/score_based_balancer_test.go @@ -295,12 +295,23 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegmentWithGrowing() { defer suite.TearDownTest() balancer := suite.balancer + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.DelegatorMemoryOverloadFactor.Key, "0.3") + suite.balancer.meta.PutCollection(&meta.Collection{ + CollectionLoadInfo: &querypb.CollectionLoadInfo{ + CollectionID: 1, + }, + }, &meta.Partition{ + PartitionLoadInfo: &querypb.PartitionLoadInfo{ + CollectionID: 1, + PartitionID: 1, + }, + }) distributions := map[int64][]*meta.Segment{ 1: { - {SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 20, CollectionID: 1}, Node: 1}, + {SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 100, CollectionID: 1}, Node: 1}, }, 2: { - {SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 20, CollectionID: 1}, Node: 2}, + {SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 100, CollectionID: 1}, Node: 2}, }, } for node, s := range distributions { @@ -321,9 +332,8 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegmentWithGrowing() { // mock 50 growing row count in node 1, which is delegator, expect all segment assign to node 2 leaderView := &meta.LeaderView{ - ID: 1, - CollectionID: 1, - NumOfGrowingRows: 50, + ID: 1, + CollectionID: 1, } suite.balancer.dist.LeaderViewManager.Update(1, leaderView) plans := balancer.AssignSegment(1, toAssign, lo.Keys(distributions), false) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 708aaf7f8a998..292621cce6c75 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1384,6 +1384,8 @@ type queryCoordConfig struct { BalanceIntervalSeconds ParamItem `refreshable:"true"` MemoryUsageMaxDifferencePercentage ParamItem 
`refreshable:"true"` GrowingRowCountWeight ParamItem `refreshable:"true"` + DelegatorMemoryOverloadFactor ParamItem `refreshable:"true"` + BalanceCostThreshold ParamItem `refreshable:"true"` SegmentCheckInterval ParamItem `refreshable:"true"` ChannelCheckInterval ParamItem `refreshable:"true"` @@ -1555,6 +1557,16 @@ func (p *queryCoordConfig) init(base *BaseTable) { } p.GrowingRowCountWeight.Init(base.mgr) + p.DelegatorMemoryOverloadFactor = ParamItem{ + Key: "queryCoord.delegatorMemoryOverloadFactor", + Version: "2.3.19", + DefaultValue: "0.3", + PanicIfEmpty: true, + Doc: "the factor of delegator overloaded memory", + Export: true, + } + p.DelegatorMemoryOverloadFactor.Init(base.mgr) + p.MemoryUsageMaxDifferencePercentage = ParamItem{ Key: "queryCoord.memoryUsageMaxDifferencePercentage", Version: "2.0.0", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 7d6a36089c9c4..54b52b0e01d7e 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -314,6 +314,8 @@ func TestComponentParam(t *testing.T) { params.Save("queryCoord.checkExecutedFlagInterval", "200") assert.Equal(t, 200, Params.CheckExecutedFlagInterval.GetAsInt()) params.Reset("queryCoord.checkExecutedFlagInterval") + + assert.Equal(t, 0.3, Params.DelegatorMemoryOverloadFactor.GetAsFloat()) }) t.Run("test queryNodeConfig", func(t *testing.T) {