Skip to content

Commit

Permalink
enhance: Preserve fixed-size memory in delegator node for growing seg…
Browse files Browse the repository at this point in the history
…ment (#34602)

issue: #34595
pr: #34596

When consuming insert data on the delegator node, QueryCoord will move
out some sealed segments to manage its memory usage. After the growing
segment gets flushed, some sealed segments from other workers will be
moved back to the delegator node. To avoid the frequent movement of
segments, we estimate the maximum growing row count and preserve a
fixed-size memory in the delegator node.

---------

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Jul 13, 2024
1 parent b5ba583 commit 79c0c78
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 7 deletions.
10 changes: 8 additions & 2 deletions internal/querycoordv2/balance/score_based_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,17 +148,22 @@ func (b *ScoreBasedBalancer) convertToNodeItems(collectionID int64, nodeIDs []in
}

func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int {
delegatorOverloadFactor := params.Params.QueryCoordCfg.DelegatorMemoryOverloadFactor.GetAsFloat()

nodeRowCount := 0
nodeCollectionRowCount := make(map[int64]int)
// calculate global sealed segment row count
globalSegments := b.dist.SegmentDistManager.GetByNode(nodeID)
for _, s := range globalSegments {
nodeRowCount += int(s.GetNumOfRows())
nodeCollectionRowCount[s.CollectionID] += int(s.GetNumOfRows())
}

// calculate global growing segment row count
views := b.dist.GetLeaderView(nodeID)
for _, view := range views {
nodeRowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat())
nodeRowCount += int(float64(view.NumOfGrowingRows))
nodeRowCount += int(float64(nodeCollectionRowCount[view.CollectionID]) * delegatorOverloadFactor)
}

// calculate executing task cost in scheduler
Expand All @@ -174,7 +179,8 @@ func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int {
// calculate collection growing segment row count
collectionViews := b.dist.LeaderViewManager.GetByCollectionAndNode(collectionID, nodeID)
for _, view := range collectionViews {
collectionRowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat())
collectionRowCount += int(float64(view.NumOfGrowingRows))
collectionRowCount += int(float64(collectionRowCount) * delegatorOverloadFactor)
}

// calculate executing task cost in scheduler
Expand Down
20 changes: 15 additions & 5 deletions internal/querycoordv2/balance/score_based_balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,12 +295,23 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegmentWithGrowing() {
defer suite.TearDownTest()
balancer := suite.balancer

paramtable.Get().Save(paramtable.Get().QueryCoordCfg.DelegatorMemoryOverloadFactor.Key, "0.3")
suite.balancer.meta.PutCollection(&meta.Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: 1,
},
}, &meta.Partition{
PartitionLoadInfo: &querypb.PartitionLoadInfo{
CollectionID: 1,
PartitionID: 1,
},
})
distributions := map[int64][]*meta.Segment{
1: {
{SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 20, CollectionID: 1}, Node: 1},
{SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 100, CollectionID: 1}, Node: 1},
},
2: {
{SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 20, CollectionID: 1}, Node: 2},
{SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 100, CollectionID: 1}, Node: 2},
},
}
for node, s := range distributions {
Expand All @@ -321,9 +332,8 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegmentWithGrowing() {

// mock 50 growing row count in node 1, which is delegator, expect all segment assign to node 2
leaderView := &meta.LeaderView{
ID: 1,
CollectionID: 1,
NumOfGrowingRows: 50,
ID: 1,
CollectionID: 1,
}
suite.balancer.dist.LeaderViewManager.Update(1, leaderView)
plans := balancer.AssignSegment(1, toAssign, lo.Keys(distributions), false)
Expand Down
12 changes: 12 additions & 0 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -1384,6 +1384,8 @@ type queryCoordConfig struct {
BalanceIntervalSeconds ParamItem `refreshable:"true"`
MemoryUsageMaxDifferencePercentage ParamItem `refreshable:"true"`
GrowingRowCountWeight ParamItem `refreshable:"true"`
DelegatorMemoryOverloadFactor ParamItem `refreshable:"true`
BalanceCostThreshold ParamItem `refreshable:"true"`

SegmentCheckInterval ParamItem `refreshable:"true"`
ChannelCheckInterval ParamItem `refreshable:"true"`
Expand Down Expand Up @@ -1555,6 +1557,16 @@ func (p *queryCoordConfig) init(base *BaseTable) {
}
p.GrowingRowCountWeight.Init(base.mgr)

p.DelegatorMemoryOverloadFactor = ParamItem{
Key: "queryCoord.delegatorMemoryOverloadFactor",
Version: "2.3.19",
DefaultValue: "0.3",
PanicIfEmpty: true,
Doc: "the factor of delegator overloaded memory",
Export: true,
}
p.DelegatorMemoryOverloadFactor.Init(base.mgr)

p.MemoryUsageMaxDifferencePercentage = ParamItem{
Key: "queryCoord.memoryUsageMaxDifferencePercentage",
Version: "2.0.0",
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/paramtable/component_param_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,8 @@ func TestComponentParam(t *testing.T) {
params.Save("queryCoord.checkExecutedFlagInterval", "200")
assert.Equal(t, 200, Params.CheckExecutedFlagInterval.GetAsInt())
params.Reset("queryCoord.checkExecutedFlagInterval")

assert.Equal(t, 0.3, Params.DelegatorMemoryOverloadFactor.GetAsFloat())
})

t.Run("test queryNodeConfig", func(t *testing.T) {
Expand Down

0 comments on commit 79c0c78

Please sign in to comment.