From 8f03456f1cd1eefa6560e0440d43902e3212ecd1 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Mon, 17 Jul 2023 14:36:45 +0800 Subject: [PATCH] This is an automated cherry-pick of #6804 close tikv/pd#6560 Signed-off-by: ti-chi-bot --- metrics/grafana/pd.json | 7 - server/api/region.go | 99 +- server/api/region_test.go | 23 + server/cluster/cluster.go | 10 + server/cluster/cluster_test.go | 1489 +++++++++++++++++++ server/handler.go | 9 - server/statistics/metrics.go | 9 - server/statistics/region_collection.go | 185 ++- server/statistics/region_collection_test.go | 47 + 9 files changed, 1712 insertions(+), 166 deletions(-) diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index 1a35f91bddf..078e26efcba 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -794,13 +794,6 @@ "intervalFactor": 2, "legendFormat": "{{type}}", "refId": "B" - }, - { - "expr": "pd_regions_offline_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", type=\"offline-peer-region-count\", instance=\"$instance\"}", - "format": "time_series", - "intervalFactor": 1, - "legendFormat": "{{type}}", - "refId": "C" } ], "thresholds": [ diff --git a/server/api/region.go b/server/api/region.go index b80795f816c..941f46afcee 100644 --- a/server/api/region.go +++ b/server/api/region.go @@ -398,9 +398,16 @@ func (h *regionsHandler) GetStoreRegions(w http.ResponseWriter, r *http.Request) // @Success 200 {object} RegionsInfo // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /regions/check/miss-peer [get] -func (h *regionsHandler) GetMissPeerRegions(w http.ResponseWriter, r *http.Request) { +func (h *regionsHandler) GetMissPeerRegions(w http.ResponseWriter, _ *http.Request) { + h.getRegionsByType(w, statistics.MissPeer) +} + +func (h *regionsHandler) getRegionsByType( + w http.ResponseWriter, + typ statistics.RegionStatisticType, +) { handler := h.svr.GetHandler() - regions, err := handler.GetRegionsByType(statistics.MissPeer) + regions, err := handler.GetRegionsByType(typ) if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return @@ -415,15 +422,8 @@ func (h *regionsHandler) GetMissPeerRegions(w http.ResponseWriter, r *http.Reque // @Success 200 {object} RegionsInfo // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /regions/check/extra-peer [get] -func (h *regionsHandler) GetExtraPeerRegions(w http.ResponseWriter, r *http.Request) { - handler := h.svr.GetHandler() - regions, err := handler.GetRegionsByType(statistics.ExtraPeer) - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - regionsInfo := convertToAPIRegions(regions) - h.rd.JSON(w, http.StatusOK, regionsInfo) +func (h *regionsHandler) GetExtraPeerRegions(w http.ResponseWriter, _ *http.Request) { + h.getRegionsByType(w, statistics.ExtraPeer) } // @Tags region @@ -432,15 +432,8 @@ func (h *regionsHandler) GetExtraPeerRegions(w http.ResponseWriter, r *http.Requ // @Success 200 {object} RegionsInfo // @Failure 500 {string} string "PD server failed to proceed the request." 
// @Router /regions/check/pending-peer [get] -func (h *regionsHandler) GetPendingPeerRegions(w http.ResponseWriter, r *http.Request) { - handler := h.svr.GetHandler() - regions, err := handler.GetRegionsByType(statistics.PendingPeer) - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - regionsInfo := convertToAPIRegions(regions) - h.rd.JSON(w, http.StatusOK, regionsInfo) +func (h *regionsHandler) GetPendingPeerRegions(w http.ResponseWriter, _ *http.Request) { + h.getRegionsByType(w, statistics.PendingPeer) } // @Tags region @@ -449,15 +442,8 @@ func (h *regionsHandler) GetPendingPeerRegions(w http.ResponseWriter, r *http.Re // @Success 200 {object} RegionsInfo // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /regions/check/down-peer [get] -func (h *regionsHandler) GetDownPeerRegions(w http.ResponseWriter, r *http.Request) { - handler := h.svr.GetHandler() - regions, err := handler.GetRegionsByType(statistics.DownPeer) - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - regionsInfo := convertToAPIRegions(regions) - h.rd.JSON(w, http.StatusOK, regionsInfo) +func (h *regionsHandler) GetDownPeerRegions(w http.ResponseWriter, _ *http.Request) { + h.getRegionsByType(w, statistics.DownPeer) } // @Tags region @@ -466,15 +452,8 @@ func (h *regionsHandler) GetDownPeerRegions(w http.ResponseWriter, r *http.Reque // @Success 200 {object} RegionsInfo // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /regions/check/learner-peer [get] -func (h *regionsHandler) GetLearnerPeerRegions(w http.ResponseWriter, r *http.Request) { - handler := h.svr.GetHandler() - regions, err := handler.GetRegionsByType(statistics.LearnerPeer) - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - regionsInfo := convertToAPIRegions(regions) - h.rd.JSON(w, http.StatusOK, regionsInfo) +func (h *regionsHandler) GetLearnerPeerRegions(w http.ResponseWriter, _ *http.Request) { + h.getRegionsByType(w, statistics.LearnerPeer) } // @Tags region @@ -483,15 +462,8 @@ func (h *regionsHandler) GetLearnerPeerRegions(w http.ResponseWriter, r *http.Re // @Success 200 {object} RegionsInfo // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /regions/check/offline-peer [get] -func (h *regionsHandler) GetOfflinePeerRegions(w http.ResponseWriter, r *http.Request) { - handler := h.svr.GetHandler() - regions, err := handler.GetOfflinePeer(statistics.OfflinePeer) - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - regionsInfo := convertToAPIRegions(regions) - h.rd.JSON(w, http.StatusOK, regionsInfo) +func (h *regionsHandler) GetOfflinePeerRegions(w http.ResponseWriter, _ *http.Request) { + h.getRegionsByType(w, statistics.OfflinePeer) } // @Tags region @@ -500,15 +472,8 @@ func (h *regionsHandler) GetOfflinePeerRegions(w http.ResponseWriter, r *http.Re // @Success 200 {object} RegionsInfo // @Failure 500 {string} string "PD server failed to proceed the request." 
// @Router /regions/check/oversized-region [get] -func (h *regionsHandler) GetOverSizedRegions(w http.ResponseWriter, r *http.Request) { - handler := h.svr.GetHandler() - regions, err := handler.GetRegionsByType(statistics.OversizedRegion) - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - regionsInfo := convertToAPIRegions(regions) - h.rd.JSON(w, http.StatusOK, regionsInfo) +func (h *regionsHandler) GetOverSizedRegions(w http.ResponseWriter, _ *http.Request) { + h.getRegionsByType(w, statistics.OversizedRegion) } // @Tags region @@ -517,15 +482,8 @@ func (h *regionsHandler) GetOverSizedRegions(w http.ResponseWriter, r *http.Requ // @Success 200 {object} RegionsInfo // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /regions/check/undersized-region [get] -func (h *regionsHandler) GetUndersizedRegions(w http.ResponseWriter, r *http.Request) { - handler := h.svr.GetHandler() - regions, err := handler.GetRegionsByType(statistics.UndersizedRegion) - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - regionsInfo := convertToAPIRegions(regions) - h.rd.JSON(w, http.StatusOK, regionsInfo) +func (h *regionsHandler) GetUndersizedRegions(w http.ResponseWriter, _ *http.Request) { + h.getRegionsByType(w, statistics.UndersizedRegion) } // @Tags region @@ -534,15 +492,8 @@ func (h *regionsHandler) GetUndersizedRegions(w http.ResponseWriter, r *http.Req // @Success 200 {object} RegionsInfo // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /regions/check/empty-region [get] -func (h *regionsHandler) GetEmptyRegions(w http.ResponseWriter, r *http.Request) { - handler := h.svr.GetHandler() - regions, err := handler.GetRegionsByType(statistics.EmptyRegion) - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - regionsInfo := convertToAPIRegions(regions) - h.rd.JSON(w, http.StatusOK, regionsInfo) +func (h *regionsHandler) GetEmptyRegions(w http.ResponseWriter, _ *http.Request) { + h.getRegionsByType(w, statistics.EmptyRegion) } type histItem struct { diff --git a/server/api/region_test.go b/server/api/region_test.go index 7fc80254132..da343b23d36 100644 --- a/server/api/region_test.go +++ b/server/api/region_test.go @@ -165,9 +165,19 @@ func (s *testRegionSuite) TestRegion(c *C) { func (s *testRegionSuite) TestRegionCheck(c *C) { r := newTestRegionInfo(2, 1, []byte("a"), []byte("b")) downPeer := &metapb.Peer{Id: 13, StoreId: 2} +<<<<<<< HEAD r = r.Clone(core.WithAddPeer(downPeer), core.WithDownPeers([]*pdpb.PeerStats{{Peer: downPeer, DownSeconds: 3600}}), core.WithPendingPeers([]*metapb.Peer{downPeer})) mustRegionHeartbeat(c, s.svr, r) url := fmt.Sprintf("%s/region/id/%d", s.urlPrefix, r.GetID()) +======= + r = r.Clone( + core.WithAddPeer(downPeer), + core.WithDownPeers([]*pdpb.PeerStats{{Peer: downPeer, DownSeconds: 3600}}), + core.WithPendingPeers([]*metapb.Peer{downPeer})) + re := suite.Require() + mustRegionHeartbeat(re, suite.svr, r) + url := fmt.Sprintf("%s/region/id/%d", suite.urlPrefix, r.GetID()) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)) r1 := &RegionInfo{} c.Assert(tu.ReadGetJSON(c, testDialClient, url, r1), IsNil) r1.Adjust() @@ -213,7 +223,20 @@ func (s *testRegionSuite) TestRegionCheck(c *C) { r7 := make([]*histItem, 1) c.Assert(tu.ReadGetJSON(c, testDialClient, url, &r7), IsNil) histKeys := []*histItem{{Start: 1000, End: 1999, Count: 1}} +<<<<<<< HEAD 
c.Assert(r7, DeepEquals, histKeys) +======= + suite.Equal(histKeys, r7) + + mustPutStore(re, suite.svr, 2, metapb.StoreState_Offline, metapb.NodeState_Removing, []*metapb.StoreLabel{}) + mustRegionHeartbeat(re, suite.svr, r) + url = fmt.Sprintf("%s/regions/check/%s", suite.urlPrefix, "offline-peer") + r8 := &RegionsInfo{} + suite.NoError(tu.ReadGetJSON(re, testDialClient, url, r8)) + r4.Adjust() + suite.Equal(1, r8.Count) + suite.Equal(r.GetID(), r8.Regions[0].ID) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)) } func (s *testRegionSuite) TestRegions(c *C) { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 24eb2ad7047..d00a01402ff 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -267,8 +267,13 @@ func (c *RaftCluster) Start(s Server) error { return err } c.storeConfigManager = config.NewStoreConfigManager(c.httpClient) +<<<<<<< HEAD c.coordinator = newCoordinator(c.ctx, cluster, s.GetHBStreams()) c.regionStats = statistics.NewRegionStatistics(c.opt, c.ruleManager, c.storeConfigManager) +======= + c.coordinator = schedule.NewCoordinator(c.ctx, cluster, s.GetHBStreams()) + c.regionStats = statistics.NewRegionStatistics(c.core, c.opt, c.ruleManager, c.storeConfigManager) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)) c.limiter = NewStoreLimiter(s.GetPersistOptions()) c.wg.Add(8) @@ -1839,6 +1844,7 @@ func (c *RaftCluster) GetRegionStatsByType(typ statistics.RegionStatisticType) [ return c.regionStats.GetRegionStatsByType(typ) } +<<<<<<< HEAD // GetOfflineRegionStatsByType gets the status of the offline region by types. func (c *RaftCluster) GetOfflineRegionStatsByType(typ statistics.RegionStatisticType) []*core.RegionInfo { if c.regionStats == nil { @@ -1848,6 +1854,10 @@ func (c *RaftCluster) GetOfflineRegionStatsByType(typ statistics.RegionStatistic } func (c *RaftCluster) updateRegionsLabelLevelStats(regions []*core.RegionInfo) { +======= +// UpdateRegionsLabelLevelStats updates the status of the region label level by types. 
+func (c *RaftCluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) { +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)) for _, region := range regions { c.labelLevelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.opt.GetLocationLabels()) } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index cf13b32e46f..f0334c1cd5f 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -862,10 +862,21 @@ func (s *testClusterInfoSuite) TestRegionFlowChanged(c *C) { func (s *testClusterInfoSuite) TestRegionSizeChanged(c *C) { _, opt, err := newTestScheduleConfig() +<<<<<<< HEAD c.Assert(err, IsNil) cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) cluster.coordinator = newCoordinator(s.ctx, cluster, nil) cluster.regionStats = statistics.NewRegionStatistics(cluster.GetOpts(), cluster.ruleManager, cluster.storeConfigManager) +======= + re.NoError(err) + cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) + cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) + cluster.regionStats = statistics.NewRegionStatistics( + cluster.GetBasicCluster(), + cluster.GetOpts(), + cluster.ruleManager, + cluster.storeConfigManager) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)) region := newTestRegions(1, 3, 3)[0] cluster.opt.GetMaxMergeRegionKeys() curMaxMergeSize := int64(cluster.opt.GetMaxMergeRegionSize()) @@ -1123,8 +1134,17 @@ func (s *testClusterInfoSuite) TestOfflineAndMerge(c *C) { panic(err) } } +<<<<<<< HEAD cluster.regionStats = statistics.NewRegionStatistics(cluster.GetOpts(), cluster.ruleManager, cluster.storeConfigManager) cluster.coordinator = newCoordinator(s.ctx, cluster, nil) +======= + cluster.regionStats = statistics.NewRegionStatistics( + cluster.GetBasicCluster(), + cluster.GetOpts(), + cluster.ruleManager, + cluster.storeConfigManager) + cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)) // Put 4 stores. for _, store := range newTestStores(4, "5.0.0") { @@ -1162,14 +1182,24 @@ func (s *testClusterInfoSuite) TestOfflineAndMerge(c *C) { for i := 0; i < n; i++ { regions = core.SplitRegions(regions) } +<<<<<<< HEAD heartbeatRegions(c, cluster, regions) c.Assert(cluster.GetOfflineRegionStatsByType(statistics.OfflinePeer), HasLen, len(regions)) +======= + heartbeatRegions(re, cluster, regions) + re.Len(cluster.GetRegionStatsByType(statistics.OfflinePeer), len(regions)) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)) // Merge. 
for i := 0; i < n; i++ { regions = core.MergeRegions(regions) +<<<<<<< HEAD heartbeatRegions(c, cluster, regions) c.Assert(cluster.GetOfflineRegionStatsByType(statistics.OfflinePeer), HasLen, len(regions)) +======= + heartbeatRegions(re, cluster, regions) + re.Len(cluster.GetRegionStatsByType(statistics.OfflinePeer), len(regions)) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)) } } @@ -1338,9 +1368,19 @@ func (s *testClusterInfoSuite) TestCalculateStoreSize1(c *C) { cfg := opt.GetReplicationConfig() cfg.EnablePlacementRules = true opt.SetReplicationConfig(cfg) +<<<<<<< HEAD cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) cluster.coordinator = newCoordinator(s.ctx, cluster, nil) cluster.regionStats = statistics.NewRegionStatistics(cluster.GetOpts(), cluster.ruleManager, cluster.storeConfigManager) +======= + cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) + cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) + cluster.regionStats = statistics.NewRegionStatistics( + cluster.GetBasicCluster(), + cluster.GetOpts(), + cluster.ruleManager, + cluster.storeConfigManager) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)) // Put 10 stores. for i, store := range newTestStores(10, "6.0.0") { @@ -1417,9 +1457,19 @@ func (s *testClusterInfoSuite) TestCalculateStoreSize2(c *C) { cfg.EnablePlacementRules = true opt.SetReplicationConfig(cfg) opt.SetMaxReplicas(3) +<<<<<<< HEAD cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) cluster.coordinator = newCoordinator(s.ctx, cluster, nil) cluster.regionStats = statistics.NewRegionStatistics(cluster.GetOpts(), cluster.ruleManager, cluster.storeConfigManager) +======= + cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) + cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) + cluster.regionStats = statistics.NewRegionStatistics( + cluster.GetBasicCluster(), + cluster.GetOpts(), + cluster.ruleManager, + cluster.storeConfigManager) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)) // Put 10 stores. for i, store := range newTestStores(10, "6.0.0") { @@ -1843,3 +1893,1442 @@ func checkStaleRegion(origin *metapb.Region, region *metapb.Region) error { return nil } +<<<<<<< HEAD +======= + +func newTestOperator(regionID uint64, regionEpoch *metapb.RegionEpoch, kind operator.OpKind, steps ...operator.OpStep) *operator.Operator { + return operator.NewTestOperator(regionID, regionEpoch, kind, steps...) 
+} + +func (c *testCluster) AllocPeer(storeID uint64) (*metapb.Peer, error) { + id, err := c.AllocID() + if err != nil { + return nil, err + } + return &metapb.Peer{Id: id, StoreId: storeID}, nil +} + +func (c *testCluster) addRegionStore(storeID uint64, regionCount int, regionSizes ...uint64) error { + var regionSize uint64 + if len(regionSizes) == 0 { + regionSize = uint64(regionCount) * 10 + } else { + regionSize = regionSizes[0] + } + + stats := &pdpb.StoreStats{} + stats.Capacity = 100 * units.GiB + stats.UsedSize = regionSize * units.MiB + stats.Available = stats.Capacity - stats.UsedSize + newStore := core.NewStoreInfo(&metapb.Store{Id: storeID}, + core.SetStoreStats(stats), + core.SetRegionCount(regionCount), + core.SetRegionSize(int64(regionSize)), + core.SetLastHeartbeatTS(time.Now()), + ) + + c.SetStoreLimit(storeID, storelimit.AddPeer, 60) + c.SetStoreLimit(storeID, storelimit.RemovePeer, 60) + c.Lock() + defer c.Unlock() + return c.putStoreLocked(newStore) +} + +func (c *testCluster) addLeaderRegion(regionID uint64, leaderStoreID uint64, followerStoreIDs ...uint64) error { + region := newTestRegionMeta(regionID) + leader, _ := c.AllocPeer(leaderStoreID) + region.Peers = []*metapb.Peer{leader} + for _, followerStoreID := range followerStoreIDs { + peer, _ := c.AllocPeer(followerStoreID) + region.Peers = append(region.Peers, peer) + } + regionInfo := core.NewRegionInfo(region, leader, core.SetApproximateSize(10), core.SetApproximateKeys(10)) + return c.putRegion(regionInfo) +} + +func (c *testCluster) updateLeaderCount(storeID uint64, leaderCount int) error { + store := c.GetStore(storeID) + newStore := store.Clone( + core.SetLeaderCount(leaderCount), + core.SetLeaderSize(int64(leaderCount)*10), + ) + c.Lock() + defer c.Unlock() + return c.putStoreLocked(newStore) +} + +func (c *testCluster) addLeaderStore(storeID uint64, leaderCount int) error { + stats := &pdpb.StoreStats{} + newStore := core.NewStoreInfo(&metapb.Store{Id: storeID}, + core.SetStoreStats(stats), + core.SetLeaderCount(leaderCount), + core.SetLeaderSize(int64(leaderCount)*10), + core.SetLastHeartbeatTS(time.Now()), + ) + + c.SetStoreLimit(storeID, storelimit.AddPeer, 60) + c.SetStoreLimit(storeID, storelimit.RemovePeer, 60) + c.Lock() + defer c.Unlock() + return c.putStoreLocked(newStore) +} + +func (c *testCluster) setStoreDown(storeID uint64) error { + store := c.GetStore(storeID) + newStore := store.Clone( + core.UpStore(), + core.SetLastHeartbeatTS(typeutil.ZeroTime), + ) + c.Lock() + defer c.Unlock() + return c.putStoreLocked(newStore) +} + +func (c *testCluster) setStoreOffline(storeID uint64) error { + store := c.GetStore(storeID) + newStore := store.Clone(core.OfflineStore(false)) + c.Lock() + defer c.Unlock() + return c.putStoreLocked(newStore) +} + +func (c *testCluster) LoadRegion(regionID uint64, followerStoreIDs ...uint64) error { + // regions load from etcd will have no leader + region := newTestRegionMeta(regionID) + region.Peers = []*metapb.Peer{} + for _, id := range followerStoreIDs { + peer, _ := c.AllocPeer(id) + region.Peers = append(region.Peers, peer) + } + return c.putRegion(core.NewRegionInfo(region, nil)) +} + +func TestBasic(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + oc := co.GetOperatorController() + + re.NoError(tc.addLeaderRegion(1, 1)) + + op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpLeader) + oc.AddWaitingOperator(op1) + re.Equal(uint64(1), oc.OperatorCount(operator.OpLeader)) + 
re.Equal(op1.RegionID(), oc.GetOperator(1).RegionID()) + + // Region 1 already has an operator, cannot add another one. + op2 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion) + oc.AddWaitingOperator(op2) + re.Equal(uint64(0), oc.OperatorCount(operator.OpRegion)) + + // Remove the operator manually, then we can add a new operator. + re.True(oc.RemoveOperator(op1)) + op3 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion) + oc.AddWaitingOperator(op3) + re.Equal(uint64(1), oc.OperatorCount(operator.OpRegion)) + re.Equal(op3.RegionID(), oc.GetOperator(1).RegionID()) +} + +func TestDispatch(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + co.GetPrepareChecker().SetPrepared() + // Transfer peer from store 4 to store 1. + re.NoError(tc.addRegionStore(4, 40)) + re.NoError(tc.addRegionStore(3, 30)) + re.NoError(tc.addRegionStore(2, 20)) + re.NoError(tc.addRegionStore(1, 10)) + re.NoError(tc.addLeaderRegion(1, 2, 3, 4)) + + // Transfer leader from store 4 to store 2. + re.NoError(tc.updateLeaderCount(4, 50)) + re.NoError(tc.updateLeaderCount(3, 50)) + re.NoError(tc.updateLeaderCount(2, 20)) + re.NoError(tc.updateLeaderCount(1, 10)) + re.NoError(tc.addLeaderRegion(2, 4, 3, 2)) + + go co.RunUntilStop() + + // Wait for schedule and turn off balance. + waitOperator(re, co, 1) + sc := co.GetSchedulersController() + operatorutil.CheckTransferPeer(re, co.GetOperatorController().GetOperator(1), operator.OpKind(0), 4, 1) + re.NoError(sc.RemoveScheduler(schedulers.BalanceRegionName)) + waitOperator(re, co, 2) + operatorutil.CheckTransferLeader(re, co.GetOperatorController().GetOperator(2), operator.OpKind(0), 4, 2) + re.NoError(sc.RemoveScheduler(schedulers.BalanceLeaderName)) + + stream := mockhbstream.NewHeartbeatStream() + + // Transfer peer. + region := tc.GetRegion(1).Clone() + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 1) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitPromoteLearner(re, stream, region, 1) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitRemovePeer(re, stream, region, 4) + re.NoError(dispatchHeartbeat(co, region, stream)) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) + + // Transfer leader. 
+ region = tc.GetRegion(2).Clone() + re.NoError(dispatchHeartbeat(co, region, stream)) + waitTransferLeader(re, stream, region, 2) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) +} + +func dispatchHeartbeat(co *schedule.Coordinator, region *core.RegionInfo, stream hbstream.HeartbeatStream) error { + co.GetHeartbeatStreams().BindStream(region.GetLeader().GetStoreId(), stream) + if err := co.GetCluster().(*RaftCluster).putRegion(region.Clone()); err != nil { + return err + } + co.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, nil) + return nil +} + +func TestCollectMetricsConcurrent(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, func(tc *testCluster) { + tc.regionStats = statistics.NewRegionStatistics( + tc.GetBasicCluster(), + tc.GetOpts(), + nil, + tc.storeConfigManager) + }, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + + // Make sure there are no problem when concurrent write and read + var wg sync.WaitGroup + count := 10 + wg.Add(count + 1) + for i := 0; i <= count; i++ { + go func(i int) { + defer wg.Done() + for j := 0; j < 1000; j++ { + re.NoError(tc.addRegionStore(uint64(i%5), rand.Intn(200))) + } + }(i) + } + sc := co.GetSchedulersController() + for i := 0; i < 1000; i++ { + co.CollectHotSpotMetrics() + sc.CollectSchedulerMetrics() + co.GetCluster().(*RaftCluster).collectClusterMetrics() + } + co.ResetHotSpotMetrics() + sc.ResetSchedulerMetrics() + co.GetCluster().(*RaftCluster).resetClusterMetrics() + wg.Wait() +} + +func TestCollectMetrics(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, func(tc *testCluster) { + tc.regionStats = statistics.NewRegionStatistics( + tc.GetBasicCluster(), + tc.GetOpts(), + nil, + tc.storeConfigManager) + }, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + count := 10 + for i := 0; i <= count; i++ { + for k := 0; k < 200; k++ { + item := &statistics.HotPeerStat{ + StoreID: uint64(i % 5), + RegionID: uint64(i*1000 + k), + Loads: []float64{10, 20, 30}, + HotDegree: 10, + AntiCount: statistics.HotRegionAntiCount, // for write + } + tc.hotStat.HotCache.Update(item, statistics.Write) + } + } + sc := co.GetSchedulersController() + for i := 0; i < 1000; i++ { + co.CollectHotSpotMetrics() + sc.CollectSchedulerMetrics() + co.GetCluster().(*RaftCluster).collectClusterMetrics() + } + stores := co.GetCluster().GetStores() + regionStats := co.GetCluster().RegionWriteStats() + status1 := statistics.CollectHotPeerInfos(stores, regionStats) + status2 := statistics.GetHotStatus(stores, co.GetCluster().GetStoresLoads(), regionStats, statistics.Write, co.GetCluster().GetSchedulerConfig().IsTraceRegionFlow()) + for _, s := range status2.AsLeader { + s.Stats = nil + } + for _, s := range status2.AsPeer { + s.Stats = nil + } + re.Equal(status1, status2) + co.ResetHotSpotMetrics() + sc.ResetSchedulerMetrics() + co.GetCluster().(*RaftCluster).resetClusterMetrics() +} + +func prepare(setCfg func(*config.ScheduleConfig), setTc func(*testCluster), run func(*schedule.Coordinator), re *require.Assertions) (*testCluster, *schedule.Coordinator, func()) { + ctx, cancel := context.WithCancel(context.Background()) + cfg, opt, err := newTestScheduleConfig() + re.NoError(err) + if setCfg != nil { + setCfg(cfg) + } + tc := newTestCluster(ctx, opt) + hbStreams := hbstream.NewTestHeartbeatStreams(ctx, tc.meta.GetId(), tc, true /* need to run */) + if setTc != nil { + setTc(tc) + } + co := schedule.NewCoordinator(ctx, tc.RaftCluster, 
hbStreams) + if run != nil { + run(co) + } + return tc, co, func() { + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + hbStreams.Close() + cancel() + } +} + +func checkRegionAndOperator(re *require.Assertions, tc *testCluster, co *schedule.Coordinator, regionID uint64, expectAddOperator int) { + ops := co.GetCheckerController().CheckRegion(tc.GetRegion(regionID)) + if ops == nil { + re.Equal(0, expectAddOperator) + } else { + re.Equal(expectAddOperator, co.GetOperatorController().AddWaitingOperator(ops...)) + } +} + +func TestCheckRegion(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tc, co, cleanup := prepare(nil, nil, nil, re) + hbStreams, opt := co.GetHeartbeatStreams(), tc.opt + defer cleanup() + + re.NoError(tc.addRegionStore(4, 4)) + re.NoError(tc.addRegionStore(3, 3)) + re.NoError(tc.addRegionStore(2, 2)) + re.NoError(tc.addRegionStore(1, 1)) + re.NoError(tc.addLeaderRegion(1, 2, 3)) + checkRegionAndOperator(re, tc, co, 1, 1) + operatorutil.CheckAddPeer(re, co.GetOperatorController().GetOperator(1), operator.OpReplica, 1) + checkRegionAndOperator(re, tc, co, 1, 0) + + r := tc.GetRegion(1) + p := &metapb.Peer{Id: 1, StoreId: 1, Role: metapb.PeerRole_Learner} + r = r.Clone( + core.WithAddPeer(p), + core.WithPendingPeers(append(r.GetPendingPeers(), p)), + ) + re.NoError(tc.putRegion(r)) + checkRegionAndOperator(re, tc, co, 1, 0) + + tc = newTestCluster(ctx, opt) + co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + + re.NoError(tc.addRegionStore(4, 4)) + re.NoError(tc.addRegionStore(3, 3)) + re.NoError(tc.addRegionStore(2, 2)) + re.NoError(tc.addRegionStore(1, 1)) + re.NoError(tc.putRegion(r)) + checkRegionAndOperator(re, tc, co, 1, 0) + r = r.Clone(core.WithPendingPeers(nil)) + re.NoError(tc.putRegion(r)) + checkRegionAndOperator(re, tc, co, 1, 1) + op := co.GetOperatorController().GetOperator(1) + re.Equal(1, op.Len()) + re.Equal(uint64(1), op.Step(0).(operator.PromoteLearner).ToStore) + checkRegionAndOperator(re, tc, co, 1, 0) +} + +func TestCheckRegionWithScheduleDeny(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + + re.NoError(tc.addRegionStore(4, 4)) + re.NoError(tc.addRegionStore(3, 3)) + re.NoError(tc.addRegionStore(2, 2)) + re.NoError(tc.addRegionStore(1, 1)) + re.NoError(tc.addLeaderRegion(1, 2, 3)) + region := tc.GetRegion(1) + re.NotNil(region) + // test with label schedule=deny + labelerManager := tc.GetRegionLabeler() + labelerManager.SetLabelRule(&labeler.LabelRule{ + ID: "schedulelabel", + Labels: []labeler.RegionLabel{{Key: "schedule", Value: "deny"}}, + RuleType: labeler.KeyRange, + Data: []interface{}{map[string]interface{}{"start_key": "", "end_key": ""}}, + }) + + // should allow to do rule checker + re.True(labelerManager.ScheduleDisabled(region)) + checkRegionAndOperator(re, tc, co, 1, 1) + + // should not allow to merge + tc.opt.SetSplitMergeInterval(time.Duration(0)) + re.NoError(tc.addLeaderRegion(2, 2, 3, 4)) + re.NoError(tc.addLeaderRegion(3, 2, 3, 4)) + region = tc.GetRegion(2) + re.True(labelerManager.ScheduleDisabled(region)) + checkRegionAndOperator(re, tc, co, 2, 0) + // delete label rule, should allow to do merge + labelerManager.DeleteLabelRule("schedulelabel") + re.False(labelerManager.ScheduleDisabled(region)) + checkRegionAndOperator(re, tc, co, 2, 2) +} + +func TestCheckerIsBusy(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(func(cfg 
*config.ScheduleConfig) { + cfg.ReplicaScheduleLimit = 0 // ensure replica checker is busy + cfg.MergeScheduleLimit = 10 + }, nil, nil, re) + defer cleanup() + + re.NoError(tc.addRegionStore(1, 0)) + num := 1 + typeutil.MaxUint64(tc.opt.GetReplicaScheduleLimit(), tc.opt.GetMergeScheduleLimit()) + var operatorKinds = []operator.OpKind{ + operator.OpReplica, operator.OpRegion | operator.OpMerge, + } + for i, operatorKind := range operatorKinds { + for j := uint64(0); j < num; j++ { + regionID := j + uint64(i+1)*num + re.NoError(tc.addLeaderRegion(regionID, 1)) + switch operatorKind { + case operator.OpReplica: + op := newTestOperator(regionID, tc.GetRegion(regionID).GetRegionEpoch(), operatorKind) + re.Equal(1, co.GetOperatorController().AddWaitingOperator(op)) + case operator.OpRegion | operator.OpMerge: + if regionID%2 == 1 { + ops, err := operator.CreateMergeRegionOperator("merge-region", co.GetCluster(), tc.GetRegion(regionID), tc.GetRegion(regionID-1), operator.OpMerge) + re.NoError(err) + re.Len(ops, co.GetOperatorController().AddWaitingOperator(ops...)) + } + } + } + } + checkRegionAndOperator(re, tc, co, num, 0) +} + +func TestMergeRegionCancelOneOperator(t *testing.T) { + re := require.New(t) + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + + source := core.NewRegionInfo( + &metapb.Region{ + Id: 1, + StartKey: []byte(""), + EndKey: []byte("a"), + }, + nil, + ) + target := core.NewRegionInfo( + &metapb.Region{ + Id: 2, + StartKey: []byte("a"), + EndKey: []byte("t"), + }, + nil, + ) + re.NoError(tc.putRegion(source)) + re.NoError(tc.putRegion(target)) + + // Cancel source region. + ops, err := operator.CreateMergeRegionOperator("merge-region", tc, source, target, operator.OpMerge) + re.NoError(err) + re.Len(ops, co.GetOperatorController().AddWaitingOperator(ops...)) + // Cancel source operator. + co.GetOperatorController().RemoveOperator(co.GetOperatorController().GetOperator(source.GetID())) + re.Len(co.GetOperatorController().GetOperators(), 0) + + // Cancel target region. + ops, err = operator.CreateMergeRegionOperator("merge-region", tc, source, target, operator.OpMerge) + re.NoError(err) + re.Len(ops, co.GetOperatorController().AddWaitingOperator(ops...)) + // Cancel target operator. + co.GetOperatorController().RemoveOperator(co.GetOperatorController().GetOperator(target.GetID())) + re.Len(co.GetOperatorController().GetOperators(), 0) +} + +func TestReplica(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(func(cfg *config.ScheduleConfig) { + // Turn off balance. + cfg.LeaderScheduleLimit = 0 + cfg.RegionScheduleLimit = 0 + }, nil, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + + re.NoError(tc.addRegionStore(1, 1)) + re.NoError(tc.addRegionStore(2, 2)) + re.NoError(tc.addRegionStore(3, 3)) + re.NoError(tc.addRegionStore(4, 4)) + + stream := mockhbstream.NewHeartbeatStream() + + // Add peer to store 1. + re.NoError(tc.addLeaderRegion(1, 2, 3)) + region := tc.GetRegion(1) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 1) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitPromoteLearner(re, stream, region, 1) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) + + // Peer in store 3 is down, remove peer in store 3 and add peer to store 4. 
+ re.NoError(tc.setStoreDown(3)) + downPeer := &pdpb.PeerStats{ + Peer: region.GetStorePeer(3), + DownSeconds: 24 * 60 * 60, + } + region = region.Clone( + core.WithDownPeers(append(region.GetDownPeers(), downPeer)), + ) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 4) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitPromoteLearner(re, stream, region, 4) + region = region.Clone(core.WithDownPeers(nil)) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) + + // Remove peer from store 4. + re.NoError(tc.addLeaderRegion(2, 1, 2, 3, 4)) + region = tc.GetRegion(2) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitRemovePeer(re, stream, region, 4) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) + + // Remove offline peer directly when it's pending. + re.NoError(tc.addLeaderRegion(3, 1, 2, 3)) + re.NoError(tc.setStoreOffline(3)) + region = tc.GetRegion(3) + region = region.Clone(core.WithPendingPeers([]*metapb.Peer{region.GetStorePeer(3)})) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) +} + +func TestCheckCache(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(func(cfg *config.ScheduleConfig) { + // Turn off replica scheduling. + cfg.ReplicaScheduleLimit = 0 + }, nil, nil, re) + defer cleanup() + + re.NoError(tc.addRegionStore(1, 0)) + re.NoError(tc.addRegionStore(2, 0)) + re.NoError(tc.addRegionStore(3, 0)) + + // Add a peer with two replicas. + re.NoError(tc.addLeaderRegion(1, 2, 3)) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/break-patrol", `return`)) + + // case 1: operator cannot be created due to replica-schedule-limit restriction + co.GetWaitGroup().Add(1) + co.PatrolRegions() + re.Len(co.GetCheckerController().GetWaitingRegions(), 1) + + // cancel the replica-schedule-limit restriction + cfg := tc.GetScheduleConfig() + cfg.ReplicaScheduleLimit = 10 + tc.SetScheduleConfig(cfg) + co.GetWaitGroup().Add(1) + co.PatrolRegions() + oc := co.GetOperatorController() + re.Len(oc.GetOperators(), 1) + re.Empty(co.GetCheckerController().GetWaitingRegions()) + + // case 2: operator cannot be created due to store limit restriction + oc.RemoveOperator(oc.GetOperator(1)) + tc.SetStoreLimit(1, storelimit.AddPeer, 0) + co.GetWaitGroup().Add(1) + co.PatrolRegions() + re.Len(co.GetCheckerController().GetWaitingRegions(), 1) + + // cancel the store limit restriction + tc.SetStoreLimit(1, storelimit.AddPeer, 10) + time.Sleep(time.Second) + co.GetWaitGroup().Add(1) + co.PatrolRegions() + re.Len(oc.GetOperators(), 1) + re.Empty(co.GetCheckerController().GetWaitingRegions()) + + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/break-patrol")) +} + +func TestPeerState(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + + // Transfer peer from store 4 to store 1. + re.NoError(tc.addRegionStore(1, 10)) + re.NoError(tc.addRegionStore(2, 10)) + re.NoError(tc.addRegionStore(3, 10)) + re.NoError(tc.addRegionStore(4, 40)) + re.NoError(tc.addLeaderRegion(1, 2, 3, 4)) + + stream := mockhbstream.NewHeartbeatStream() + + // Wait for schedule. + waitOperator(re, co, 1) + operatorutil.CheckTransferPeer(re, co.GetOperatorController().GetOperator(1), operator.OpKind(0), 4, 1) + + region := tc.GetRegion(1).Clone() + + // Add new peer. 
+ re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 1) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitPromoteLearner(re, stream, region, 1) + + // If the new peer is pending, the operator will not finish. + region = region.Clone(core.WithPendingPeers(append(region.GetPendingPeers(), region.GetStorePeer(1)))) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) + re.NotNil(co.GetOperatorController().GetOperator(region.GetID())) + + // The new peer is not pending now, the operator will finish. + // And we will proceed to remove peer in store 4. + region = region.Clone(core.WithPendingPeers(nil)) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitRemovePeer(re, stream, region, 4) + re.NoError(tc.addLeaderRegion(1, 1, 2, 3)) + region = tc.GetRegion(1).Clone() + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) +} + +func TestShouldRun(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + tc.RaftCluster.coordinator = co + defer cleanup() + + re.NoError(tc.addLeaderStore(1, 5)) + re.NoError(tc.addLeaderStore(2, 2)) + re.NoError(tc.addLeaderStore(3, 0)) + re.NoError(tc.addLeaderStore(4, 0)) + re.NoError(tc.LoadRegion(1, 1, 2, 3)) + re.NoError(tc.LoadRegion(2, 1, 2, 3)) + re.NoError(tc.LoadRegion(3, 1, 2, 3)) + re.NoError(tc.LoadRegion(4, 1, 2, 3)) + re.NoError(tc.LoadRegion(5, 1, 2, 3)) + re.NoError(tc.LoadRegion(6, 2, 1, 4)) + re.NoError(tc.LoadRegion(7, 2, 1, 4)) + re.False(co.ShouldRun()) + re.Equal(2, tc.GetStoreRegionCount(4)) + + testCases := []struct { + regionID uint64 + ShouldRun bool + }{ + {1, false}, + {2, false}, + {3, false}, + {4, false}, + {5, false}, + // store4 needs Collect two region + {6, false}, + {7, true}, + } + + for _, testCase := range testCases { + r := tc.GetRegion(testCase.regionID) + nr := r.Clone(core.WithLeader(r.GetPeers()[0])) + re.NoError(tc.processRegionHeartbeat(nr)) + re.Equal(testCase.ShouldRun, co.ShouldRun()) + } + nr := &metapb.Region{Id: 6, Peers: []*metapb.Peer{}} + newRegion := core.NewRegionInfo(nr, nil) + re.Error(tc.processRegionHeartbeat(newRegion)) + re.Equal(7, co.GetPrepareChecker().GetSum()) +} + +func TestShouldRunWithNonLeaderRegions(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + tc.RaftCluster.coordinator = co + defer cleanup() + + re.NoError(tc.addLeaderStore(1, 10)) + re.NoError(tc.addLeaderStore(2, 0)) + re.NoError(tc.addLeaderStore(3, 0)) + for i := 0; i < 10; i++ { + re.NoError(tc.LoadRegion(uint64(i+1), 1, 2, 3)) + } + re.False(co.ShouldRun()) + re.Equal(10, tc.GetStoreRegionCount(1)) + + testCases := []struct { + regionID uint64 + ShouldRun bool + }{ + {1, false}, + {2, false}, + {3, false}, + {4, false}, + {5, false}, + {6, false}, + {7, false}, + {8, false}, + {9, true}, + } + + for _, testCase := range testCases { + r := tc.GetRegion(testCase.regionID) + nr := r.Clone(core.WithLeader(r.GetPeers()[0])) + re.NoError(tc.processRegionHeartbeat(nr)) + re.Equal(testCase.ShouldRun, co.ShouldRun()) + } + nr := &metapb.Region{Id: 9, Peers: []*metapb.Peer{}} + newRegion := core.NewRegionInfo(nr, nil) + re.Error(tc.processRegionHeartbeat(newRegion)) + re.Equal(9, co.GetPrepareChecker().GetSum()) + + // Now, after server is prepared, there exist some regions with no leader. 
+ re.Equal(uint64(0), tc.GetRegion(10).GetLeader().GetStoreId()) +} + +func TestAddScheduler(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + sc := co.GetSchedulersController() + re.Len(sc.GetSchedulerNames(), len(config.DefaultSchedulers)) + re.NoError(sc.RemoveScheduler(schedulers.BalanceLeaderName)) + re.NoError(sc.RemoveScheduler(schedulers.BalanceRegionName)) + re.NoError(sc.RemoveScheduler(schedulers.HotRegionName)) + re.NoError(sc.RemoveScheduler(schedulers.BalanceWitnessName)) + re.NoError(sc.RemoveScheduler(schedulers.TransferWitnessLeaderName)) + re.Empty(sc.GetSchedulerNames()) + + stream := mockhbstream.NewHeartbeatStream() + + // Add stores 1,2,3 + re.NoError(tc.addLeaderStore(1, 1)) + re.NoError(tc.addLeaderStore(2, 1)) + re.NoError(tc.addLeaderStore(3, 1)) + // Add regions 1 with leader in store 1 and followers in stores 2,3 + re.NoError(tc.addLeaderRegion(1, 1, 2, 3)) + // Add regions 2 with leader in store 2 and followers in stores 1,3 + re.NoError(tc.addLeaderRegion(2, 2, 1, 3)) + // Add regions 3 with leader in store 3 and followers in stores 1,2 + re.NoError(tc.addLeaderRegion(3, 3, 1, 2)) + + oc := co.GetOperatorController() + + // test ConfigJSONDecoder create + bl, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigJSONDecoder([]byte("{}"))) + re.NoError(err) + conf, err := bl.EncodeConfig() + re.NoError(err) + data := make(map[string]interface{}) + err = json.Unmarshal(conf, &data) + re.NoError(err) + batch := data["batch"].(float64) + re.Equal(4, int(batch)) + gls, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"0"}), sc.RemoveScheduler) + re.NoError(err) + re.NotNil(sc.AddScheduler(gls)) + re.NotNil(sc.RemoveScheduler(gls.GetName())) + + gls, err = schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), sc.RemoveScheduler) + re.NoError(err) + re.NoError(sc.AddScheduler(gls)) + + hb, err := schedulers.CreateScheduler(schedulers.HotRegionType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigJSONDecoder([]byte("{}"))) + re.NoError(err) + conf, err = hb.EncodeConfig() + re.NoError(err) + data = make(map[string]interface{}) + re.NoError(json.Unmarshal(conf, &data)) + re.Contains(data, "enable-for-tiflash") + re.Equal("true", data["enable-for-tiflash"].(string)) + + // Transfer all leaders to store 1. 
+ waitOperator(re, co, 2) + region2 := tc.GetRegion(2) + re.NoError(dispatchHeartbeat(co, region2, stream)) + region2 = waitTransferLeader(re, stream, region2, 1) + re.NoError(dispatchHeartbeat(co, region2, stream)) + waitNoResponse(re, stream) + + waitOperator(re, co, 3) + region3 := tc.GetRegion(3) + re.NoError(dispatchHeartbeat(co, region3, stream)) + region3 = waitTransferLeader(re, stream, region3, 1) + re.NoError(dispatchHeartbeat(co, region3, stream)) + waitNoResponse(re, stream) +} + +func TestPersistScheduler(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tc, co, cleanup := prepare(nil, nil, func(co *schedule.Coordinator) { co.Run() }, re) + hbStreams := co.GetHeartbeatStreams() + defer cleanup() + defaultCount := len(config.DefaultSchedulers) + // Add stores 1,2 + re.NoError(tc.addLeaderStore(1, 1)) + re.NoError(tc.addLeaderStore(2, 1)) + + sc := co.GetSchedulersController() + re.Len(sc.GetSchedulerNames(), defaultCount) + oc := co.GetOperatorController() + storage := tc.RaftCluster.storage + + gls1, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), sc.RemoveScheduler) + re.NoError(err) + re.NoError(sc.AddScheduler(gls1, "1")) + evict, err := schedulers.CreateScheduler(schedulers.EvictLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.EvictLeaderType, []string{"2"}), sc.RemoveScheduler) + re.NoError(err) + re.NoError(sc.AddScheduler(evict, "2")) + re.Len(sc.GetSchedulerNames(), defaultCount+2) + sches, _, err := storage.LoadAllScheduleConfig() + re.NoError(err) + re.Len(sches, defaultCount+2) + + // remove 5 schedulers + re.NoError(sc.RemoveScheduler(schedulers.BalanceLeaderName)) + re.NoError(sc.RemoveScheduler(schedulers.BalanceRegionName)) + re.NoError(sc.RemoveScheduler(schedulers.HotRegionName)) + re.NoError(sc.RemoveScheduler(schedulers.BalanceWitnessName)) + re.NoError(sc.RemoveScheduler(schedulers.TransferWitnessLeaderName)) + re.Len(sc.GetSchedulerNames(), defaultCount-3) + re.NoError(co.GetCluster().GetPersistOptions().Persist(storage)) + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + // make a new coordinator for testing + // whether the schedulers added or removed in dynamic way are recorded in opt + _, newOpt, err := newTestScheduleConfig() + re.NoError(err) + _, err = schedulers.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedulers.ConfigJSONDecoder([]byte("null"))) + re.NoError(err) + // suppose we add a new default enable scheduler + config.DefaultSchedulers = append(config.DefaultSchedulers, config.SchedulerConfig{Type: "shuffle-region"}) + defer func() { + config.DefaultSchedulers = config.DefaultSchedulers[:len(config.DefaultSchedulers)-1] + }() + re.Len(newOpt.GetSchedulers(), defaultCount) + re.NoError(newOpt.Reload(storage)) + // only remains 3 items with independent config. + sches, _, err = storage.LoadAllScheduleConfig() + re.NoError(err) + re.Len(sches, 3) + + // option have 6 items because the default scheduler do not remove. 
+ re.Len(newOpt.GetSchedulers(), defaultCount+3) + re.NoError(newOpt.Persist(storage)) + tc.RaftCluster.opt = newOpt + + co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + co.Run() + sc = co.GetSchedulersController() + re.Len(sc.GetSchedulerNames(), 3) + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + // suppose restart PD again + _, newOpt, err = newTestScheduleConfig() + re.NoError(err) + re.NoError(newOpt.Reload(storage)) + tc.RaftCluster.opt = newOpt + co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + co.Run() + sc = co.GetSchedulersController() + re.Len(sc.GetSchedulerNames(), 3) + bls, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""})) + re.NoError(err) + re.NoError(sc.AddScheduler(bls)) + brs, err := schedulers.CreateScheduler(schedulers.BalanceRegionType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""})) + re.NoError(err) + re.NoError(sc.AddScheduler(brs)) + re.Len(sc.GetSchedulerNames(), defaultCount) + + // the scheduler option should contain 6 items + // the `hot scheduler` are disabled + re.Len(co.GetCluster().GetPersistOptions().GetSchedulers(), defaultCount+3) + re.NoError(sc.RemoveScheduler(schedulers.GrantLeaderName)) + // the scheduler that is not enable by default will be completely deleted + re.Len(co.GetCluster().GetPersistOptions().GetSchedulers(), defaultCount+2) + re.Len(sc.GetSchedulerNames(), 4) + re.NoError(co.GetCluster().GetPersistOptions().Persist(co.GetCluster().GetStorage())) + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + _, newOpt, err = newTestScheduleConfig() + re.NoError(err) + re.NoError(newOpt.Reload(co.GetCluster().GetStorage())) + tc.RaftCluster.opt = newOpt + co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + + co.Run() + sc = co.GetSchedulersController() + re.Len(sc.GetSchedulerNames(), defaultCount-1) + re.NoError(sc.RemoveScheduler(schedulers.EvictLeaderName)) + re.Len(sc.GetSchedulerNames(), defaultCount-2) +} + +func TestRemoveScheduler(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tc, co, cleanup := prepare(func(cfg *config.ScheduleConfig) { + cfg.ReplicaScheduleLimit = 0 + }, nil, func(co *schedule.Coordinator) { co.Run() }, re) + hbStreams := co.GetHeartbeatStreams() + defer cleanup() + + // Add stores 1,2 + re.NoError(tc.addLeaderStore(1, 1)) + re.NoError(tc.addLeaderStore(2, 1)) + defaultCount := len(config.DefaultSchedulers) + sc := co.GetSchedulersController() + re.Len(sc.GetSchedulerNames(), defaultCount) + oc := co.GetOperatorController() + storage := tc.RaftCluster.storage + + gls1, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), sc.RemoveScheduler) + re.NoError(err) + re.NoError(sc.AddScheduler(gls1, "1")) + re.Len(sc.GetSchedulerNames(), defaultCount+1) + sches, _, err := storage.LoadAllScheduleConfig() + re.NoError(err) + re.Len(sches, defaultCount+1) + + // remove all schedulers + re.NoError(sc.RemoveScheduler(schedulers.BalanceLeaderName)) + re.NoError(sc.RemoveScheduler(schedulers.BalanceRegionName)) + re.NoError(sc.RemoveScheduler(schedulers.HotRegionName)) + re.NoError(sc.RemoveScheduler(schedulers.GrantLeaderName)) + re.NoError(sc.RemoveScheduler(schedulers.BalanceWitnessName)) + 
re.NoError(sc.RemoveScheduler(schedulers.TransferWitnessLeaderName)) + // all removed + sches, _, err = storage.LoadAllScheduleConfig() + re.NoError(err) + re.Empty(sches) + re.Empty(sc.GetSchedulerNames()) + re.NoError(co.GetCluster().GetPersistOptions().Persist(co.GetCluster().GetStorage())) + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + + // suppose restart PD again + _, newOpt, err := newTestScheduleConfig() + re.NoError(err) + re.NoError(newOpt.Reload(tc.storage)) + tc.RaftCluster.opt = newOpt + co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + co.Run() + re.Empty(sc.GetSchedulerNames()) + // the option remains default scheduler + re.Len(co.GetCluster().GetPersistOptions().GetSchedulers(), defaultCount) + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() +} + +func TestRestart(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tc, co, cleanup := prepare(func(cfg *config.ScheduleConfig) { + // Turn off balance, we test add replica only. + cfg.LeaderScheduleLimit = 0 + cfg.RegionScheduleLimit = 0 + }, nil, func(co *schedule.Coordinator) { co.Run() }, re) + hbStreams := co.GetHeartbeatStreams() + defer cleanup() + + // Add 3 stores (1, 2, 3) and a region with 1 replica on store 1. + re.NoError(tc.addRegionStore(1, 1)) + re.NoError(tc.addRegionStore(2, 2)) + re.NoError(tc.addRegionStore(3, 3)) + re.NoError(tc.addLeaderRegion(1, 1)) + region := tc.GetRegion(1) + co.GetPrepareChecker().Collect(region) + + // Add 1 replica on store 2. + stream := mockhbstream.NewHeartbeatStream() + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 2) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitPromoteLearner(re, stream, region, 2) + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + + // Recreate coordinator then add another replica on store 3. 
+ co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + co.GetPrepareChecker().Collect(region) + co.Run() + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 3) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitPromoteLearner(re, stream, region, 3) +} + +func TestPauseScheduler(t *testing.T) { + re := require.New(t) + + _, co, cleanup := prepare(nil, nil, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + sc := co.GetSchedulersController() + _, err := sc.IsSchedulerAllowed("test") + re.Error(err) + sc.PauseOrResumeScheduler(schedulers.BalanceLeaderName, 60) + paused, _ := sc.IsSchedulerPaused(schedulers.BalanceLeaderName) + re.True(paused) + pausedAt, err := sc.GetPausedSchedulerDelayAt(schedulers.BalanceLeaderName) + re.NoError(err) + resumeAt, err := sc.GetPausedSchedulerDelayUntil(schedulers.BalanceLeaderName) + re.NoError(err) + re.Equal(int64(60), resumeAt-pausedAt) + allowed, _ := sc.IsSchedulerAllowed(schedulers.BalanceLeaderName) + re.False(allowed) +} + +func BenchmarkPatrolRegion(b *testing.B) { + re := require.New(b) + + mergeLimit := uint64(4100) + regionNum := 10000 + + tc, co, cleanup := prepare(func(cfg *config.ScheduleConfig) { + cfg.MergeScheduleLimit = mergeLimit + }, nil, nil, re) + defer cleanup() + + tc.opt.SetSplitMergeInterval(time.Duration(0)) + for i := 1; i < 4; i++ { + if err := tc.addRegionStore(uint64(i), regionNum, 96); err != nil { + return + } + } + for i := 0; i < regionNum; i++ { + if err := tc.addLeaderRegion(uint64(i), 1, 2, 3); err != nil { + return + } + } + + listen := make(chan int) + go func() { + oc := co.GetOperatorController() + listen <- 0 + for { + if oc.OperatorCount(operator.OpMerge) == mergeLimit { + co.Stop() + return + } + } + }() + <-listen + + co.GetWaitGroup().Add(1) + b.ResetTimer() + co.PatrolRegions() +} + +func waitOperator(re *require.Assertions, co *schedule.Coordinator, regionID uint64) { + testutil.Eventually(re, func() bool { + return co.GetOperatorController().GetOperator(regionID) != nil + }) +} + +func TestOperatorCount(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + oc := co.GetOperatorController() + re.Equal(uint64(0), oc.OperatorCount(operator.OpLeader)) + re.Equal(uint64(0), oc.OperatorCount(operator.OpRegion)) + + re.NoError(tc.addLeaderRegion(1, 1)) + re.NoError(tc.addLeaderRegion(2, 2)) + { + op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpLeader) + oc.AddWaitingOperator(op1) + re.Equal(uint64(1), oc.OperatorCount(operator.OpLeader)) // 1:leader + op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpLeader) + oc.AddWaitingOperator(op2) + re.Equal(uint64(2), oc.OperatorCount(operator.OpLeader)) // 1:leader, 2:leader + re.True(oc.RemoveOperator(op1)) + re.Equal(uint64(1), oc.OperatorCount(operator.OpLeader)) // 2:leader + } + + { + op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion) + oc.AddWaitingOperator(op1) + re.Equal(uint64(1), oc.OperatorCount(operator.OpRegion)) // 1:region 2:leader + re.Equal(uint64(1), oc.OperatorCount(operator.OpLeader)) + op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpRegion) + op2.SetPriorityLevel(constant.High) + oc.AddWaitingOperator(op2) + re.Equal(uint64(2), oc.OperatorCount(operator.OpRegion)) // 1:region 2:region + re.Equal(uint64(0), oc.OperatorCount(operator.OpLeader)) + } +} + +func TestStoreOverloaded(t *testing.T) { + re := require.New(t) + + tc, co, 
cleanup := prepare(nil, nil, nil, re)
+	defer cleanup()
+	oc := co.GetOperatorController()
+	lb, err := schedulers.CreateScheduler(schedulers.BalanceRegionType, oc, tc.storage, schedulers.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""}))
+	re.NoError(err)
+	opt := tc.GetOpts()
+	re.NoError(tc.addRegionStore(4, 100))
+	re.NoError(tc.addRegionStore(3, 100))
+	re.NoError(tc.addRegionStore(2, 100))
+	re.NoError(tc.addRegionStore(1, 10))
+	re.NoError(tc.addLeaderRegion(1, 2, 3, 4))
+	region := tc.GetRegion(1).Clone(core.SetApproximateSize(60))
+	tc.putRegion(region)
+	start := time.Now()
+	{
+		ops, _ := lb.Schedule(tc, false /* dryRun */)
+		re.Len(ops, 1)
+		op1 := ops[0]
+		re.NotNil(op1)
+		re.True(oc.AddOperator(op1))
+		re.True(oc.RemoveOperator(op1))
+	}
+	for {
+		time.Sleep(time.Millisecond * 10)
+		ops, _ := lb.Schedule(tc, false /* dryRun */)
+		if time.Since(start) > time.Second {
+			break
+		}
+		re.Empty(ops)
+	}
+
+	// Reset all stores' limits.
+	// Scheduling once takes 1/10 of a second.
+	opt.SetAllStoresLimit(storelimit.AddPeer, 600)
+	opt.SetAllStoresLimit(storelimit.RemovePeer, 600)
+	time.Sleep(time.Second)
+	for i := 0; i < 10; i++ {
+		ops, _ := lb.Schedule(tc, false /* dryRun */)
+		re.Len(ops, 1)
+		op := ops[0]
+		re.True(oc.AddOperator(op))
+		re.True(oc.RemoveOperator(op))
+	}
+	// Sleep 1 second to make sure that the token is filled up.
+	time.Sleep(time.Second)
+	for i := 0; i < 100; i++ {
+		ops, _ := lb.Schedule(tc, false /* dryRun */)
+		re.Greater(len(ops), 0)
+	}
+}
+
+func TestStoreOverloadedWithReplace(t *testing.T) {
+	re := require.New(t)
+
+	tc, co, cleanup := prepare(nil, nil, nil, re)
+	defer cleanup()
+	oc := co.GetOperatorController()
+	lb, err := schedulers.CreateScheduler(schedulers.BalanceRegionType, oc, tc.storage, schedulers.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""}))
+	re.NoError(err)
+
+	re.NoError(tc.addRegionStore(4, 100))
+	re.NoError(tc.addRegionStore(3, 100))
+	re.NoError(tc.addRegionStore(2, 100))
+	re.NoError(tc.addRegionStore(1, 10))
+	re.NoError(tc.addLeaderRegion(1, 2, 3, 4))
+	re.NoError(tc.addLeaderRegion(2, 1, 3, 4))
+	region := tc.GetRegion(1).Clone(core.SetApproximateSize(60))
+	tc.putRegion(region)
+	region = tc.GetRegion(2).Clone(core.SetApproximateSize(60))
+	tc.putRegion(region)
+	op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion, operator.AddPeer{ToStore: 1, PeerID: 1})
+	re.True(oc.AddOperator(op1))
+	op2 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 2})
+	op2.SetPriorityLevel(constant.High)
+	re.True(oc.AddOperator(op2))
+	op3 := newTestOperator(1, tc.GetRegion(2).GetRegionEpoch(), operator.OpRegion, operator.AddPeer{ToStore: 1, PeerID: 3})
+	re.False(oc.AddOperator(op3))
+	ops, _ := lb.Schedule(tc, false /* dryRun */)
+	re.Empty(ops)
+	// Sleep 2 seconds to make sure that the token is filled up.
+	time.Sleep(2 * time.Second)
+	ops, _ = lb.Schedule(tc, false /* dryRun */)
+	re.Greater(len(ops), 0)
+}
+
+func TestDownStoreLimit(t *testing.T) {
+	re := require.New(t)
+
+	tc, co, cleanup := prepare(nil, nil, nil, re)
+	defer cleanup()
+	oc := co.GetOperatorController()
+	rc := co.GetCheckerController().GetRuleChecker()
+
+	tc.addRegionStore(1, 100)
+	tc.addRegionStore(2, 100)
+	tc.addRegionStore(3, 100)
+	tc.addLeaderRegion(1, 1, 2, 3)
+
+	region := tc.GetRegion(1)
+	tc.setStoreDown(1)
+	tc.SetStoreLimit(1, storelimit.RemovePeer, 1)
+
+	region = region.Clone(core.WithDownPeers([]*pdpb.PeerStats{
+		{
+			Peer:        region.GetStorePeer(1),
+			DownSeconds: 24 * 60 * 60,
+		},
+	}), core.SetApproximateSize(1))
+	tc.putRegion(region)
+	for i := uint64(1); i < 20; i++ {
+		tc.addRegionStore(i+3, 100)
+		op := rc.Check(region)
+		re.NotNil(op)
+		re.True(oc.AddOperator(op))
+		oc.RemoveOperator(op)
+	}
+
+	region = region.Clone(core.SetApproximateSize(100))
+	tc.putRegion(region)
+	for i := uint64(20); i < 25; i++ {
+		tc.addRegionStore(i+3, 100)
+		op := rc.Check(region)
+		re.NotNil(op)
+		re.True(oc.AddOperator(op))
+		oc.RemoveOperator(op)
+	}
+}
+
+// FIXME: remove after moving into the schedulers package.
+type mockLimitScheduler struct {
+	schedulers.Scheduler
+	limit   uint64
+	counter *operator.Controller
+	kind    operator.OpKind
+}
+
+func (s *mockLimitScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
+	return s.counter.OperatorCount(s.kind) < s.limit
+}
+
+func TestController(t *testing.T) {
+	re := require.New(t)
+
+	tc, co, cleanup := prepare(nil, nil, nil, re)
+	defer cleanup()
+	oc := co.GetOperatorController()
+
+	re.NoError(tc.addLeaderRegion(1, 1))
+	re.NoError(tc.addLeaderRegion(2, 2))
+	scheduler, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""}))
+	re.NoError(err)
+	lb := &mockLimitScheduler{
+		Scheduler: scheduler,
+		counter:   oc,
+		kind:      operator.OpLeader,
+	}
+
+	sc := schedulers.NewScheduleController(tc.ctx, co.GetCluster(), co.GetOperatorController(), lb)
+
+	for i := schedulers.MinScheduleInterval; sc.GetInterval() != schedulers.MaxScheduleInterval; i = sc.GetNextInterval(i) {
+		re.Equal(i, sc.GetInterval())
+		re.Empty(sc.Schedule(false))
+	}
+	// limit = 2
+	lb.limit = 2
+	// count = 0
+	{
+		re.True(sc.AllowSchedule(false))
+		op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpLeader)
+		re.Equal(1, oc.AddWaitingOperator(op1))
+		// count = 1
+		re.True(sc.AllowSchedule(false))
+		op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpLeader)
+		re.Equal(1, oc.AddWaitingOperator(op2))
+		// count = 2
+		re.False(sc.AllowSchedule(false))
+		re.True(oc.RemoveOperator(op1))
+		// count = 1
+		re.True(sc.AllowSchedule(false))
+	}
+
+	op11 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpLeader)
+	// Adding a PriorityKind operator will remove the old operator.
+	{
+		op3 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpHotRegion)
+		op3.SetPriorityLevel(constant.High)
+		re.Equal(1, oc.AddWaitingOperator(op11))
+		re.False(sc.AllowSchedule(false))
+		re.Equal(1, oc.AddWaitingOperator(op3))
+		re.True(sc.AllowSchedule(false))
+		re.True(oc.RemoveOperator(op3))
+	}
+
+	// Adding an admin operator will remove the old operator.
+	{
+		op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpLeader)
+		re.Equal(1, oc.AddWaitingOperator(op2))
+		re.False(sc.AllowSchedule(false))
+		op4 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpAdmin)
+		op4.SetPriorityLevel(constant.High)
+		re.Equal(1, oc.AddWaitingOperator(op4))
+		re.True(sc.AllowSchedule(false))
+		re.True(oc.RemoveOperator(op4))
+	}
+
+	// test wrong region id.
+	{
+		op5 := newTestOperator(3, &metapb.RegionEpoch{}, operator.OpHotRegion)
+		re.Equal(0, oc.AddWaitingOperator(op5))
+	}
+
+	// test wrong region epoch.
+	re.True(oc.RemoveOperator(op11))
+	epoch := &metapb.RegionEpoch{
+		Version: tc.GetRegion(1).GetRegionEpoch().GetVersion() + 1,
+		ConfVer: tc.GetRegion(1).GetRegionEpoch().GetConfVer(),
+	}
+	{
+		op6 := newTestOperator(1, epoch, operator.OpLeader)
+		re.Equal(0, oc.AddWaitingOperator(op6))
+	}
+	epoch.Version--
+	{
+		op6 := newTestOperator(1, epoch, operator.OpLeader)
+		re.Equal(1, oc.AddWaitingOperator(op6))
+		re.True(oc.RemoveOperator(op6))
+	}
+}
+
+func TestInterval(t *testing.T) {
+	re := require.New(t)
+
+	tc, co, cleanup := prepare(nil, nil, nil, re)
+	defer cleanup()
+
+	lb, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, co.GetOperatorController(), storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""}))
+	re.NoError(err)
+	sc := schedulers.NewScheduleController(tc.ctx, co.GetCluster(), co.GetOperatorController(), lb)
+
+	// If no operator is produced for x seconds, the next check should be within x/2 seconds.
+	idleSeconds := []int{5, 10, 20, 30, 60}
+	for _, n := range idleSeconds {
+		sc.SetInterval(schedulers.MinScheduleInterval)
+		for totalSleep := time.Duration(0); totalSleep <= time.Second*time.Duration(n); totalSleep += sc.GetInterval() {
+			re.Empty(sc.Schedule(false))
+		}
+		re.Less(sc.GetInterval(), time.Second*time.Duration(n/2))
+	}
+}
+
+func waitAddLearner(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
+	var res *pdpb.RegionHeartbeatResponse
+	testutil.Eventually(re, func() bool {
+		if res = stream.Recv(); res != nil {
+			return res.GetRegionId() == region.GetID() &&
+				res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_AddLearnerNode &&
+				res.GetChangePeer().GetPeer().GetStoreId() == storeID
+		}
+		return false
+	})
+	return region.Clone(
+		core.WithAddPeer(res.GetChangePeer().GetPeer()),
+		core.WithIncConfVer(),
+	)
+}
+
+func waitPromoteLearner(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
+	var res *pdpb.RegionHeartbeatResponse
+	testutil.Eventually(re, func() bool {
+		if res = stream.Recv(); res != nil {
+			return res.GetRegionId() == region.GetID() &&
+				res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_AddNode &&
+				res.GetChangePeer().GetPeer().GetStoreId() == storeID
+		}
+		return false
+	})
+	// Remove the learner, then add the voter.
+ return region.Clone( + core.WithRemoveStorePeer(storeID), + core.WithAddPeer(res.GetChangePeer().GetPeer()), + ) +} + +func waitRemovePeer(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { + var res *pdpb.RegionHeartbeatResponse + testutil.Eventually(re, func() bool { + if res = stream.Recv(); res != nil { + return res.GetRegionId() == region.GetID() && + res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_RemoveNode && + res.GetChangePeer().GetPeer().GetStoreId() == storeID + } + return false + }) + return region.Clone( + core.WithRemoveStorePeer(storeID), + core.WithIncConfVer(), + ) +} + +func waitTransferLeader(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { + var res *pdpb.RegionHeartbeatResponse + testutil.Eventually(re, func() bool { + if res = stream.Recv(); res != nil { + if res.GetRegionId() == region.GetID() { + for _, peer := range append(res.GetTransferLeader().GetPeers(), res.GetTransferLeader().GetPeer()) { + if peer.GetStoreId() == storeID { + return true + } + } + } + } + return false + }) + return region.Clone( + core.WithLeader(region.GetStorePeer(storeID)), + ) +} + +func waitNoResponse(re *require.Assertions, stream mockhbstream.HeartbeatStream) { + testutil.Eventually(re, func() bool { + res := stream.Recv() + return res == nil + }) +} +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)) diff --git a/server/handler.go b/server/handler.go index 08419a05c11..e65cd6edab4 100644 --- a/server/handler.go +++ b/server/handler.go @@ -902,15 +902,6 @@ func (h *Handler) GetSchedulerConfigHandler() (http.Handler, error) { return mux, nil } -// GetOfflinePeer gets the region with offline peer. -func (h *Handler) GetOfflinePeer(typ statistics.RegionStatisticType) ([]*core.RegionInfo, error) { - c := h.s.GetRaftCluster() - if c == nil { - return nil, errs.ErrNotBootstrapped.FastGenByArgs() - } - return c.GetOfflineRegionStatsByType(typ), nil -} - // ResetTS resets the ts with specified tso. func (h *Handler) ResetTS(ts uint64) error { tsoAllocator, err := h.s.tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation) diff --git a/server/statistics/metrics.go b/server/statistics/metrics.go index bd4c897e258..a5ea07f4f55 100644 --- a/server/statistics/metrics.go +++ b/server/statistics/metrics.go @@ -41,14 +41,6 @@ var ( Help: "Status of the regions.", }, []string{"type"}) - offlineRegionStatusGauge = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "pd", - Subsystem: "regions", - Name: "offline_status", - Help: "Status of the offline regions.", - }, []string{"type"}) - clusterStatusGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "pd", @@ -190,7 +182,6 @@ func init() { prometheus.MustRegister(hotCacheStatusGauge) prometheus.MustRegister(storeStatusGauge) prometheus.MustRegister(regionStatusGauge) - prometheus.MustRegister(offlineRegionStatusGauge) prometheus.MustRegister(clusterStatusGauge) prometheus.MustRegister(placementStatusGauge) prometheus.MustRegister(configStatusGauge) diff --git a/server/statistics/region_collection.go b/server/statistics/region_collection.go index 1c46d7acdda..bd07803a29c 100644 --- a/server/statistics/region_collection.go +++ b/server/statistics/region_collection.go @@ -24,6 +24,12 @@ import ( "github.com/tikv/pd/server/schedule/placement" ) +// RegionInfoProvider is an interface to provide the region information. 
+type RegionInfoProvider interface { + // GetRegion returns the region information according to the given region ID. + GetRegion(regionID uint64) *core.RegionInfo +} + // RegionStatisticType represents the type of the region's status. type RegionStatisticType uint32 @@ -40,11 +46,44 @@ const ( UndersizedRegion ) +var regionStatisticTypes = []RegionStatisticType{ + MissPeer, + ExtraPeer, + DownPeer, + PendingPeer, + OfflinePeer, + LearnerPeer, + EmptyRegion, + OversizedRegion, + UndersizedRegion, + WitnessLeader, +} + const nonIsolation = "none" +<<<<<<< HEAD:server/statistics/region_collection.go // RegionInfo is used to record the status of region. type RegionInfo struct { *core.RegionInfo +======= +var ( + // WithLabelValues is a heavy operation, define variable to avoid call it every time. + regionMissPeerRegionCounter = regionStatusGauge.WithLabelValues("miss-peer-region-count") + regionExtraPeerRegionCounter = regionStatusGauge.WithLabelValues("extra-peer-region-count") + regionDownPeerRegionCounter = regionStatusGauge.WithLabelValues("down-peer-region-count") + regionPendingPeerRegionCounter = regionStatusGauge.WithLabelValues("pending-peer-region-count") + regionOfflinePeerRegionCounter = regionStatusGauge.WithLabelValues("offline-peer-region-count") + regionLearnerPeerRegionCounter = regionStatusGauge.WithLabelValues("learner-peer-region-count") + regionEmptyRegionCounter = regionStatusGauge.WithLabelValues("empty-region-count") + regionOversizedRegionCounter = regionStatusGauge.WithLabelValues("oversized-region-count") + regionUndersizedRegionCounter = regionStatusGauge.WithLabelValues("undersized-region-count") + regionWitnessLeaderRegionCounter = regionStatusGauge.WithLabelValues("witness-leader-region-count") +) + +// RegionInfoWithTS is used to record the extra timestamp status of a region. +type RegionInfoWithTS struct { + id uint64 +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)):pkg/statistics/region_collection.go startMissVoterPeerTS int64 startDownPeerTS int64 } @@ -52,26 +91,42 @@ type RegionInfo struct { // RegionStatistics is used to record the status of regions. type RegionStatistics struct { sync.RWMutex +<<<<<<< HEAD:server/statistics/region_collection.go opt *config.PersistOptions stats map[RegionStatisticType]map[uint64]*RegionInfo offlineStats map[RegionStatisticType]map[uint64]*core.RegionInfo +======= + rip RegionInfoProvider + conf sc.CheckerConfig + stats map[RegionStatisticType]map[uint64]*RegionInfoWithTS +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)):pkg/statistics/region_collection.go index map[uint64]RegionStatisticType - offlineIndex map[uint64]RegionStatisticType ruleManager *placement.RuleManager storeConfigManager *config.StoreConfigManager } // NewRegionStatistics creates a new RegionStatistics. 
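// The cached counter variables above (regionMissPeerRegionCounter and friends) exist because
// GaugeVec.WithLabelValues resolves the label child on every call, which the patch itself calls
// a heavy operation; caching each child once keeps Collect and Reset cheap. A minimal sketch of
// the same pattern with client_golang follows; it is illustrative only and not part of this
// patch, and the metric, variable, and function names are made up:
//
//	var exampleStatusGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
//		Name: "example_region_status",
//		Help: "Example status gauge.",
//	}, []string{"type"})
//
//	// Resolve the "miss-peer-region-count" child once and reuse it on the hot path.
//	var exampleMissPeerGauge = exampleStatusGauge.WithLabelValues("miss-peer-region-count")
//
//	func exampleCollect(missPeerCount int) {
//		exampleMissPeerGauge.Set(float64(missPeerCount))
//	}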
+<<<<<<< HEAD:server/statistics/region_collection.go func NewRegionStatistics(opt *config.PersistOptions, ruleManager *placement.RuleManager, storeConfigManager *config.StoreConfigManager) *RegionStatistics { r := &RegionStatistics{ opt: opt, +======= +func NewRegionStatistics( + rip RegionInfoProvider, + conf sc.CheckerConfig, + ruleManager *placement.RuleManager, + storeConfigManager *config.StoreConfigManager, +) *RegionStatistics { + r := &RegionStatistics{ + rip: rip, + conf: conf, +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)):pkg/statistics/region_collection.go ruleManager: ruleManager, storeConfigManager: storeConfigManager, - stats: make(map[RegionStatisticType]map[uint64]*RegionInfo), - offlineStats: make(map[RegionStatisticType]map[uint64]*core.RegionInfo), + stats: make(map[RegionStatisticType]map[uint64]*RegionInfoWithTS), index: make(map[uint64]RegionStatisticType), - offlineIndex: make(map[uint64]RegionStatisticType), } +<<<<<<< HEAD:server/statistics/region_collection.go r.stats[MissPeer] = make(map[uint64]*RegionInfo) r.stats[ExtraPeer] = make(map[uint64]*RegionInfo) r.stats[DownPeer] = make(map[uint64]*RegionInfo) @@ -87,16 +142,22 @@ func NewRegionStatistics(opt *config.PersistOptions, ruleManager *placement.Rule r.offlineStats[PendingPeer] = make(map[uint64]*core.RegionInfo) r.offlineStats[LearnerPeer] = make(map[uint64]*core.RegionInfo) r.offlineStats[OfflinePeer] = make(map[uint64]*core.RegionInfo) +======= + for _, typ := range regionStatisticTypes { + r.stats[typ] = make(map[uint64]*RegionInfoWithTS) + } +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)):pkg/statistics/region_collection.go return r } -// GetRegionStatsByType gets the status of the region by types. The regions here need to be cloned, otherwise, it may cause data race problems. +// GetRegionStatsByType gets the status of the region by types. +// The regions here need to be cloned, otherwise, it may cause data race problems. func (r *RegionStatistics) GetRegionStatsByType(typ RegionStatisticType) []*core.RegionInfo { r.RLock() defer r.RUnlock() res := make([]*core.RegionInfo, 0, len(r.stats[typ])) - for _, r := range r.stats[typ] { - res = append(res, r.RegionInfo.Clone()) + for regionID := range r.stats[typ] { + res = append(res, r.rip.GetRegion(regionID).Clone()) } return res } @@ -109,17 +170,6 @@ func (r *RegionStatistics) IsRegionStatsType(regionID uint64, typ RegionStatisti return exist } -// GetOfflineRegionStatsByType gets the status of the offline region by types. The regions here need to be cloned, otherwise, it may cause data race problems. 
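// With the RegionInfoProvider indirection introduced above, the stats maps keep only
// RegionInfoWithTS entries (a region ID plus timestamps), and GetRegionStatsByType resolves
// each *core.RegionInfo lazily through GetRegion before cloning it. Any value with that single
// method satisfies the interface; a rough, hypothetical map-backed provider (illustrative only,
// not part of this patch) could look like:
//
//	type mapRegionProvider struct {
//		regions map[uint64]*core.RegionInfo
//	}
//
//	func (m *mapRegionProvider) GetRegion(regionID uint64) *core.RegionInfo {
//		return m.regions[regionID]
//	}
//
//	// stats := NewRegionStatistics(&mapRegionProvider{regions: regions}, conf, ruleManager, storeConfigManager)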
-func (r *RegionStatistics) GetOfflineRegionStatsByType(typ RegionStatisticType) []*core.RegionInfo { - r.RLock() - defer r.RUnlock() - res := make([]*core.RegionInfo, 0, len(r.stats[typ])) - for _, r := range r.offlineStats[typ] { - res = append(res, r.Clone()) - } - return res -} - func (r *RegionStatistics) deleteEntry(deleteIndex RegionStatisticType, regionID uint64) { for typ := RegionStatisticType(1); typ <= deleteIndex; typ <<= 1 { if deleteIndex&typ != 0 { @@ -128,14 +178,6 @@ func (r *RegionStatistics) deleteEntry(deleteIndex RegionStatisticType, regionID } } -func (r *RegionStatistics) deleteOfflineEntry(deleteIndex RegionStatisticType, regionID uint64) { - for typ := RegionStatisticType(1); typ <= deleteIndex; typ <<= 1 { - if deleteIndex&typ != 0 { - delete(r.offlineStats[typ], regionID) - } - } -} - // RegionStatsNeedUpdate checks whether the region's status need to be updated // due to some special state types. func (r *RegionStatistics) RegionStatsNeedUpdate(region *core.RegionInfo) bool { @@ -152,16 +194,20 @@ func (r *RegionStatistics) RegionStatsNeedUpdate(region *core.RegionInfo) bool { func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.StoreInfo) { r.Lock() defer r.Unlock() - // Region state. - regionID := region.GetID() var ( - peerTypeIndex RegionStatisticType - offlinePeerTypeIndex RegionStatisticType - deleteIndex RegionStatisticType + desiredReplicas = r.conf.GetMaxReplicas() + desiredVoters = desiredReplicas + peerTypeIndex RegionStatisticType + deleteIndex RegionStatisticType ) +<<<<<<< HEAD:server/statistics/region_collection.go desiredReplicas := r.opt.GetMaxReplicas() desiredVoters := desiredReplicas if r.opt.IsPlacementRulesEnabled() { +======= + // Check if the region meets count requirements of its rules. + if r.conf.IsPlacementRulesEnabled() { +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)):pkg/statistics/region_collection.go if !r.ruleManager.IsInitialized() { log.Warn("ruleManager haven't been initialized") return @@ -176,19 +222,6 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store } } } - - var isRemoving bool - - for _, store := range stores { - if store.IsRemoving() { - peer := region.GetStorePeer(store.GetID()) - if peer != nil { - isRemoving = true - break - } - } - } - // Better to make sure once any of these conditions changes, it will trigger the heartbeat `save_cache`. // Otherwise, the state may be out-of-date for a long time, which needs another way to apply the change ASAP. // For example, see `RegionStatsNeedUpdate` above to know how `OversizedRegion` and ``UndersizedRegion` are updated. @@ -197,6 +230,17 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store ExtraPeer: len(region.GetPeers()) > desiredReplicas, DownPeer: len(region.GetDownPeers()) > 0, PendingPeer: len(region.GetPendingPeers()) > 0, + OfflinePeer: func() bool { + for _, store := range stores { + if store.IsRemoving() { + peer := region.GetStorePeer(store.GetID()) + if peer != nil { + return true + } + } + } + return false + }(), LearnerPeer: len(region.GetLearners()) > 0, EmptyRegion: region.GetApproximateSize() <= core.EmptyRegionApproximateSize, OversizedRegion: region.IsOversized( @@ -208,18 +252,13 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store int64(r.opt.GetMaxMergeRegionKeys()), ), } - + // Check if the region meets any of the conditions and update the corresponding info. 
+ regionID := region.GetID() for typ, c := range conditions { if c { - if isRemoving && typ < EmptyRegion { - r.offlineStats[typ][regionID] = region - offlinePeerTypeIndex |= typ - } info := r.stats[typ][regionID] if info == nil { - info = &RegionInfo{ - RegionInfo: region, - } + info = &RegionInfoWithTS{id: regionID} } if typ == DownPeer { if info.startDownPeerTS != 0 { @@ -239,18 +278,7 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store peerTypeIndex |= typ } } - - if isRemoving { - r.offlineStats[OfflinePeer][regionID] = region - offlinePeerTypeIndex |= OfflinePeer - } - - if oldIndex, ok := r.offlineIndex[regionID]; ok { - deleteIndex = oldIndex &^ offlinePeerTypeIndex - } - r.deleteOfflineEntry(deleteIndex, regionID) - r.offlineIndex[regionID] = offlinePeerTypeIndex - + // Remove the info if any of the conditions are not met any more. if oldIndex, ok := r.index[regionID]; ok { deleteIndex = oldIndex &^ peerTypeIndex } @@ -265,15 +293,13 @@ func (r *RegionStatistics) ClearDefunctRegion(regionID uint64) { if oldIndex, ok := r.index[regionID]; ok { r.deleteEntry(oldIndex, regionID) } - if oldIndex, ok := r.offlineIndex[regionID]; ok { - r.deleteOfflineEntry(oldIndex, regionID) - } } // Collect collects the metrics of the regions' status. func (r *RegionStatistics) Collect() { r.RLock() defer r.RUnlock() +<<<<<<< HEAD:server/statistics/region_collection.go regionStatusGauge.WithLabelValues("miss-peer-region-count").Set(float64(len(r.stats[MissPeer]))) regionStatusGauge.WithLabelValues("extra-peer-region-count").Set(float64(len(r.stats[ExtraPeer]))) regionStatusGauge.WithLabelValues("down-peer-region-count").Set(float64(len(r.stats[DownPeer]))) @@ -289,12 +315,37 @@ func (r *RegionStatistics) Collect() { offlineRegionStatusGauge.WithLabelValues("pending-peer-region-count").Set(float64(len(r.offlineStats[PendingPeer]))) offlineRegionStatusGauge.WithLabelValues("learner-peer-region-count").Set(float64(len(r.offlineStats[LearnerPeer]))) offlineRegionStatusGauge.WithLabelValues("offline-peer-region-count").Set(float64(len(r.offlineStats[OfflinePeer]))) +======= + regionMissPeerRegionCounter.Set(float64(len(r.stats[MissPeer]))) + regionExtraPeerRegionCounter.Set(float64(len(r.stats[ExtraPeer]))) + regionDownPeerRegionCounter.Set(float64(len(r.stats[DownPeer]))) + regionPendingPeerRegionCounter.Set(float64(len(r.stats[PendingPeer]))) + regionOfflinePeerRegionCounter.Set(float64(len(r.stats[OfflinePeer]))) + regionLearnerPeerRegionCounter.Set(float64(len(r.stats[LearnerPeer]))) + regionEmptyRegionCounter.Set(float64(len(r.stats[EmptyRegion]))) + regionOversizedRegionCounter.Set(float64(len(r.stats[OversizedRegion]))) + regionUndersizedRegionCounter.Set(float64(len(r.stats[UndersizedRegion]))) + regionWitnessLeaderRegionCounter.Set(float64(len(r.stats[WitnessLeader]))) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)):pkg/statistics/region_collection.go } // Reset resets the metrics of the regions' status. 
func (r *RegionStatistics) Reset() { +<<<<<<< HEAD:server/statistics/region_collection.go regionStatusGauge.Reset() offlineRegionStatusGauge.Reset() +======= + regionMissPeerRegionCounter.Set(0) + regionExtraPeerRegionCounter.Set(0) + regionDownPeerRegionCounter.Set(0) + regionPendingPeerRegionCounter.Set(0) + regionOfflinePeerRegionCounter.Set(0) + regionLearnerPeerRegionCounter.Set(0) + regionEmptyRegionCounter.Set(0) + regionOversizedRegionCounter.Set(0) + regionUndersizedRegionCounter.Set(0) + regionWitnessLeaderRegionCounter.Set(0) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)):pkg/statistics/region_collection.go } // LabelStatistics is the statistics of the level of labels. diff --git a/server/statistics/region_collection_test.go b/server/statistics/region_collection_test.go index eb100e958fd..43e52a2c940 100644 --- a/server/statistics/region_collection_test.go +++ b/server/statistics/region_collection_test.go @@ -80,6 +80,7 @@ func (t *testRegionStatisticsSuite) TestRegionStatistics(c *C) { r2 := &metapb.Region{Id: 2, Peers: peers[0:2], StartKey: []byte("cc"), EndKey: []byte("dd")} region1 := core.NewRegionInfo(r1, peers[0]) region2 := core.NewRegionInfo(r2, peers[0]) +<<<<<<< HEAD:server/statistics/region_collection_test.go regionStats := NewRegionStatistics(opt, t.manager, nil) regionStats.Observe(region1, stores) c.Assert(regionStats.stats[ExtraPeer], HasLen, 1) @@ -88,6 +89,15 @@ func (t *testRegionStatisticsSuite) TestRegionStatistics(c *C) { c.Assert(regionStats.stats[UndersizedRegion], HasLen, 1) c.Assert(regionStats.offlineStats[ExtraPeer], HasLen, 1) c.Assert(regionStats.offlineStats[LearnerPeer], HasLen, 1) +======= + regionStats := NewRegionStatistics(nil, opt, manager, nil) + regionStats.Observe(region1, stores) + re.Len(regionStats.stats[ExtraPeer], 1) + re.Len(regionStats.stats[LearnerPeer], 1) + re.Len(regionStats.stats[EmptyRegion], 1) + re.Len(regionStats.stats[UndersizedRegion], 1) + re.Len(regionStats.stats[OfflinePeer], 1) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)):pkg/statistics/region_collection_test.go region1 = region1.Clone( core.WithDownPeers(downPeers), @@ -95,6 +105,7 @@ func (t *testRegionStatisticsSuite) TestRegionStatistics(c *C) { core.SetApproximateSize(144), ) regionStats.Observe(region1, stores) +<<<<<<< HEAD:server/statistics/region_collection_test.go c.Assert(regionStats.stats[ExtraPeer], HasLen, 1) c.Assert(regionStats.stats[MissPeer], HasLen, 0) c.Assert(regionStats.stats[DownPeer], HasLen, 1) @@ -139,6 +150,37 @@ func (t *testRegionStatisticsSuite) TestRegionStatistics(c *C) { c.Assert(regionStats.offlineStats[PendingPeer], HasLen, 0) c.Assert(regionStats.offlineStats[LearnerPeer], HasLen, 0) c.Assert(regionStats.offlineStats[OfflinePeer], HasLen, 0) +======= + re.Len(regionStats.stats[ExtraPeer], 1) + re.Empty(regionStats.stats[MissPeer]) + re.Len(regionStats.stats[DownPeer], 1) + re.Len(regionStats.stats[PendingPeer], 1) + re.Len(regionStats.stats[LearnerPeer], 1) + re.Empty(regionStats.stats[EmptyRegion]) + re.Len(regionStats.stats[OversizedRegion], 1) + re.Empty(regionStats.stats[UndersizedRegion]) + re.Len(regionStats.stats[OfflinePeer], 1) + + region2 = region2.Clone(core.WithDownPeers(downPeers[0:1])) + regionStats.Observe(region2, stores[0:2]) + re.Len(regionStats.stats[ExtraPeer], 1) + re.Len(regionStats.stats[MissPeer], 1) + re.Len(regionStats.stats[DownPeer], 2) + re.Len(regionStats.stats[PendingPeer], 1) + 
re.Len(regionStats.stats[LearnerPeer], 1) + re.Len(regionStats.stats[OversizedRegion], 1) + re.Len(regionStats.stats[UndersizedRegion], 1) + re.Len(regionStats.stats[OfflinePeer], 1) + + region1 = region1.Clone(core.WithRemoveStorePeer(7)) + regionStats.Observe(region1, stores[0:3]) + re.Empty(regionStats.stats[ExtraPeer]) + re.Len(regionStats.stats[MissPeer], 1) + re.Len(regionStats.stats[DownPeer], 2) + re.Len(regionStats.stats[PendingPeer], 1) + re.Empty(regionStats.stats[LearnerPeer]) + re.Empty(regionStats.stats[OfflinePeer]) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)):pkg/statistics/region_collection_test.go store3 = stores[3].Clone(core.UpStore()) stores[3] = store3 @@ -173,7 +215,12 @@ func (t *testRegionStatisticsSuite) TestRegionStatisticsWithPlacementRule(c *C) region2 := core.NewRegionInfo(r2, peers[0]) region3 := core.NewRegionInfo(r3, peers[0]) region4 := core.NewRegionInfo(r4, peers[0]) +<<<<<<< HEAD:server/statistics/region_collection_test.go regionStats := NewRegionStatistics(opt, t.manager, nil) +======= + region5 := core.NewRegionInfo(r5, peers[4]) + regionStats := NewRegionStatistics(nil, opt, manager, nil) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)):pkg/statistics/region_collection_test.go // r2 didn't match the rules regionStats.Observe(region2, stores) c.Assert(regionStats.stats[MissPeer], HasLen, 1)