Skip to content

Commit

Permalink
cp
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Feb 22, 2024
1 parent eddf85e commit 7dc47e5
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 40 deletions.
14 changes: 13 additions & 1 deletion pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1296,6 +1296,18 @@ func (r *RegionsInfo) GetClusterNotFromStorageRegionsCnt() int {
return r.tree.notFromStorageRegionsCnt
}

// GetNotFromStorageRegionsCntByStore returns how many of the given store's
// regions (across its leader, follower and learner subtrees) did not come
// from storage. It takes the subtree read lock for the duration of the count.
func (r *RegionsInfo) GetNotFromStorageRegionsCntByStore(storeID uint64) int {
	r.st.RLock()
	cnt := r.getNotFromStorageRegionsCntByStoreLocked(storeID)
	r.st.RUnlock()
	return cnt
}

// getNotFromStorageRegionsCntByStoreLocked sums the not-from-storage region
// counts of the store's leader, follower and learner subtrees.
// The caller must hold the subtree lock.
func (r *RegionsInfo) getNotFromStorageRegionsCntByStoreLocked(storeID uint64) int {
	cnt := r.leaders[storeID].notFromStorageRegionsCount()
	cnt += r.followers[storeID].notFromStorageRegionsCount()
	cnt += r.learners[storeID].notFromStorageRegionsCount()
	return cnt
}

// GetMetaRegions gets a set of metapb.Region from regionMap
func (r *RegionsInfo) GetMetaRegions() []*metapb.Region {
r.t.RLock()
Expand Down Expand Up @@ -1329,7 +1341,7 @@ func (r *RegionsInfo) GetStoreRegionCount(storeID uint64) int {
return r.getStoreRegionCountLocked(storeID)
}

// getStoreRegionCountLocked sums the RegionInfo counts of the store's leader,
// follower and learner subtrees. The caller must hold the subtree lock.
func (r *RegionsInfo) getStoreRegionCountLocked(storeID uint64) int {
	total := r.leaders[storeID].length()
	total += r.followers[storeID].length()
	total += r.learners[storeID].length()
	return total
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/core/region_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ func (t *regionTree) length() int {
return t.tree.Len()
}

// notFromStorageRegionsCount returns the number of regions in the tree that
// were not loaded from storage. A nil tree holds no regions, so it reports 0.
func (t *regionTree) notFromStorageRegionsCount() int {
	if t != nil {
		return t.notFromStorageRegionsCnt
	}
	return 0
}

// GetOverlaps returns the range items that has some intersections with the given items.
func (t *regionTree) overlaps(item *regionItem) []*regionItem {
// note that Find() gets the last item that is less or equal than the item.
Expand Down
4 changes: 0 additions & 4 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1069,10 +1069,6 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
c.regionStats.Observe(region, c.getRegionStoresLocked(region))
}

if !c.IsPrepared() && isNew {
c.coordinator.prepareChecker.collect(region)
}

if c.storage != nil {
// If there are concurrent heartbeats from the same region, the last write will win even if
// writes to storage in the critical area. So don't use mutex to protect it.
Expand Down
18 changes: 8 additions & 10 deletions server/cluster/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (c *testCluster) LoadRegion(regionID uint64, followerStoreIDs ...uint64) er
peer, _ := c.AllocPeer(id)
region.Peers = append(region.Peers, peer)
}
return c.putRegion(core.NewRegionInfo(region, nil))
return c.putRegion(core.NewRegionInfo(region, nil, core.SetSource(core.Storage)))
}

func TestBasic(t *testing.T) {
Expand Down Expand Up @@ -233,7 +233,7 @@ func TestDispatch(t *testing.T) {

func dispatchHeartbeat(co *coordinator, region *core.RegionInfo, stream hbstream.HeartbeatStream) error {
co.hbStreams.BindStream(region.GetLeader().GetStoreId(), stream)
if err := co.cluster.putRegion(region.Clone()); err != nil {
if err := co.cluster.putRegion(region.Clone(core.SetSource(core.Heartbeat))); err != nil {
return err
}
co.opController.Dispatch(region, schedule.DispatchFromHeartBeat)
Expand Down Expand Up @@ -659,14 +659,14 @@ func TestShouldRun(t *testing.T) {

for _, testCase := range testCases {
r := tc.GetRegion(testCase.regionID)
nr := r.Clone(core.WithLeader(r.GetPeers()[0]))
nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat))
re.NoError(tc.processRegionHeartbeat(nr))
re.Equal(testCase.shouldRun, co.shouldRun())
}
nr := &metapb.Region{Id: 6, Peers: []*metapb.Peer{}}
newRegion := core.NewRegionInfo(nr, nil)
newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat))
re.Error(tc.processRegionHeartbeat(newRegion))
re.Equal(7, co.prepareChecker.sum)
re.Equal(7, tc.core.GetClusterNotFromStorageRegionsCnt())
}

func TestShouldRunWithNonLeaderRegions(t *testing.T) {
Expand Down Expand Up @@ -702,14 +702,14 @@ func TestShouldRunWithNonLeaderRegions(t *testing.T) {

for _, testCase := range testCases {
r := tc.GetRegion(testCase.regionID)
nr := r.Clone(core.WithLeader(r.GetPeers()[0]))
nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat))
re.NoError(tc.processRegionHeartbeat(nr))
re.Equal(testCase.shouldRun, co.shouldRun())
}
nr := &metapb.Region{Id: 9, Peers: []*metapb.Peer{}}
newRegion := core.NewRegionInfo(nr, nil)
newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat))
re.Error(tc.processRegionHeartbeat(newRegion))
re.Equal(9, co.prepareChecker.sum)
re.Equal(9, tc.core.GetClusterNotFromStorageRegionsCnt())

// Now, after server is prepared, there exist some regions with no leader.
re.Equal(uint64(0), tc.GetRegion(10).GetLeader().GetStoreId())
Expand Down Expand Up @@ -1011,7 +1011,6 @@ func TestRestart(t *testing.T) {
re.NoError(tc.addRegionStore(3, 3))
re.NoError(tc.addLeaderRegion(1, 1))
region := tc.GetRegion(1)
co.prepareChecker.collect(region)

// Add 1 replica on store 2.
stream := mockhbstream.NewHeartbeatStream()
Expand All @@ -1024,7 +1023,6 @@ func TestRestart(t *testing.T) {

// Recreate coordinator then add another replica on store 3.
co = newCoordinator(ctx, tc.RaftCluster, hbStreams)
co.prepareChecker.collect(region)
co.run()
re.NoError(dispatchHeartbeat(co, region, stream))
region = waitAddLearner(re, stream, region, 3)
Expand Down
31 changes: 7 additions & 24 deletions server/cluster/prepare_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,23 @@
package cluster

import (
"go.uber.org/zap"
"time"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
)

type prepareChecker struct {
syncutil.RWMutex
reactiveRegions map[uint64]int
start time.Time
sum int
prepared bool
start time.Time
prepared bool
}

// newPrepareChecker creates a prepareChecker whose preparation timer starts
// now. (The diff rendering duplicated the `start` field and kept the removed
// `reactiveRegions` field; this is the post-commit struct literal, matching
// the prepareChecker struct after `reactiveRegions`/`sum` were deleted.)
func newPrepareChecker() *prepareChecker {
	return &prepareChecker{
		start: time.Now(),
	}
}

Expand All @@ -51,14 +48,8 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool {
}
notLoadedFromRegionsCnt := c.GetClusterNotFromStorageRegionsCnt()
totalRegionsCnt := c.GetRegionCount()
if float64(notLoadedFromRegionsCnt) > float64(totalRegionsCnt)*collectFactor {
log.Info("meta not loaded from region number is satisfied, finish prepare checker",
zap.Int("not-from-storage-region", notLoadedFromRegionsCnt), zap.Int("total-region", totalRegionsCnt))
checker.prepared = true
return true
}
// The number of active regions should be more than total region of all stores * collectFactor
if float64(totalRegionsCnt)*collectFactor > float64(checker.sum) {
if float64(totalRegionsCnt)*collectFactor > float64(notLoadedFromRegionsCnt) {
return false
}
for _, store := range c.GetStores() {
Expand All @@ -67,23 +58,15 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool {
}
storeID := store.GetID()
// For each store, the number of active regions should be more than total region of the store * collectFactor
if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(checker.reactiveRegions[storeID]) {
if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID)) {
return false
}
}
log.Info("not loaded from storage region number is satisfied, finish prepare checker", zap.Int("not-from-storage-region", notLoadedFromRegionsCnt), zap.Int("total-region", totalRegionsCnt))
checker.prepared = true
return true
}

func (checker *prepareChecker) collect(region *core.RegionInfo) {
checker.Lock()
defer checker.Unlock()
for _, p := range region.GetPeers() {
checker.reactiveRegions[p.GetStoreId()]++
}
checker.sum++
}

func (checker *prepareChecker) isPrepared() bool {
checker.RLock()
defer checker.RUnlock()
Expand Down
1 change: 1 addition & 0 deletions tests/pdctl/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func MustPutRegion(re *require.Assertions, cluster *tests.TestCluster, regionID,
Peers: []*metapb.Peer{leader},
RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1},
}
opts = append(opts, core.SetSource(core.Heartbeat))
r := core.NewRegionInfo(metaRegion, leader, opts...)
err := cluster.HandleRegionHeartbeat(r)
re.NoError(err)
Expand Down
2 changes: 1 addition & 1 deletion tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1409,7 +1409,7 @@ func putRegionWithLeader(re *require.Assertions, rc *cluster.RaftCluster, id id.
StartKey: []byte{byte(i)},
EndKey: []byte{byte(i + 1)},
}
rc.HandleRegionHeartbeat(core.NewRegionInfo(region, region.Peers[0]))
rc.HandleRegionHeartbeat(core.NewRegionInfo(region, region.Peers[0], core.SetSource(core.Heartbeat)))
}

time.Sleep(50 * time.Millisecond)
Expand Down

0 comments on commit 7dc47e5

Please sign in to comment.