Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scatter: make peer scatter logic same with the leader (#6965) #7027

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 45 additions & 67 deletions server/schedule/region_scatterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"math"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -87,26 +88,6 @@ func (s *selectedStores) GetGroupDistribution(group string) (map[uint64]uint64,
return s.getDistributionByGroupLocked(group)
}

// TotalCountByStore counts the total count by store
func (s *selectedStores) TotalCountByStore(storeID uint64) uint64 {
s.mu.RLock()
defer s.mu.RUnlock()
groups := s.groupDistribution.GetAllID()
totalCount := uint64(0)
for _, group := range groups {
storeDistribution, ok := s.getDistributionByGroupLocked(group)
if !ok {
continue
}
count, ok := storeDistribution[storeID]
if !ok {
continue
}
totalCount += count
}
return totalCount
}

// getDistributionByGroupLocked should be called with lock
func (s *selectedStores) getDistributionByGroupLocked(group string) (map[uint64]uint64, bool) {
if result, ok := s.groupDistribution.Get(group); ok {
Expand Down Expand Up @@ -315,6 +296,12 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
selectedStores := make(map[uint64]struct{}, len(region.GetPeers())) // selected StoreID set
leaderCandidateStores := make([]uint64, 0, len(region.GetPeers())) // StoreID allowed to become Leader
scatterWithSameEngine := func(peers map[uint64]*metapb.Peer, context engineContext) { // peers: StoreID -> Peer
filterLen := len(context.filterFuncs) + 2
filters := make([]filter.Filter, filterLen)
for i, filterFunc := range context.filterFuncs {
filters[i] = filterFunc()
}
filters[filterLen-2] = filter.NewExcludedFilter(r.name, nil, selectedStores)
for _, peer := range peers {
if _, ok := selectedStores[peer.GetStoreId()]; ok {
if allowLeader(oldFit, peer) {
Expand All @@ -323,9 +310,14 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
// It is both sourcePeer and targetPeer itself, no need to select.
continue
}
sourceStore := r.cluster.GetStore(peer.GetStoreId())
if sourceStore == nil {
log.Error("failed to get the store", zap.Uint64("store-id", peer.GetStoreId()), errs.ZapError(errs.ErrGetSourceStore))
continue
}
filters[filterLen-1] = filter.NewPlacementSafeguard(r.name, r.cluster.GetOpts(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore, oldFit)
for {
candidates := r.selectCandidates(region, oldFit, peer.GetStoreId(), selectedStores, context)
newPeer := r.selectStore(group, peer, peer.GetStoreId(), candidates, context)
newPeer := r.selectNewPeer(context, group, peer, filters)
targetPeers[newPeer.GetStoreId()] = newPeer
selectedStores[newPeer.GetStoreId()] = struct{}{}
// If the selected peer is a peer other than origin peer in this region,
Expand All @@ -346,7 +338,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
// FIXME: target leader only considers the ordinary stores, maybe we need to consider the
// special engine stores if the engine supports to become a leader. But now there is only
// one engine, tiflash, which does not support the leader, so don't consider it for now.
targetLeader := r.selectAvailableLeaderStore(group, region, leaderCandidateStores, r.ordinaryEngine)
targetLeader, leaderStorePickedCount := r.selectAvailableLeaderStore(group, region, leaderCandidateStores, r.ordinaryEngine)
if targetLeader == 0 {
scatterCounter.WithLabelValues("no-leader", "").Inc()
return nil
Expand Down Expand Up @@ -381,6 +373,8 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
if op != nil {
scatterCounter.WithLabelValues("success", "").Inc()
r.Put(targetPeers, targetLeader, group)
op.AdditionalInfos["group"] = group
op.AdditionalInfos["leader-picked-count"] = strconv.FormatUint(leaderStorePickedCount, 10)
op.SetPriorityLevel(core.High)
}
return op
Expand Down Expand Up @@ -418,69 +412,52 @@ func isSameDistribution(region *core.RegionInfo, targetPeers map[uint64]*metapb.
return region.GetLeader().GetStoreId() == targetLeader
}

func (r *RegionScatterer) selectCandidates(region *core.RegionInfo, oldFit *placement.RegionFit, sourceStoreID uint64, selectedStores map[uint64]struct{}, context engineContext) []uint64 {
sourceStore := r.cluster.GetStore(sourceStoreID)
if sourceStore == nil {
log.Error("failed to get the store", zap.Uint64("store-id", sourceStoreID), errs.ZapError(errs.ErrGetSourceStore))
return nil
}
filters := []filter.Filter{
filter.NewExcludedFilter(r.name, nil, selectedStores),
}
scoreGuard := filter.NewPlacementSafeguard(r.name, r.cluster.GetOpts(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore, oldFit)
for _, filterFunc := range context.filterFuncs {
filters = append(filters, filterFunc())
}
filters = append(filters, scoreGuard)
// selectNewPeer return the new peer which pick the fewest picked count.
// it keeps the origin peer if the origin store's pick count is equal the fewest pick.
// it can be diveded into three steps:
// 1. found the max pick count and the min pick count.
// 2. if max pick count equals min pick count, it means all store picked count are some, return the origin peer.
// 3. otherwise, select the store which pick count is the min pick count and pass all filter.
func (r *RegionScatterer) selectNewPeer(context engineContext, group string, peer *metapb.Peer, filters []filter.Filter) *metapb.Peer {
stores := r.cluster.GetStores()
candidates := make([]uint64, 0)
maxStoreTotalCount := uint64(0)
minStoreTotalCount := uint64(math.MaxUint64)
for _, store := range stores {
count := context.selectedPeer.TotalCountByStore(store.GetID())
count := context.selectedPeer.Get(store.GetID(), group)
if count > maxStoreTotalCount {
maxStoreTotalCount = count
}
if count < minStoreTotalCount {
minStoreTotalCount = count
}
}

var newPeer *metapb.Peer
minCount := uint64(math.MaxUint64)
originStorePickedCount := uint64(math.MaxUint64)
for _, store := range stores {
storeCount := context.selectedPeer.TotalCountByStore(store.GetID())
storeCount := context.selectedPeer.Get(store.GetID(), group)
if store.GetID() == peer.GetId() {
originStorePickedCount = storeCount
}
// If storeCount is equal to the maxStoreTotalCount, we should skip this store as candidate.
// If the storeCount are all the same for the whole cluster(maxStoreTotalCount == minStoreTotalCount), any store
// could be selected as candidate.
if storeCount < maxStoreTotalCount || maxStoreTotalCount == minStoreTotalCount {
if filter.Target(r.cluster.GetOpts(), store, filters) {
candidates = append(candidates, store.GetID())
if storeCount < minCount {
minCount = storeCount
newPeer = &metapb.Peer{
StoreId: store.GetID(),
Role: peer.GetRole(),
}
}
}
}
}
return candidates
}

func (r *RegionScatterer) selectStore(group string, peer *metapb.Peer, sourceStoreID uint64, candidates []uint64, context engineContext) *metapb.Peer {
if len(candidates) < 1 {
if originStorePickedCount <= minCount {
return peer
}
var newPeer *metapb.Peer
minCount := uint64(math.MaxUint64)
for _, storeID := range candidates {
count := context.selectedPeer.Get(storeID, group)
if count < minCount {
minCount = count
newPeer = &metapb.Peer{
StoreId: storeID,
Role: peer.GetRole(),
}
}
}
// if the source store have the least count, we don't need to scatter this peer
for _, storeID := range candidates {
if storeID == sourceStoreID && context.selectedPeer.Get(sourceStoreID, group) <= minCount {
return peer
}
}
if newPeer == nil {
return peer
}
Expand All @@ -489,11 +466,12 @@ func (r *RegionScatterer) selectStore(group string, peer *metapb.Peer, sourceSto

// selectAvailableLeaderStore select the target leader store from the candidates. The candidates would be collected by
// the existed peers store depended on the leader counts in the group level. Please use this func before scatter spacial engines.
func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.RegionInfo, leaderCandidateStores []uint64, context engineContext) uint64 {
func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.RegionInfo,
leaderCandidateStores []uint64, context engineContext) (leaderID uint64, leaderStorePickedCount uint64) {
sourceStore := r.cluster.GetStore(region.GetLeader().GetStoreId())
if sourceStore == nil {
log.Error("failed to get the store", zap.Uint64("store-id", region.GetLeader().GetStoreId()), errs.ZapError(errs.ErrGetSourceStore))
return 0
return 0, 0
}
minStoreGroupLeader := uint64(math.MaxUint64)
id := uint64(0)
Expand All @@ -508,7 +486,7 @@ func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.
id = storeID
}
}
return id
return id, minStoreGroupLeader
}

// Put put the final distribution in the context no matter the operator was created
Expand Down
79 changes: 36 additions & 43 deletions server/schedule/region_scatterer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"
"math"
"math/rand"
"strconv"
"sync"
"testing"
Expand Down Expand Up @@ -532,48 +531,11 @@ func TestSelectedStoreGC(t *testing.T) {
re.False(ok)
}

// TestRegionFromDifferentGroups test the multi regions. each region have its own group.
// After scatter, the distribution for the whole cluster should be well.
func TestRegionFromDifferentGroups(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
opt := config.NewTestOptions()
tc := mockcluster.NewCluster(ctx, opt)
stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false)
oc := NewOperatorController(ctx, tc, stream)
// Add 6 stores.
storeCount := 6
for i := uint64(1); i <= uint64(storeCount); i++ {
tc.AddRegionStore(i, 0)
}
scatterer := NewRegionScatterer(ctx, tc, oc)
regionCount := 50
for i := 1; i <= regionCount; i++ {
p := rand.Perm(storeCount)
scatterer.scatterRegion(tc.AddLeaderRegion(uint64(i), uint64(p[0])+1, uint64(p[1])+1, uint64(p[2])+1), fmt.Sprintf("t%d", i))
}
check := func(ss *selectedStores) {
max := uint64(0)
min := uint64(math.MaxUint64)
for i := uint64(1); i <= uint64(storeCount); i++ {
count := ss.TotalCountByStore(i)
if count > max {
max = count
}
if count < min {
min = count
}
}
re.LessOrEqual(max-min, uint64(2))
}
check(scatterer.ordinaryEngine.selectedPeer)
}

func TestRegionHasLearner(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
group := "group"
opt := config.NewTestOptions()
tc := mockcluster.NewCluster(ctx, opt)
stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false)
Expand Down Expand Up @@ -616,14 +578,14 @@ func TestRegionHasLearner(t *testing.T) {
scatterer := NewRegionScatterer(ctx, tc, oc)
regionCount := 50
for i := 1; i <= regionCount; i++ {
_, err := scatterer.Scatter(tc.AddRegionWithLearner(uint64(i), uint64(1), []uint64{uint64(2), uint64(3)}, []uint64{7}), "group")
_, err := scatterer.Scatter(tc.AddRegionWithLearner(uint64(i), uint64(1), []uint64{uint64(2), uint64(3)}, []uint64{7}), group)
re.NoError(err)
}
check := func(ss *selectedStores) {
max := uint64(0)
min := uint64(math.MaxUint64)
for i := uint64(1); i <= max; i++ {
count := ss.TotalCountByStore(i)
count := ss.Get(i, group)
if count > max {
max = count
}
Expand All @@ -638,7 +600,7 @@ func TestRegionHasLearner(t *testing.T) {
max := uint64(0)
min := uint64(math.MaxUint64)
for i := uint64(1); i <= voterCount; i++ {
count := ss.TotalCountByStore(i)
count := ss.Get(i, group)
if count > max {
max = count
}
Expand All @@ -649,7 +611,7 @@ func TestRegionHasLearner(t *testing.T) {
re.LessOrEqual(max-2, uint64(regionCount)/voterCount)
re.LessOrEqual(min-1, uint64(regionCount)/voterCount)
for i := voterCount + 1; i <= storeCount; i++ {
count := ss.TotalCountByStore(i)
count := ss.Get(i, group)
re.LessOrEqual(count, uint64(0))
}
}
Expand Down Expand Up @@ -690,6 +652,9 @@ func TestSelectedStoresTooFewPeers(t *testing.T) {
region := tc.AddLeaderRegion(i+200, i%3+2, (i+1)%3+2, (i+2)%3+2)
op := scatterer.scatterRegion(region, group)
re.False(isPeerCountChanged(op))
if op != nil {
re.Equal(group, op.AdditionalInfos["group"])
}
}
}

Expand Down Expand Up @@ -730,6 +695,34 @@ func TestSelectedStoresTooManyPeers(t *testing.T) {
}
}

// TestBalanceLeader only tests whether region leaders are balanced after scatter.
func TestBalanceLeader(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
opt := config.NewTestOptions()
tc := mockcluster.NewCluster(ctx, opt)
stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false)
oc := NewOperatorController(ctx, tc, stream)
// Add 3 stores
for i := uint64(2); i <= 4; i++ {
tc.AddLabelsStore(i, 0, nil)
// prevent store from being disconnected
tc.SetStoreLastHeartbeatInterval(i, -10*time.Minute)
}
group := "group"
scatterer := NewRegionScatterer(ctx, tc, oc)
for i := uint64(1001); i <= 1300; i++ {
region := tc.AddLeaderRegion(i, 2, 3, 4)
op := scatterer.scatterRegion(region, group)
re.False(isPeerCountChanged(op))
}
// all leader will be balanced in three stores.
for i := uint64(2); i <= 4; i++ {
re.Equal(uint64(100), scatterer.ordinaryEngine.selectedLeader.Get(i, group))
}
}

// TestBalanceRegion tests whether region peers and leaders are balanced after scatter.
// ref https://github.com/tikv/pd/issues/6017
func TestBalanceRegion(t *testing.T) {
Expand Down
Loading