diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index 0b3e0351b16..8809a706936 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -59,7 +59,4 @@ func Collect(c Cluster, region *core.RegionInfo, stores []*core.StoreInfo, hasRe
 	if hasRegionStats {
 		c.GetRegionStats().Observe(region, stores)
 	}
-	if !isPrepared && isNew {
-		c.GetCoordinator().GetPrepareChecker().Collect(region)
-	}
 }
diff --git a/pkg/core/region.go b/pkg/core/region.go
index c9daa69c477..b141e8478da 100644
--- a/pkg/core/region.go
+++ b/pkg/core/region.go
@@ -1340,11 +1340,23 @@ func (r *RegionsInfo) GetStoreWriteRate(storeID uint64) (bytesRate, keysRate flo
 	return
 }
 
-// GetClusterNotFromStorageRegionsCnt gets the total count of regions that not loaded from storage anymore
+// GetClusterNotFromStorageRegionsCnt gets the `NotFromStorageRegionsCnt` count of regions that not loaded from storage anymore.
 func (r *RegionsInfo) GetClusterNotFromStorageRegionsCnt() int {
 	r.t.RLock()
 	defer r.t.RUnlock()
-	return r.tree.notFromStorageRegionsCnt
+	return r.tree.notFromStorageRegionsCount()
+}
+
+// GetNotFromStorageRegionsCntByStore gets the `NotFromStorageRegionsCnt` count of a store's leader, follower and learner by storeID.
+func (r *RegionsInfo) GetNotFromStorageRegionsCntByStore(storeID uint64) int {
+	r.st.RLock()
+	defer r.st.RUnlock()
+	return r.getNotFromStorageRegionsCntByStoreLocked(storeID)
+}
+
+// getNotFromStorageRegionsCntByStoreLocked gets the `NotFromStorageRegionsCnt` count of a store's leader, follower and learner by storeID.
+func (r *RegionsInfo) getNotFromStorageRegionsCntByStoreLocked(storeID uint64) int {
+	return r.leaders[storeID].notFromStorageRegionsCount() + r.followers[storeID].notFromStorageRegionsCount() + r.learners[storeID].notFromStorageRegionsCount()
 }
 
 // GetMetaRegions gets a set of metapb.Region from regionMap
@@ -1380,7 +1392,7 @@ func (r *RegionsInfo) GetStoreRegionCount(storeID uint64) int {
 	return r.getStoreRegionCountLocked(storeID)
 }
 
-// GetStoreRegionCount gets the total count of a store's leader, follower and learner RegionInfo by storeID
+// getStoreRegionCountLocked gets the total count of a store's leader, follower and learner RegionInfo by storeID
 func (r *RegionsInfo) getStoreRegionCountLocked(storeID uint64) int {
 	return r.leaders[storeID].length() + r.followers[storeID].length() + r.learners[storeID].length()
 }
diff --git a/pkg/core/region_tree.go b/pkg/core/region_tree.go
index ed3445de6b6..ecc988d97d8 100644
--- a/pkg/core/region_tree.go
+++ b/pkg/core/region_tree.go
@@ -82,6 +82,13 @@ func (t *regionTree) length() int {
 	return t.tree.Len()
 }
 
+func (t *regionTree) notFromStorageRegionsCount() int {
+	if t == nil {
+		return 0
+	}
+	return t.notFromStorageRegionsCnt
+}
+
 // GetOverlaps returns the range items that has some intersections with the given items.
 func (t *regionTree) overlaps(item *regionItem) []*regionItem {
 	// note that Find() gets the last item that is less or equal than the item.
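A note on the nil guard added in `region_tree.go`: `RegionsInfo` keeps its per-store leader/follower/learner trees in maps, so a lookup like `r.leaders[storeID]` can yield a nil `*regionTree` for a store that holds no regions in that role. The minimal sketch below (hypothetical `tree` type, not the PD implementation) shows the nil-receiver pattern the new accessor relies on:

```go
// Minimal sketch (hypothetical tree type, not the PD implementation) of the
// nil-receiver guard added to regionTree.notFromStorageRegionsCount.
package main

import "fmt"

type tree struct {
	notFromStorageRegionsCnt int
}

// Safe to call on a nil *tree: a map lookup for a store with no regions in
// this role yields nil, and the guard turns that into a zero count.
func (t *tree) notFromStorageRegionsCount() int {
	if t == nil {
		return 0
	}
	return t.notFromStorageRegionsCnt
}

func main() {
	leaders := map[uint64]*tree{1: {notFromStorageRegionsCnt: 3}}
	fmt.Println(leaders[1].notFromStorageRegionsCount()) // 3
	fmt.Println(leaders[2].notFromStorageRegionsCount()) // 0: store 2 has no entry, so the lookup returns nil
}
```

Without the guard, `getNotFromStorageRegionsCntByStoreLocked` would panic for any store missing from one of the three maps.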
diff --git a/pkg/schedule/prepare_checker.go b/pkg/schedule/prepare_checker.go
index c7faa57af81..34618427930 100644
--- a/pkg/schedule/prepare_checker.go
+++ b/pkg/schedule/prepare_checker.go
@@ -25,16 +25,13 @@ import (
 
 type prepareChecker struct {
 	syncutil.RWMutex
-	reactiveRegions map[uint64]int
-	start           time.Time
-	sum             int
-	prepared        bool
+	start    time.Time
+	prepared bool
 }
 
 func newPrepareChecker() *prepareChecker {
 	return &prepareChecker{
-		start:           time.Now(),
-		reactiveRegions: make(map[uint64]int),
+		start: time.Now(),
 	}
 }
 
@@ -51,13 +48,8 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool {
 	}
 	notLoadedFromRegionsCnt := c.GetClusterNotFromStorageRegionsCnt()
 	totalRegionsCnt := c.GetTotalRegionCount()
-	if float64(notLoadedFromRegionsCnt) > float64(totalRegionsCnt)*collectFactor {
-		log.Info("meta not loaded from region number is satisfied, finish prepare checker", zap.Int("not-from-storage-region", notLoadedFromRegionsCnt), zap.Int("total-region", totalRegionsCnt))
-		checker.prepared = true
-		return true
-	}
 	// The number of active regions should be more than total region of all stores * collectFactor
-	if float64(totalRegionsCnt)*collectFactor > float64(checker.sum) {
+	if float64(totalRegionsCnt)*collectFactor > float64(notLoadedFromRegionsCnt) {
 		return false
 	}
 	for _, store := range c.GetStores() {
@@ -66,23 +58,15 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool {
 		}
 		storeID := store.GetID()
 		// For each store, the number of active regions should be more than total region of the store * collectFactor
-		if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(checker.reactiveRegions[storeID]) {
+		if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID)) {
 			return false
 		}
 	}
+	log.Info("not loaded from storage region number is satisfied, finish prepare checker", zap.Int("not-from-storage-region", notLoadedFromRegionsCnt), zap.Int("total-region", totalRegionsCnt))
 	checker.prepared = true
 	return true
 }
 
-func (checker *prepareChecker) Collect(region *core.RegionInfo) {
-	checker.Lock()
-	defer checker.Unlock()
-	for _, p := range region.GetPeers() {
-		checker.reactiveRegions[p.GetStoreId()]++
-	}
-	checker.sum++
-}
-
 func (checker *prepareChecker) IsPrepared() bool {
 	checker.RLock()
 	defer checker.RUnlock()
@@ -95,10 +79,3 @@ func (checker *prepareChecker) SetPrepared() {
 	defer checker.Unlock()
 	checker.prepared = true
 }
-
-// for test purpose
-func (checker *prepareChecker) GetSum() int {
-	checker.RLock()
-	defer checker.RUnlock()
-	return checker.sum
-}
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index c0fb1b15f8f..3b50ae16d9b 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -1018,7 +1018,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
 	// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
 	// check its validation again here.
 	//
-	// However it can't solve the race condition of concurrent heartbeats from the same region.
+	// However, it can't solve the race condition of concurrent heartbeats from the same region.
 	if overlaps, err = c.core.AtomicCheckAndPutRegion(region); err != nil {
 		return err
 	}
diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go
index 7ebd012a6a2..4b9b401e0c9 100644
--- a/server/cluster/cluster_test.go
+++ b/server/cluster/cluster_test.go
@@ -2384,7 +2384,7 @@ func (c *testCluster) LoadRegion(regionID uint64, followerStoreIDs ...uint64) er
 		peer, _ := c.AllocPeer(id)
 		region.Peers = append(region.Peers, peer)
 	}
-	return c.putRegion(core.NewRegionInfo(region, nil))
+	return c.putRegion(core.NewRegionInfo(region, nil, core.SetSource(core.Storage)))
 }
 
 func TestBasic(t *testing.T) {
@@ -2469,7 +2469,7 @@ func TestDispatch(t *testing.T) {
 
 func dispatchHeartbeat(co *schedule.Coordinator, region *core.RegionInfo, stream hbstream.HeartbeatStream) error {
 	co.GetHeartbeatStreams().BindStream(region.GetLeader().GetStoreId(), stream)
-	if err := co.GetCluster().(*RaftCluster).putRegion(region.Clone()); err != nil {
+	if err := co.GetCluster().(*RaftCluster).putRegion(region.Clone(core.SetSource(core.Heartbeat))); err != nil {
 		return err
 	}
 	co.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, nil)
@@ -2943,14 +2943,14 @@ func TestShouldRun(t *testing.T) {
 
 	for _, testCase := range testCases {
 		r := tc.GetRegion(testCase.regionID)
-		nr := r.Clone(core.WithLeader(r.GetPeers()[0]))
+		nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat))
 		re.NoError(tc.processRegionHeartbeat(nr))
 		re.Equal(testCase.ShouldRun, co.ShouldRun())
 	}
 	nr := &metapb.Region{Id: 6, Peers: []*metapb.Peer{}}
-	newRegion := core.NewRegionInfo(nr, nil)
+	newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat))
 	re.Error(tc.processRegionHeartbeat(newRegion))
-	re.Equal(7, co.GetPrepareChecker().GetSum())
+	re.Equal(7, tc.core.GetClusterNotFromStorageRegionsCnt())
 }
 
 func TestShouldRunWithNonLeaderRegions(t *testing.T) {
@@ -2986,14 +2986,14 @@ func TestShouldRunWithNonLeaderRegions(t *testing.T) {
 
 	for _, testCase := range testCases {
 		r := tc.GetRegion(testCase.regionID)
-		nr := r.Clone(core.WithLeader(r.GetPeers()[0]))
+		nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat))
 		re.NoError(tc.processRegionHeartbeat(nr))
 		re.Equal(testCase.ShouldRun, co.ShouldRun())
 	}
 	nr := &metapb.Region{Id: 9, Peers: []*metapb.Peer{}}
-	newRegion := core.NewRegionInfo(nr, nil)
+	newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat))
 	re.Error(tc.processRegionHeartbeat(newRegion))
-	re.Equal(9, co.GetPrepareChecker().GetSum())
+	re.Equal(9, tc.core.GetClusterNotFromStorageRegionsCnt())
 
 	// Now, after server is prepared, there exist some regions with no leader.
 	re.Equal(uint64(0), tc.GetRegion(10).GetLeader().GetStoreId())
@@ -3263,7 +3263,6 @@ func TestRestart(t *testing.T) {
 	re.NoError(tc.addRegionStore(3, 3))
 	re.NoError(tc.addLeaderRegion(1, 1))
 	region := tc.GetRegion(1)
-	co.GetPrepareChecker().Collect(region)
 
 	// Add 1 replica on store 2.
 	stream := mockhbstream.NewHeartbeatStream()
@@ -3277,7 +3276,6 @@
 
 	// Recreate coordinator then add another replica on store 3.
 	co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams)
-	co.GetPrepareChecker().Collect(region)
 	co.Run()
 	re.NoError(dispatchHeartbeat(co, region, stream))
 	region = waitAddLearner(re, stream, region, 3)
diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go
index cd599405124..d0fac2c1137 100644
--- a/tests/pdctl/scheduler/scheduler_test.go
+++ b/tests/pdctl/scheduler/scheduler_test.go
@@ -539,7 +539,7 @@ func (suite *schedulerTestSuite) checkSchedulerDiagnostic(cluster *tests.TestClu
 		tests.MustPutStore(re, cluster, store)
 	}
 
-	// note: because pdqsort is a unstable sort algorithm, set ApproximateSize for this region.
+	// note: because pdqsort is an unstable sort algorithm, set ApproximateSize for this region.
 	tests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10))
 
 	echo := mustExec(re, cmd, []string{"-u", pdAddr, "config", "set", "enable-diagnostic", "true"}, nil)
diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go
index ebf0a4e574d..b7a428e3683 100644
--- a/tests/server/cluster/cluster_test.go
+++ b/tests/server/cluster/cluster_test.go
@@ -1402,7 +1402,7 @@ func putRegionWithLeader(re *require.Assertions, rc *cluster.RaftCluster, id id.
 			StartKey: []byte{byte(i)},
 			EndKey:   []byte{byte(i + 1)},
 		}
-		rc.HandleRegionHeartbeat(core.NewRegionInfo(region, region.Peers[0]))
+		rc.HandleRegionHeartbeat(core.NewRegionInfo(region, region.Peers[0], core.SetSource(core.Heartbeat)))
 	}
 
 	time.Sleep(50 * time.Millisecond)
diff --git a/tests/testutil.go b/tests/testutil.go
index 613705d3eb6..059a152f06f 100644
--- a/tests/testutil.go
+++ b/tests/testutil.go
@@ -197,6 +197,7 @@ func MustPutRegion(re *require.Assertions, cluster *TestCluster, regionID, store
 		Peers:       []*metapb.Peer{leader},
 		RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1},
 	}
+	opts = append(opts, core.SetSource(core.Heartbeat))
 	r := core.NewRegionInfo(metaRegion, leader, opts...)
 	MustPutRegionInfo(re, cluster, r)
 	return r
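For context on the reworked `check()` in `prepare_checker.go`: the cluster is now considered prepared once the regions observed via heartbeat (i.e. not merely loaded from storage) reach `collectFactor` of the totals, both cluster-wide and per store. A rough, self-contained sketch of that condition, under assumed names and an illustrative `collectFactor` value (not the scheduler's actual API):

```go
// A rough sketch of the readiness condition; names and the collectFactor
// value are illustrative, not the actual PD scheduler API.
package main

import "fmt"

const collectFactor = 0.9 // assumed threshold for illustration

type storeStat struct {
	id                    uint64
	regionCount           int // total regions on the store
	notFromStorageRegions int // regions observed via heartbeat
}

// isPrepared mirrors the shape of the reworked check: both the cluster-wide
// and the per-store heartbeat-observed counts must reach the threshold.
func isPrepared(totalRegions, notFromStorage int, stores []storeStat) bool {
	if float64(totalRegions)*collectFactor > float64(notFromStorage) {
		return false
	}
	for _, s := range stores {
		if float64(s.regionCount)*collectFactor > float64(s.notFromStorageRegions) {
			return false
		}
	}
	return true
}

func main() {
	stores := []storeStat{{1, 10, 10}, {2, 10, 9}, {3, 10, 9}}
	fmt.Println(isPrepared(30, 28, stores)) // true: 28 >= 0.9*30 and each store meets its own threshold
}
```

This is also why the tests above switch from `co.GetPrepareChecker().GetSum()` to `tc.core.GetClusterNotFromStorageRegionsCnt()` and tag test regions with `core.SetSource(core.Heartbeat)` or `core.SetSource(core.Storage)`: the region source, not a separately collected counter, now drives the prepare check.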