diff --git a/pkg/schedule/region_scatterer.go b/pkg/schedule/region_scatterer.go index d5e53965ad4..a37cc2475be 100644 --- a/pkg/schedule/region_scatterer.go +++ b/pkg/schedule/region_scatterer.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "math" + "strconv" "sync" "time" @@ -99,26 +100,6 @@ func (s *selectedStores) GetGroupDistribution(group string) (map[uint64]uint64, return s.getDistributionByGroupLocked(group) } -// TotalCountByStore counts the total count by store -func (s *selectedStores) TotalCountByStore(storeID uint64) uint64 { - s.mu.RLock() - defer s.mu.RUnlock() - groups := s.groupDistribution.GetAllID() - totalCount := uint64(0) - for _, group := range groups { - storeDistribution, ok := s.getDistributionByGroupLocked(group) - if !ok { - continue - } - count, ok := storeDistribution[storeID] - if !ok { - continue - } - totalCount += count - } - return totalCount -} - // getDistributionByGroupLocked should be called with lock func (s *selectedStores) getDistributionByGroupLocked(group string) (map[uint64]uint64, bool) { if result, ok := s.groupDistribution.Get(group); ok { @@ -327,6 +308,12 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) * selectedStores := make(map[uint64]struct{}, len(region.GetPeers())) // selected StoreID set leaderCandidateStores := make([]uint64, 0, len(region.GetPeers())) // StoreID allowed to become Leader scatterWithSameEngine := func(peers map[uint64]*metapb.Peer, context engineContext) { // peers: StoreID -> Peer + filterLen := len(context.filterFuncs) + 2 + filters := make([]filter.Filter, filterLen) + for i, filterFunc := range context.filterFuncs { + filters[i] = filterFunc() + } + filters[filterLen-2] = filter.NewExcludedFilter(r.name, nil, selectedStores) for _, peer := range peers { if _, ok := selectedStores[peer.GetStoreId()]; ok { if allowLeader(oldFit, peer) { @@ -335,9 +322,14 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) * // It is both sourcePeer and targetPeer itself, no need to select. continue } + sourceStore := r.cluster.GetStore(peer.GetStoreId()) + if sourceStore == nil { + log.Error("failed to get the store", zap.Uint64("store-id", peer.GetStoreId()), errs.ZapError(errs.ErrGetSourceStore)) + continue + } + filters[filterLen-1] = filter.NewPlacementSafeguard(r.name, r.cluster.GetOpts(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore, oldFit) for { - candidates := r.selectCandidates(region, oldFit, peer.GetStoreId(), selectedStores, context) - newPeer := r.selectStore(group, peer, peer.GetStoreId(), candidates, context) + newPeer := r.selectNewPeer(context, group, peer, filters) targetPeers[newPeer.GetStoreId()] = newPeer selectedStores[newPeer.GetStoreId()] = struct{}{} // If the selected peer is a peer other than origin peer in this region, @@ -358,7 +350,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) * // FIXME: target leader only considers the ordinary stores, maybe we need to consider the // special engine stores if the engine supports to become a leader. But now there is only // one engine, tiflash, which does not support the leader, so don't consider it for now. - targetLeader := r.selectAvailableLeaderStore(group, region, leaderCandidateStores, r.ordinaryEngine) + targetLeader, leaderStorePickedCount := r.selectAvailableLeaderStore(group, region, leaderCandidateStores, r.ordinaryEngine) if targetLeader == 0 { scatterSkipNoLeaderCounter.Inc() return nil @@ -393,6 +385,8 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) * if op != nil { scatterSuccessCounter.Inc() r.Put(targetPeers, targetLeader, group) + op.AdditionalInfos["group"] = group + op.AdditionalInfos["leader-picked-count"] = strconv.FormatUint(leaderStorePickedCount, 10) op.SetPriorityLevel(constant.High) } return op @@ -427,26 +421,18 @@ func isSameDistribution(region *core.RegionInfo, targetPeers map[uint64]*metapb. return region.GetLeader().GetStoreId() == targetLeader } -func (r *RegionScatterer) selectCandidates(region *core.RegionInfo, oldFit *placement.RegionFit, sourceStoreID uint64, selectedStores map[uint64]struct{}, context engineContext) []uint64 { - sourceStore := r.cluster.GetStore(sourceStoreID) - if sourceStore == nil { - log.Error("failed to get the store", zap.Uint64("store-id", sourceStoreID), errs.ZapError(errs.ErrGetSourceStore)) - return nil - } - filters := []filter.Filter{ - filter.NewExcludedFilter(r.name, nil, selectedStores), - } - scoreGuard := filter.NewPlacementSafeguard(r.name, r.cluster.GetOpts(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore, oldFit) - for _, filterFunc := range context.filterFuncs { - filters = append(filters, filterFunc()) - } - filters = append(filters, scoreGuard) +// selectNewPeer return the new peer which pick the fewest picked count. +// it keeps the origin peer if the origin store's pick count is equal the fewest pick. +// it can be diveded into three steps: +// 1. found the max pick count and the min pick count. +// 2. if max pick count equals min pick count, it means all store picked count are some, return the origin peer. +// 3. otherwise, select the store which pick count is the min pick count and pass all filter. +func (r *RegionScatterer) selectNewPeer(context engineContext, group string, peer *metapb.Peer, filters []filter.Filter) *metapb.Peer { stores := r.cluster.GetStores() - candidates := make([]uint64, 0) maxStoreTotalCount := uint64(0) minStoreTotalCount := uint64(math.MaxUint64) for _, store := range stores { - count := context.selectedPeer.TotalCountByStore(store.GetID()) + count := context.selectedPeer.Get(store.GetID(), group) if count > maxStoreTotalCount { maxStoreTotalCount = count } @@ -454,42 +440,33 @@ func (r *RegionScatterer) selectCandidates(region *core.RegionInfo, oldFit *plac minStoreTotalCount = count } } + + var newPeer *metapb.Peer + minCount := uint64(math.MaxUint64) + originStorePickedCount := uint64(math.MaxUint64) for _, store := range stores { - storeCount := context.selectedPeer.TotalCountByStore(store.GetID()) + storeCount := context.selectedPeer.Get(store.GetID(), group) + if store.GetID() == peer.GetStoreId() { + originStorePickedCount = storeCount + } // If storeCount is equal to the maxStoreTotalCount, we should skip this store as candidate. // If the storeCount are all the same for the whole cluster(maxStoreTotalCount == minStoreTotalCount), any store // could be selected as candidate. if storeCount < maxStoreTotalCount || maxStoreTotalCount == minStoreTotalCount { if filter.Target(r.cluster.GetOpts(), store, filters) { - candidates = append(candidates, store.GetID()) + if storeCount < minCount { + minCount = storeCount + newPeer = &metapb.Peer{ + StoreId: store.GetID(), + Role: peer.GetRole(), + } + } } } } - return candidates -} - -func (r *RegionScatterer) selectStore(group string, peer *metapb.Peer, sourceStoreID uint64, candidates []uint64, context engineContext) *metapb.Peer { - if len(candidates) < 1 { + if originStorePickedCount <= minCount { return peer } - var newPeer *metapb.Peer - minCount := uint64(math.MaxUint64) - for _, storeID := range candidates { - count := context.selectedPeer.Get(storeID, group) - if count < minCount { - minCount = count - newPeer = &metapb.Peer{ - StoreId: storeID, - Role: peer.GetRole(), - } - } - } - // if the source store have the least count, we don't need to scatter this peer - for _, storeID := range candidates { - if storeID == sourceStoreID && context.selectedPeer.Get(sourceStoreID, group) <= minCount { - return peer - } - } if newPeer == nil { return peer } @@ -498,11 +475,12 @@ func (r *RegionScatterer) selectStore(group string, peer *metapb.Peer, sourceSto // selectAvailableLeaderStore select the target leader store from the candidates. The candidates would be collected by // the existed peers store depended on the leader counts in the group level. Please use this func before scatter spacial engines. -func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.RegionInfo, leaderCandidateStores []uint64, context engineContext) uint64 { +func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.RegionInfo, + leaderCandidateStores []uint64, context engineContext) (leaderID uint64, leaderStorePickedCount uint64) { sourceStore := r.cluster.GetStore(region.GetLeader().GetStoreId()) if sourceStore == nil { log.Error("failed to get the store", zap.Uint64("store-id", region.GetLeader().GetStoreId()), errs.ZapError(errs.ErrGetSourceStore)) - return 0 + return 0, 0 } minStoreGroupLeader := uint64(math.MaxUint64) id := uint64(0) @@ -517,7 +495,7 @@ func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core. id = storeID } } - return id + return id, minStoreGroupLeader } // Put put the final distribution in the context no matter the operator was created diff --git a/pkg/schedule/region_scatterer_test.go b/pkg/schedule/region_scatterer_test.go index a602598b4b9..dfa484d7bd7 100644 --- a/pkg/schedule/region_scatterer_test.go +++ b/pkg/schedule/region_scatterer_test.go @@ -18,7 +18,6 @@ import ( "context" "fmt" "math" - "math/rand" "strconv" "sync" "testing" @@ -532,48 +531,11 @@ func TestSelectedStoreGC(t *testing.T) { re.False(ok) } -// TestRegionFromDifferentGroups test the multi regions. each region have its own group. -// After scatter, the distribution for the whole cluster should be well. -func TestRegionFromDifferentGroups(t *testing.T) { - re := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - opt := mockconfig.NewTestOptions() - tc := mockcluster.NewCluster(ctx, opt) - stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := NewOperatorController(ctx, tc, stream) - // Add 6 stores. - storeCount := 6 - for i := uint64(1); i <= uint64(storeCount); i++ { - tc.AddRegionStore(i, 0) - } - scatterer := NewRegionScatterer(ctx, tc, oc) - regionCount := 50 - for i := 1; i <= regionCount; i++ { - p := rand.Perm(storeCount) - scatterer.scatterRegion(tc.AddLeaderRegion(uint64(i), uint64(p[0])+1, uint64(p[1])+1, uint64(p[2])+1), fmt.Sprintf("t%d", i)) - } - check := func(ss *selectedStores) { - max := uint64(0) - min := uint64(math.MaxUint64) - for i := uint64(1); i <= uint64(storeCount); i++ { - count := ss.TotalCountByStore(i) - if count > max { - max = count - } - if count < min { - min = count - } - } - re.LessOrEqual(max-min, uint64(2)) - } - check(scatterer.ordinaryEngine.selectedPeer) -} - func TestRegionHasLearner(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + group := "group" opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) @@ -616,14 +578,14 @@ func TestRegionHasLearner(t *testing.T) { scatterer := NewRegionScatterer(ctx, tc, oc) regionCount := 50 for i := 1; i <= regionCount; i++ { - _, err := scatterer.Scatter(tc.AddRegionWithLearner(uint64(i), uint64(1), []uint64{uint64(2), uint64(3)}, []uint64{7}), "group") + _, err := scatterer.Scatter(tc.AddRegionWithLearner(uint64(i), uint64(1), []uint64{uint64(2), uint64(3)}, []uint64{7}), group) re.NoError(err) } check := func(ss *selectedStores) { max := uint64(0) min := uint64(math.MaxUint64) for i := uint64(1); i <= max; i++ { - count := ss.TotalCountByStore(i) + count := ss.Get(i, group) if count > max { max = count } @@ -638,7 +600,7 @@ func TestRegionHasLearner(t *testing.T) { max := uint64(0) min := uint64(math.MaxUint64) for i := uint64(1); i <= voterCount; i++ { - count := ss.TotalCountByStore(i) + count := ss.Get(i, group) if count > max { max = count } @@ -649,7 +611,7 @@ func TestRegionHasLearner(t *testing.T) { re.LessOrEqual(max-2, uint64(regionCount)/voterCount) re.LessOrEqual(min-1, uint64(regionCount)/voterCount) for i := voterCount + 1; i <= storeCount; i++ { - count := ss.TotalCountByStore(i) + count := ss.Get(i, group) re.LessOrEqual(count, uint64(0)) } } @@ -690,6 +652,9 @@ func TestSelectedStoresTooFewPeers(t *testing.T) { region := tc.AddLeaderRegion(i+200, i%3+2, (i+1)%3+2, (i+2)%3+2) op := scatterer.scatterRegion(region, group) re.False(isPeerCountChanged(op)) + if op != nil { + re.Equal(group, op.AdditionalInfos["group"]) + } } } @@ -730,6 +695,34 @@ func TestSelectedStoresTooManyPeers(t *testing.T) { } } +// TestBalanceLeader only tests whether region leaders are balanced after scatter. +func TestBalanceLeader(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + opt := mockconfig.NewTestOptions() + tc := mockcluster.NewCluster(ctx, opt) + stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) + oc := NewOperatorController(ctx, tc, stream) + // Add 3 stores + for i := uint64(2); i <= 4; i++ { + tc.AddLabelsStore(i, 0, nil) + // prevent store from being disconnected + tc.SetStoreLastHeartbeatInterval(i, -10*time.Minute) + } + group := "group" + scatterer := NewRegionScatterer(ctx, tc, oc) + for i := uint64(1001); i <= 1300; i++ { + region := tc.AddLeaderRegion(i, 2, 3, 4) + op := scatterer.scatterRegion(region, group) + re.False(isPeerCountChanged(op)) + } + // all leader will be balanced in three stores. + for i := uint64(2); i <= 4; i++ { + re.Equal(uint64(100), scatterer.ordinaryEngine.selectedLeader.Get(i, group)) + } +} + // TestBalanceRegion tests whether region peers and leaders are balanced after scatter. // ref https://github.com/tikv/pd/issues/6017 func TestBalanceRegion(t *testing.T) { @@ -756,7 +749,6 @@ func TestBalanceRegion(t *testing.T) { } for i := uint64(2); i <= 7; i++ { re.Equal(uint64(150), scatterer.ordinaryEngine.selectedPeer.Get(i, group)) - re.Equal(uint64(50), scatterer.ordinaryEngine.selectedLeader.Get(i, group)) } // Test for unhealthy region // ref https://github.com/tikv/pd/issues/6099