diff --git a/go.mod b/go.mod
index 73f3d783db2..86f56089347 100644
--- a/go.mod
+++ b/go.mod
@@ -33,7 +33,7 @@ require (
 github.com/pingcap/errcode v0.3.0
 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
- github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a
+ github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c
 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
 github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511
diff --git a/go.sum b/go.sum
index 6f73b806860..9392644f181 100644
--- a/go.sum
+++ b/go.sum
@@ -439,8 +439,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
 github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
-github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a h1:xbak0nFiTkinvdtr/EUK0dvdkX32twKRpIgsMRiiVQA=
-github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
+github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c h1:5cpE29kMHjc8fv+mRiXbLTDfoZHiX5BTK6knVWlWvqk=
+github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
 github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json
index c811303d1c4..809244771b7 100644
--- a/metrics/grafana/pd.json
+++ b/metrics/grafana/pd.json
@@ -1139,6 +1139,111 @@
 "timeFrom": null,
 "timeShift": null
 },
+ {
+ "bars": false,
+ "cacheTimeout": null,
+ "dashLength": 10,
+ "dashes": false,
+ "datasource": "${DS_TEST-CLUSTER}",
+ "description": "The current count of the placement rules and rule groups in the cluster",
+ "fieldConfig": {
+ "defaults": {},
+ "overrides": []
+ },
+ "fill": 1,
+ "fillGradient": 0,
+ "gridPos": {
+ "h": 6,
+ "w": 4,
+ "x": 16,
+ "y": 13
+ },
+ "hiddenSeries": false,
+ "id": 22,
+ "interval": null,
+ "legend": {
+ "alignAsTable": true,
+ "avg": false,
+ "current": true,
+ "max": false,
+ "min": false,
+ "rightSide": true,
+ "show": true,
+ "total": false,
+ "values": true
+ },
+ "lines": true,
+ "linewidth": 1,
+ "links": [],
+ "maxDataPoints": 100,
+ "nullPointMode": "null",
+ "options": {
+ "alertThreshold": true
+ },
+ "percentage": false,
+ "pluginVersion": "7.5.10",
+ "pointradius": 2,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "exemplar": true,
+ "expr": "sum(pd_rule_manager_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}) by (type)",
+ "format": "time_series",
+ "interval": "",
+ "intervalFactor": 2,
+ "legendFormat": "{{type}}",
+ "refId": "A",
+ "step": 4
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeRegions": [],
+ "timeShift": null,
+ "title": "Placement Rules Status",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "yaxes": [
+ {
+ "$$hashKey": "object:192",
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ },
+ {
+ "$$hashKey": "object:193",
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ }
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
+ },
 {
 "collapsed": true,
 "gridPos": {
diff --git a/pkg/core/region.go b/pkg/core/region.go
index 6631005a608..6875844b7a6 100644
--- a/pkg/core/region.go
+++ b/pkg/core/region.go
@@ -1675,6 +1675,23 @@ func (r *RegionsInfo) GetAverageRegionSize() int64 {
 return r.tree.TotalSize() / int64(r.tree.length())
 }
 
+// ValidRegion is used to decide if the region is valid.
+func (r *RegionsInfo) ValidRegion(region *metapb.Region) error {
+ startKey := region.GetStartKey()
+ currentRegion := r.GetRegionByKey(startKey)
+ if currentRegion == nil {
+ return errors.Errorf("region not found, request region: %v", logutil.RedactStringer(RegionToHexMeta(region)))
+ }
+ // If the request epoch is less than current region epoch, then returns an error.
+ regionEpoch := region.GetRegionEpoch()
+ currentEpoch := currentRegion.GetMeta().GetRegionEpoch()
+ if regionEpoch.GetVersion() < currentEpoch.GetVersion() ||
+ regionEpoch.GetConfVer() < currentEpoch.GetConfVer() {
+ return errors.Errorf("invalid region epoch, request: %v, current: %v", regionEpoch, currentEpoch)
+ }
+ return nil
+}
+
 // DiffRegionPeersInfo return the difference of peers info between two RegionInfo
 func DiffRegionPeersInfo(origin *RegionInfo, other *RegionInfo) string {
 var ret []string
diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go
index d462a9a58b5..4f9caca41e6 100644
--- a/pkg/mcs/scheduling/server/config/config.go
+++ b/pkg/mcs/scheduling/server/config/config.go
@@ -18,6 +18,7 @@ import (
 "fmt"
 "os"
 "path/filepath"
+ "reflect"
 "strings"
 "sync/atomic"
 "time"
@@ -232,6 +233,14 @@ func (o *PersistConfig) getSchedulersUpdatingNotifier() chan<- struct{} {
 return v.(chan<- struct{})
 }
 
+func (o *PersistConfig) tryNotifySchedulersUpdating() {
+ notifier := o.getSchedulersUpdatingNotifier()
+ if notifier == nil {
+ return
+ }
+ notifier <- struct{}{}
+}
+
 // GetClusterVersion returns the cluster version.
 func (o *PersistConfig) GetClusterVersion() *semver.Version {
 return (*semver.Version)(atomic.LoadPointer(&o.clusterVersion))
 }
@@ -251,11 +260,10 @@ func (o *PersistConfig) GetScheduleConfig() *sc.ScheduleConfig {
 func (o *PersistConfig) SetScheduleConfig(cfg *sc.ScheduleConfig) {
 old := o.GetScheduleConfig()
 o.schedule.Store(cfg)
- // The coordinator is not aware of the underlying scheduler config changes, however, it
- // should react on the scheduler number changes to handle the add/remove scheduler events.
- if notifier := o.getSchedulersUpdatingNotifier(); notifier != nil &&
- len(old.Schedulers) != len(cfg.Schedulers) {
- notifier <- struct{}{}
+ // The coordinator is not aware of the underlying scheduler config changes,
+ // we should notify it to update the schedulers proactively.
+ if !reflect.DeepEqual(old.Schedulers, cfg.Schedulers) {
+ o.tryNotifySchedulersUpdating()
 }
 }
 
@@ -650,6 +658,11 @@ func (o *PersistConfig) IsRaftKV2() bool {
 return o.GetStoreConfig().IsRaftKV2()
 }
 
+// IsTikvRegionSplitEnabled returns whether tikv split region is enabled.
+func (o *PersistConfig) IsTikvRegionSplitEnabled() bool { + return o.GetScheduleConfig().EnableTiKVSplitRegion +} + // TODO: implement the following methods // AddSchedulerCfg adds the scheduler configurations. diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go index e2e2c8af32d..8e47e7380f9 100644 --- a/pkg/mcs/scheduling/server/grpc_service.go +++ b/pkg/mcs/scheduling/server/grpc_service.go @@ -22,13 +22,16 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/kvproto/pkg/schedulingpb" "github.com/pingcap/log" bs "github.com/tikv/pd/pkg/basicserver" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/registry" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/versioninfo" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -254,6 +257,74 @@ func (s *Service) GetOperator(ctx context.Context, request *schedulingpb.GetOper }, nil } +// AskBatchSplit implements gRPC PDServer. +func (s *Service) AskBatchSplit(ctx context.Context, request *schedulingpb.AskBatchSplitRequest) (*schedulingpb.AskBatchSplitResponse, error) { + c := s.GetCluster() + if c == nil { + return &schedulingpb.AskBatchSplitResponse{Header: s.notBootstrappedHeader()}, nil + } + + if request.GetRegion() == nil { + return &schedulingpb.AskBatchSplitResponse{ + Header: s.wrapErrorToHeader(schedulingpb.ErrorType_UNKNOWN, + "missing region for split"), + }, nil + } + + if c.persistConfig.IsSchedulingHalted() { + return nil, errs.ErrSchedulingIsHalted.FastGenByArgs() + } + if !c.persistConfig.IsTikvRegionSplitEnabled() { + return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs() + } + reqRegion := request.GetRegion() + splitCount := request.GetSplitCount() + err := c.ValidRegion(reqRegion) + if err != nil { + return nil, err + } + splitIDs := make([]*pdpb.SplitID, 0, splitCount) + recordRegions := make([]uint64, 0, splitCount+1) + + for i := 0; i < int(splitCount); i++ { + newRegionID, err := c.AllocID() + if err != nil { + return nil, errs.ErrSchedulerNotFound.FastGenByArgs() + } + + peerIDs := make([]uint64, len(request.Region.Peers)) + for i := 0; i < len(peerIDs); i++ { + if peerIDs[i], err = c.AllocID(); err != nil { + return nil, err + } + } + + recordRegions = append(recordRegions, newRegionID) + splitIDs = append(splitIDs, &pdpb.SplitID{ + NewRegionId: newRegionID, + NewPeerIds: peerIDs, + }) + + log.Info("alloc ids for region split", zap.Uint64("region-id", newRegionID), zap.Uint64s("peer-ids", peerIDs)) + } + + recordRegions = append(recordRegions, reqRegion.GetId()) + if versioninfo.IsFeatureSupported(c.persistConfig.GetClusterVersion(), versioninfo.RegionMerge) { + // Disable merge the regions in a period of time. + c.GetCoordinator().GetMergeChecker().RecordRegionSplit(recordRegions) + } + + // If region splits during the scheduling process, regions with abnormal + // status may be left, and these regions need to be checked with higher + // priority. + c.GetCoordinator().GetCheckerController().AddSuspectRegions(recordRegions...) + + return &schedulingpb.AskBatchSplitResponse{ + Header: s.header(), + Ids: splitIDs, + }, nil +} + // RegisterGRPCService registers the service to gRPC server. 
func (s *Service) RegisterGRPCService(g *grpc.Server) { schedulingpb.RegisterSchedulingServer(g, s) diff --git a/pkg/replication/replication_mode.go b/pkg/replication/replication_mode.go index faa06822127..30b34e4596a 100644 --- a/pkg/replication/replication_mode.go +++ b/pkg/replication/replication_mode.go @@ -371,14 +371,6 @@ func (m *ModeManager) Run(ctx context.Context) { wg.Wait() } -func storeIDs(stores []*core.StoreInfo) []uint64 { - ids := make([]uint64, len(stores)) - for i, s := range stores { - ids[i] = s.GetID() - } - return ids -} - func minimalUpVoters(rule *placement.Rule, upStores, downStores []*core.StoreInfo) int { if rule.Role == placement.Learner { return 0 @@ -411,7 +403,7 @@ func (m *ModeManager) tickUpdateState() { drTickCounter.Inc() - stores := m.checkStoreStatus() + stores, storeIDs := m.checkStoreStatus() var primaryHasVoter, drHasVoter bool var totalVoter, totalUpVoter int @@ -440,10 +432,10 @@ func (m *ModeManager) tickUpdateState() { hasMajority := totalUpVoter*2 > totalVoter log.Debug("replication store status", - zap.Uint64s("up-primary", storeIDs(stores[primaryUp])), - zap.Uint64s("up-dr", storeIDs(stores[drUp])), - zap.Uint64s("down-primary", storeIDs(stores[primaryDown])), - zap.Uint64s("down-dr", storeIDs(stores[drDown])), + zap.Uint64s("up-primary", storeIDs[primaryUp]), + zap.Uint64s("up-dr", storeIDs[drUp]), + zap.Uint64s("down-primary", storeIDs[primaryDown]), + zap.Uint64s("down-dr", storeIDs[drDown]), zap.Bool("can-sync", canSync), zap.Bool("has-majority", hasMajority), ) @@ -470,31 +462,31 @@ func (m *ModeManager) tickUpdateState() { case drStateSync: // If hasMajority is false, the cluster is always unavailable. Switch to async won't help. if !canSync && hasMajority { - m.drSwitchToAsyncWait(storeIDs(stores[primaryUp])) + m.drSwitchToAsyncWait(storeIDs[primaryUp]) } case drStateAsyncWait: if canSync { m.drSwitchToSync() break } - if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, storeIDs(stores[primaryUp])) { - m.drSwitchToAsyncWait(storeIDs(stores[primaryUp])) + if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, storeIDs[primaryUp]) { + m.drSwitchToAsyncWait(storeIDs[primaryUp]) break } - if m.drCheckStoreStateUpdated(storeIDs(stores[primaryUp])) { - m.drSwitchToAsync(storeIDs(stores[primaryUp])) + if m.drCheckStoreStateUpdated(storeIDs[primaryUp]) { + m.drSwitchToAsync(storeIDs[primaryUp]) } case drStateAsync: if canSync { m.drSwitchToSyncRecover() break } - if !reflect.DeepEqual(m.drGetAvailableStores(), stores[primaryUp]) && m.drCheckStoreStateUpdated(storeIDs(stores[primaryUp])) { - m.drSwitchToAsync(storeIDs(stores[primaryUp])) + if !reflect.DeepEqual(m.drGetAvailableStores(), storeIDs[primaryUp]) && m.drCheckStoreStateUpdated(storeIDs[primaryUp]) { + m.drSwitchToAsync(storeIDs[primaryUp]) } case drStateSyncRecover: if !canSync && hasMajority { - m.drSwitchToAsync(storeIDs(stores[primaryUp])) + m.drSwitchToAsync(storeIDs[primaryUp]) } else { m.updateProgress() progress := m.estimateProgress() @@ -569,39 +561,40 @@ const ( storeStatusTypeCount ) -func (m *ModeManager) checkStoreStatus() [][]*core.StoreInfo { +func (m *ModeManager) checkStoreStatus() ([][]*core.StoreInfo, [][]uint64) { m.RLock() defer m.RUnlock() - stores := make([][]*core.StoreInfo, storeStatusTypeCount) + stores, storeIDs := make([][]*core.StoreInfo, storeStatusTypeCount), make([][]uint64, storeStatusTypeCount) for _, s := range m.cluster.GetStores() { if s.IsRemoved() { continue } - // learner 
peers do not participate in major commit or vote, so it should not count in primary/dr as a normal store. - if s.GetRegionCount() == s.GetLearnerCount() { - continue - } down := s.DownTime() >= m.config.DRAutoSync.WaitStoreTimeout.Duration labelValue := s.GetLabelValue(m.config.DRAutoSync.LabelKey) if labelValue == m.config.DRAutoSync.Primary { if down { stores[primaryDown] = append(stores[primaryDown], s) + storeIDs[primaryDown] = append(storeIDs[primaryDown], s.GetID()) } else { stores[primaryUp] = append(stores[primaryUp], s) + storeIDs[primaryUp] = append(storeIDs[primaryUp], s.GetID()) } } if labelValue == m.config.DRAutoSync.DR { if down { stores[drDown] = append(stores[drDown], s) + storeIDs[drDown] = append(storeIDs[drDown], s.GetID()) } else { stores[drUp] = append(stores[drUp], s) + storeIDs[drUp] = append(storeIDs[drUp], s.GetID()) } } } for i := range stores { sort.Slice(stores[i], func(a, b int) bool { return stores[i][a].GetID() < stores[i][b].GetID() }) + sort.Slice(storeIDs[i], func(a, b int) bool { return storeIDs[i][a] < storeIDs[i][b] }) } - return stores + return stores, storeIDs } // UpdateStoreDRStatus saves the dr-autosync status of a store. diff --git a/pkg/replication/replication_mode_test.go b/pkg/replication/replication_mode_test.go index bb9b6559cf6..e01fb7a0b9a 100644 --- a/pkg/replication/replication_mode_test.go +++ b/pkg/replication/replication_mode_test.go @@ -245,8 +245,8 @@ func TestStateSwitch(t *testing.T) { rep.tickUpdateState() re.Equal(drStateSync, rep.drGetState()) - // once the voter node down, even learner node up, swith to async state. - setStoreState(cluster, "up", "up", "up", "up", "down", "up") + // once zone2 down, swith to async state. + setStoreState(cluster, "up", "up", "up", "up", "down", "down") rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) @@ -264,18 +264,18 @@ func TestStateSwitch(t *testing.T) { re.False(rep.GetReplicationStatus().GetDrAutoSync().GetPauseRegionSplit()) // async_wait -> async_wait - setStoreState(cluster, "up", "up", "up", "up", "down", "up") + setStoreState(cluster, "up", "up", "up", "up", "down", "down") rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) assertStateIDUpdate() rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1]) - setStoreState(cluster, "down", "up", "up", "up", "down", "up") + setStoreState(cluster, "down", "up", "up", "up", "down", "down") rep.tickUpdateState() assertStateIDUpdate() rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[2,3,4]}`, stateID), replicator.lastData[1]) - setStoreState(cluster, "up", "down", "up", "up", "down", "up") + setStoreState(cluster, "up", "down", "up", "up", "down", "down") rep.tickUpdateState() assertStateIDUpdate() rep.tickReplicateStatus() @@ -294,7 +294,7 @@ func TestStateSwitch(t *testing.T) { re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1]) // async -> async - setStoreState(cluster, "up", "up", "up", "up", "down", "up") + setStoreState(cluster, "up", "up", "up", "up", "down", "down") rep.tickUpdateState() // store 2 won't be available before it syncs status. 
rep.tickReplicateStatus() @@ -319,14 +319,14 @@ func TestStateSwitch(t *testing.T) { // sync_recover -> async rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) - setStoreState(cluster, "up", "up", "up", "up", "down", "up") + setStoreState(cluster, "up", "up", "up", "up", "down", "down") rep.tickUpdateState() re.Equal(drStateAsync, rep.drGetState()) assertStateIDUpdate() // lost majority, does not switch to async. rep.drSwitchToSyncRecover() assertStateIDUpdate() - setStoreState(cluster, "down", "down", "up", "up", "down", "up") + setStoreState(cluster, "down", "down", "up", "up", "down", "down") rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) @@ -636,6 +636,8 @@ func TestComplexPlacementRules(t *testing.T) { setStoreState(cluster, "up", "up", "up", "up", "up", "up", "up", "down", "up", "down") rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) + rep.tickReplicateStatus() + re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4,5,6]}`, rep.drAutoSync.StateID), replicator.lastData[1]) // reset to sync setStoreState(cluster, "up", "up", "up", "up", "up", "up", "up", "up", "up", "up") @@ -695,6 +697,47 @@ func TestComplexPlacementRules2(t *testing.T) { setStoreState(cluster, "up", "up", "up", "up", "down", "down", "up") rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) + rep.tickReplicateStatus() + re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, rep.drAutoSync.StateID), replicator.lastData[1]) +} + +func TestComplexPlacementRules3(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + store := storage.NewStorageWithMemoryBackend() + conf := config.ReplicationModeConfig{ReplicationMode: modeDRAutoSync, DRAutoSync: config.DRAutoSyncReplicationConfig{ + LabelKey: "zone", + Primary: "zone1", + DR: "zone2", + WaitStoreTimeout: typeutil.Duration{Duration: time.Minute}, + }} + cluster := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions()) + replicator := newMockReplicator([]uint64{1}) + rep, err := NewReplicationModeManager(conf, store, cluster, replicator) + re.NoError(err) + cluster.GetRuleManager().SetAllGroupBundles( + genPlacementRuleConfig([]ruleConfig{ + {key: "logic", value: "logic1", role: placement.Voter, count: 2}, + {key: "logic", value: "logic2", role: placement.Learner, count: 1}, + {key: "logic", value: "logic3", role: placement.Voter, count: 1}, + }), true) + + cluster.AddLabelsStore(1, 1, map[string]string{"zone": "zone1", "logic": "logic1"}) + cluster.AddLabelsStore(2, 1, map[string]string{"zone": "zone1", "logic": "logic1"}) + cluster.AddLabelsStore(3, 1, map[string]string{"zone": "zone1", "logic": "logic2"}) + cluster.AddLabelsStore(4, 1, map[string]string{"zone": "zone1", "logic": "logic2"}) + cluster.AddLabelsStore(5, 1, map[string]string{"zone": "zone2", "logic": "logic3"}) + + // initial state is sync + re.Equal(drStateSync, rep.drGetState()) + + // zone2 down, switch state, available stores should contain logic2 (learner) + setStoreState(cluster, "up", "up", "up", "up", "down") + rep.tickUpdateState() + re.Equal(drStateAsyncWait, rep.drGetState()) + rep.tickReplicateStatus() + re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, rep.drAutoSync.StateID), replicator.lastData[1]) } func genRegions(cluster *mockcluster.Cluster, stateID uint64, n int) []*core.RegionInfo { diff --git a/pkg/schedule/checker/rule_checker.go 
b/pkg/schedule/checker/rule_checker.go index 34492a99ec0..b0537bf9ce4 100644 --- a/pkg/schedule/checker/rule_checker.go +++ b/pkg/schedule/checker/rule_checker.go @@ -60,6 +60,7 @@ var ( ruleCheckerReplaceOfflineCounter = checkerCounter.WithLabelValues(ruleChecker, "replace-offline") ruleCheckerAddRulePeerCounter = checkerCounter.WithLabelValues(ruleChecker, "add-rule-peer") ruleCheckerNoStoreAddCounter = checkerCounter.WithLabelValues(ruleChecker, "no-store-add") + ruleCheckerNoStoreThenTryReplace = checkerCounter.WithLabelValues(ruleChecker, "no-store-then-try-replace") ruleCheckerNoStoreReplaceCounter = checkerCounter.WithLabelValues(ruleChecker, "no-store-replace") ruleCheckerFixPeerRoleCounter = checkerCounter.WithLabelValues(ruleChecker, "fix-peer-role") ruleCheckerFixLeaderRoleCounter = checkerCounter.WithLabelValues(ruleChecker, "fix-leader-role") @@ -185,7 +186,7 @@ func (c *RuleChecker) isWitnessEnabled() bool { func (c *RuleChecker) fixRulePeer(region *core.RegionInfo, fit *placement.RegionFit, rf *placement.RuleFit) (*operator.Operator, error) { // make up peers. if len(rf.Peers) < rf.Rule.Count { - return c.addRulePeer(region, rf) + return c.addRulePeer(region, fit, rf) } // fix down/offline peers. for _, peer := range rf.Peers { @@ -220,7 +221,7 @@ func (c *RuleChecker) fixRulePeer(region *core.RegionInfo, fit *placement.Region return c.fixBetterLocation(region, rf) } -func (c *RuleChecker) addRulePeer(region *core.RegionInfo, rf *placement.RuleFit) (*operator.Operator, error) { +func (c *RuleChecker) addRulePeer(region *core.RegionInfo, fit *placement.RegionFit, rf *placement.RuleFit) (*operator.Operator, error) { ruleCheckerAddRulePeerCounter.Inc() ruleStores := c.getRuleFitStores(rf) isWitness := rf.Rule.IsWitness && c.isWitnessEnabled() @@ -229,6 +230,25 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, rf *placement.RuleFit if store == 0 { ruleCheckerNoStoreAddCounter.Inc() c.handleFilterState(region, filterByTempState) + // try to replace an existing peer that matches the label constraints. 
+ // issue: https://github.com/tikv/pd/issues/7185 + for _, p := range region.GetPeers() { + s := c.cluster.GetStore(p.GetStoreId()) + if placement.MatchLabelConstraints(s, rf.Rule.LabelConstraints) { + oldPeerRuleFit := fit.GetRuleFit(p.GetId()) + if oldPeerRuleFit == nil || !oldPeerRuleFit.IsSatisfied() || oldPeerRuleFit == rf { + continue + } + ruleCheckerNoStoreThenTryReplace.Inc() + op, err := c.replaceUnexpectRulePeer(region, oldPeerRuleFit, fit, p, "swap-fit") + if err != nil { + return nil, err + } + if op != nil { + return op, nil + } + } + } return nil, errNoStoreToAdd } peer := &metapb.Peer{StoreId: store, Role: rf.Rule.Role.MetaPeerRole(), IsWitness: isWitness} diff --git a/pkg/schedule/checker/rule_checker_test.go b/pkg/schedule/checker/rule_checker_test.go index ad140e91606..8ee3b1eccfa 100644 --- a/pkg/schedule/checker/rule_checker_test.go +++ b/pkg/schedule/checker/rule_checker_test.go @@ -17,6 +17,8 @@ package checker import ( "context" "fmt" + "strconv" + "strings" "testing" "time" @@ -38,6 +40,7 @@ import ( func TestRuleCheckerTestSuite(t *testing.T) { suite.Run(t, new(ruleCheckerTestSuite)) + suite.Run(t, new(ruleCheckerTestAdvancedSuite)) } type ruleCheckerTestSuite struct { @@ -1583,3 +1586,142 @@ func (suite *ruleCheckerTestSuite) TestTiFlashLocationLabels() { op := suite.rc.Check(suite.cluster.GetRegion(1)) suite.Nil(op) } + +type ruleCheckerTestAdvancedSuite struct { + suite.Suite + cluster *mockcluster.Cluster + ruleManager *placement.RuleManager + rc *RuleChecker + ctx context.Context + cancel context.CancelFunc +} + +func (suite *ruleCheckerTestAdvancedSuite) SetupTest() { + cfg := mockconfig.NewTestOptions() + suite.ctx, suite.cancel = context.WithCancel(context.Background()) + suite.cluster = mockcluster.NewCluster(suite.ctx, cfg) + suite.cluster.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.SwitchWitness)) + suite.cluster.SetEnablePlacementRules(true) + suite.cluster.SetEnableWitness(true) + suite.cluster.SetEnableUseJointConsensus(true) + suite.ruleManager = suite.cluster.RuleManager + suite.rc = NewRuleChecker(suite.ctx, suite.cluster, suite.ruleManager, cache.NewDefaultCache(10)) +} + +func (suite *ruleCheckerTestAdvancedSuite) TearDownTest() { + suite.cancel() +} + +func makeStores() placement.StoreSet { + stores := core.NewStoresInfo() + now := time.Now() + for region := 1; region <= 3; region++ { + for zone := 1; zone <= 5; zone++ { + for host := 1; host <= 5; host++ { + id := uint64(region*100 + zone*10 + host) + labels := map[string]string{ + "region": fmt.Sprintf("region%d", region), + "zone": fmt.Sprintf("zone%d", zone), + "host": fmt.Sprintf("host%d", host), + } + if host == 5 { + labels["engine"] = "tiflash" + } + if zone == 1 && host == 1 { + labels["type"] = "read" + } + stores.SetStore(core.NewStoreInfoWithLabel(id, labels).Clone(core.SetLastHeartbeatTS(now), core.SetStoreState(metapb.StoreState_Up))) + } + } + } + return stores +} + +// example: "1111_leader,1234,2111_learner" +func makeRegion(def string) *core.RegionInfo { + var regionMeta metapb.Region + var leader *metapb.Peer + for _, peerDef := range strings.Split(def, ",") { + role, idStr := placement.Follower, peerDef + if strings.Contains(peerDef, "_") { + splits := strings.Split(peerDef, "_") + idStr, role = splits[0], placement.PeerRoleType(splits[1]) + } + id, _ := strconv.Atoi(idStr) + peer := &metapb.Peer{Id: uint64(id), StoreId: uint64(id), Role: role.MetaPeerRole()} + regionMeta.Peers = append(regionMeta.Peers, peer) + if role == placement.Leader { + leader = 
peer + regionMeta.Id = peer.Id - 1 + } + } + return core.NewRegionInfo(®ionMeta, leader) +} + +// example: "3/voter/zone=zone1+zone2,rack=rack2/zone,rack,host" +// count role constraints location_labels +func makeRule(def string) *placement.Rule { + var rule placement.Rule + splits := strings.Split(def, "/") + rule.Count, _ = strconv.Atoi(splits[0]) + rule.Role = placement.PeerRoleType(splits[1]) + // only support k=v type constraint + for _, c := range strings.Split(splits[2], ",") { + if c == "" { + break + } + kv := strings.Split(c, "=") + rule.LabelConstraints = append(rule.LabelConstraints, placement.LabelConstraint{ + Key: kv[0], + Op: "in", + Values: strings.Split(kv[1], "+"), + }) + } + rule.LocationLabels = strings.Split(splits[3], ",") + return &rule +} + +// TestReplaceAnExistingPeerCases address issue: https://github.com/tikv/pd/issues/7185 +func (suite *ruleCheckerTestAdvancedSuite) TestReplaceAnExistingPeerCases() { + stores := makeStores() + for _, store := range stores.GetStores() { + suite.cluster.PutStore(store) + } + + testCases := []struct { + region string + rules []string + opStr string + }{ + {"111_leader,211,311", []string{"3/voter//", "3/learner/type=read/"}, "replace-rule-swap-fit-peer {mv peer: store [111] to"}, + {"211,311_leader,151", []string{"3/voter//", "3/learner/type=read/"}, "add-rule-peer {add peer: store [111]}"}, + {"111_learner,211,311_leader,151", []string{"3/voter//", "3/learner/type=read/"}, "replace-rule-swap-fit-peer {mv peer: store [211] to"}, + {"111_learner,311_leader,151,351", []string{"3/voter//", "3/learner/type=read/"}, "add-rule-peer {add peer: store [211]}"}, + {"111_learner,211_learner,311_leader,151,351", []string{"3/voter//", "3/learner/type=read/"}, "replace-rule-swap-fit-peer {mv peer: store [311] to"}, + {"111_learner,211_learner,151_leader,252,351", []string{"3/voter//", "3/learner/type=read/"}, "add-rule-peer {add peer: store [311]}"}, + {"111_learner,211_learner,311_learner,151_leader,252,351", []string{"3/voter//", "3/learner/type=read/"}, ""}, + } + groupName := "a_test" + for i, cas := range testCases { + bundle := placement.GroupBundle{ + ID: groupName, + Index: 1000, + Override: true, + Rules: make([]*placement.Rule, 0, len(cas.rules)), + } + for id, r := range cas.rules { + rule := makeRule(r) + rule.ID = fmt.Sprintf("r%d", id) + bundle.Rules = append(bundle.Rules, rule) + } + err := suite.ruleManager.SetGroupBundle(bundle) + suite.NoError(err) + region := makeRegion(cas.region) + suite.cluster.PutRegion(region) + op := suite.rc.Check(region) + if len(cas.opStr) > 0 { + suite.Contains(op.String(), cas.opStr, i, cas.opStr) + } + suite.ruleManager.DeleteGroupBundle(groupName, false) + } +} diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index f35bd6d4de3..8fb9ec8b286 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -458,13 +458,16 @@ func (c *Coordinator) InitSchedulers(needRun bool) { log.Error("can not create scheduler with independent configuration", zap.String("scheduler-name", name), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err)) continue } - log.Info("create scheduler with independent configuration", zap.String("scheduler-name", s.GetName())) if needRun { + log.Info("create scheduler with independent configuration", zap.String("scheduler-name", s.GetName())) if err = c.schedulers.AddScheduler(s); err != nil { log.Error("can not add scheduler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", 
cfg.Args), errs.ZapError(err)) } - } else if err = c.schedulers.AddSchedulerHandler(s); err != nil { - log.Error("can not add scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err)) + } else { + log.Info("create scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName())) + if err = c.schedulers.AddSchedulerHandler(s); err != nil { + log.Error("can not add scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err)) + } } } @@ -484,8 +487,8 @@ func (c *Coordinator) InitSchedulers(needRun bool) { continue } - log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args)) if needRun { + log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args)) if err = c.schedulers.AddScheduler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) { log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err)) } else { @@ -493,8 +496,14 @@ func (c *Coordinator) InitSchedulers(needRun bool) { scheduleCfg.Schedulers[k] = schedulerCfg k++ } - } else if err = c.schedulers.AddSchedulerHandler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) { - log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err)) + } else { + log.Info("create scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args)) + if err = c.schedulers.AddSchedulerHandler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) { + log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err)) + } else { + scheduleCfg.Schedulers[k] = schedulerCfg + k++ + } } } diff --git a/pkg/schedule/placement/fit_test.go b/pkg/schedule/placement/fit_test.go index d7b3c91163e..286dbcdacd5 100644 --- a/pkg/schedule/placement/fit_test.go +++ b/pkg/schedule/placement/fit_test.go @@ -44,6 +44,9 @@ func makeStores() StoreSet { if x == 5 { labels["engine"] = "tiflash" } + if id == 1111 || id == 2111 || id == 3111 { + labels["disk"] = "ssd" + } stores.SetStore(core.NewStoreInfoWithLabel(id, labels).Clone(core.SetLastHeartbeatTS(now))) } } @@ -186,6 +189,9 @@ func TestFitRegion(t *testing.T) { {"1111,1112,1113,1114", []string{"3/voter//", "1/voter/id=id1/"}, "1112,1113,1114/1111"}, {"1111,2211,3111,3112", []string{"3/voter//zone", "1/voter/rack=rack2/"}, "1111,2211,3111//3112"}, {"1111,2211,3111,3112", []string{"1/voter/rack=rack2/", "3/voter//zone"}, "2211/1111,3111,3112"}, + {"1111_leader,2111,3111", []string{"3/voter//", "3/learner/disk=ssd/"}, "1111,2111,3111/"}, + {"1111_leader,2111,3111,4111", []string{"3/voter//", "3/learner/disk=ssd/"}, "1111,2111,4111/3111"}, + {"1111_leader,2111,3111,4111_learner", []string{"3/voter//", "3/learner/disk=ssd/"}, "1111,2111,3111//4111"}, } for _, testCase := range testCases { diff --git a/pkg/schedule/placement/rule_manager.go b/pkg/schedule/placement/rule_manager.go index bdca4cc1b19..a7e169b74aa 100644 --- 
a/pkg/schedule/placement/rule_manager.go +++ b/pkg/schedule/placement/rule_manager.go @@ -321,6 +321,20 @@ func (m *RuleManager) GetAllRules() []*Rule { return rules } +// GetRulesCount returns the number of rules. +func (m *RuleManager) GetRulesCount() int { + m.RLock() + defer m.RUnlock() + return len(m.ruleConfig.rules) +} + +// GetGroupsCount returns the number of rule groups. +func (m *RuleManager) GetGroupsCount() int { + m.RLock() + defer m.RUnlock() + return len(m.ruleConfig.groups) +} + // GetRulesByGroup returns sorted rules of a group. func (m *RuleManager) GetRulesByGroup(group string) []*Rule { m.RLock() diff --git a/pkg/schedule/placement/rule_manager_test.go b/pkg/schedule/placement/rule_manager_test.go index a6454337aa8..dad50a2d881 100644 --- a/pkg/schedule/placement/rule_manager_test.go +++ b/pkg/schedule/placement/rule_manager_test.go @@ -163,6 +163,8 @@ func TestSaveLoad(t *testing.T) { re.Equal(rules[0].String(), m2.GetRule("pd", "default").String()) re.Equal(rules[1].String(), m2.GetRule("foo", "baz").String()) re.Equal(rules[2].String(), m2.GetRule("foo", "bar").String()) + re.Equal(manager.GetRulesCount(), 3) + re.Equal(manager.GetGroupsCount(), 2) } func TestSetAfterGet(t *testing.T) { diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go index a6665c3e5e7..cc1b16300c5 100644 --- a/pkg/schedule/schedulers/evict_slow_store.go +++ b/pkg/schedule/schedulers/evict_slow_store.go @@ -15,6 +15,11 @@ package schedulers import ( + "net/http" + "sync/atomic" + "time" + + "github.com/gorilla/mux" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" @@ -23,6 +28,8 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/apiutil" + "github.com/unrolled/render" "go.uber.org/zap" ) @@ -40,8 +47,27 @@ const ( var evictSlowStoreCounter = schedulerCounter.WithLabelValues(EvictSlowStoreName, "schedule") type evictSlowStoreSchedulerConfig struct { - storage endpoint.ConfigStorage - EvictedStores []uint64 `json:"evict-stores"` + storage endpoint.ConfigStorage + // Last timestamp of the chosen slow store for eviction. + lastSlowStoreCaptureTS time.Time + // Duration gap for recovering the candidate, unit: s. + RecoveryDurationGap uint64 `json:"recovery-duration"` + EvictedStores []uint64 `json:"evict-stores"` +} + +func initEvictSlowStoreSchedulerConfig(storage endpoint.ConfigStorage) *evictSlowStoreSchedulerConfig { + return &evictSlowStoreSchedulerConfig{ + storage: storage, + lastSlowStoreCaptureTS: time.Time{}, + RecoveryDurationGap: defaultRecoveryDurationGap, + EvictedStores: make([]uint64, 0), + } +} + +func (conf *evictSlowStoreSchedulerConfig) Clone() *evictSlowStoreSchedulerConfig { + return &evictSlowStoreSchedulerConfig{ + RecoveryDurationGap: atomic.LoadUint64(&conf.RecoveryDurationGap), + } } func (conf *evictSlowStoreSchedulerConfig) Persist() error { @@ -78,8 +104,18 @@ func (conf *evictSlowStoreSchedulerConfig) evictStore() uint64 { return conf.EvictedStores[0] } +// readyForRecovery checks whether the last cpatured candidate is ready for recovery. 
+func (conf *evictSlowStoreSchedulerConfig) readyForRecovery() bool { + recoveryDurationGap := atomic.LoadUint64(&conf.RecoveryDurationGap) + failpoint.Inject("transientRecoveryGap", func() { + recoveryDurationGap = 0 + }) + return uint64(time.Since(conf.lastSlowStoreCaptureTS).Seconds()) >= recoveryDurationGap +} + func (conf *evictSlowStoreSchedulerConfig) setStoreAndPersist(id uint64) error { conf.EvictedStores = []uint64{id} + conf.lastSlowStoreCaptureTS = time.Now() return conf.Persist() } @@ -87,14 +123,58 @@ func (conf *evictSlowStoreSchedulerConfig) clearAndPersist() (oldID uint64, err oldID = conf.evictStore() if oldID > 0 { conf.EvictedStores = []uint64{} + conf.lastSlowStoreCaptureTS = time.Time{} err = conf.Persist() } return } +type evictSlowStoreHandler struct { + rd *render.Render + config *evictSlowStoreSchedulerConfig +} + +func newEvictSlowStoreHandler(config *evictSlowStoreSchedulerConfig) http.Handler { + h := &evictSlowStoreHandler{ + config: config, + rd: render.New(render.Options{IndentJSON: true}), + } + router := mux.NewRouter() + router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost) + router.HandleFunc("/list", h.ListConfig).Methods(http.MethodGet) + return router +} + +func (handler *evictSlowStoreHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { + var input map[string]interface{} + if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil { + return + } + recoveryDurationGapFloat, ok := input["recovery-duration"].(float64) + if !ok { + handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error()) + return + } + recoveryDurationGap := (uint64)(recoveryDurationGapFloat) + prevRecoveryDurationGap := atomic.LoadUint64(&handler.config.RecoveryDurationGap) + atomic.StoreUint64(&handler.config.RecoveryDurationGap, recoveryDurationGap) + log.Info("evict-slow-store-scheduler update 'recovery-duration' - unit: s", zap.Uint64("prev", prevRecoveryDurationGap), zap.Uint64("cur", recoveryDurationGap)) + handler.rd.JSON(w, http.StatusOK, nil) +} + +func (handler *evictSlowStoreHandler) ListConfig(w http.ResponseWriter, r *http.Request) { + conf := handler.config.Clone() + handler.rd.JSON(w, http.StatusOK, conf) +} + type evictSlowStoreScheduler struct { *BaseScheduler - conf *evictSlowStoreSchedulerConfig + conf *evictSlowStoreSchedulerConfig + handler http.Handler +} + +func (s *evictSlowStoreScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.handler.ServeHTTP(w, r) } func (s *evictSlowStoreScheduler) GetName() string { @@ -168,7 +248,7 @@ func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, dryRun // slow node next time. log.Info("slow store has been removed", zap.Uint64("store-id", store.GetID())) - } else if store.GetSlowScore() <= slowStoreRecoverThreshold { + } else if store.GetSlowScore() <= slowStoreRecoverThreshold && s.conf.readyForRecovery() { log.Info("slow store has been recovered", zap.Uint64("store-id", store.GetID())) } else { @@ -211,11 +291,10 @@ func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, dryRun // newEvictSlowStoreScheduler creates a scheduler that detects and evicts slow stores. 
func newEvictSlowStoreScheduler(opController *operator.Controller, conf *evictSlowStoreSchedulerConfig) Scheduler { - base := NewBaseScheduler(opController) - - s := &evictSlowStoreScheduler{ - BaseScheduler: base, + handler := newEvictSlowStoreHandler(conf) + return &evictSlowStoreScheduler{ + BaseScheduler: NewBaseScheduler(opController), conf: conf, + handler: handler, } - return s } diff --git a/pkg/schedule/schedulers/evict_slow_store_test.go b/pkg/schedule/schedulers/evict_slow_store_test.go index 0b0c1d9ad39..813d17ae541 100644 --- a/pkg/schedule/schedulers/evict_slow_store_test.go +++ b/pkg/schedule/schedulers/evict_slow_store_test.go @@ -67,6 +67,7 @@ func (suite *evictSlowStoreTestSuite) TearDownTest() { } func (suite *evictSlowStoreTestSuite) TestEvictSlowStore() { + suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/schedulers/transientRecoveryGap", "return(true)")) storeInfo := suite.tc.GetStore(1) newStoreInfo := storeInfo.Clone(func(store *core.StoreInfo) { store.GetStoreStats().SlowScore = 100 @@ -113,6 +114,8 @@ func (suite *evictSlowStoreTestSuite) TestEvictSlowStore() { suite.NoError(err) suite.Equal(es2.conf.EvictedStores, persistValue.EvictedStores) suite.Zero(persistValue.evictStore()) + suite.True(persistValue.readyForRecovery()) + suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/schedulers/transientRecoveryGap")) } func (suite *evictSlowStoreTestSuite) TestEvictSlowStorePrepare() { @@ -124,6 +127,7 @@ func (suite *evictSlowStoreTestSuite) TestEvictSlowStorePrepare() { es2.conf.setStoreAndPersist(1) suite.Equal(uint64(1), es2.conf.evictStore()) + suite.False(es2.conf.readyForRecovery()) // prepare with evict store. suite.es.Prepare(suite.tc) } diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go index dde10610643..d45602b90e1 100644 --- a/pkg/schedule/schedulers/init.go +++ b/pkg/schedule/schedulers/init.go @@ -159,7 +159,7 @@ func schedulersRegister() { }) RegisterScheduler(EvictSlowStoreType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) { - conf := &evictSlowStoreSchedulerConfig{storage: storage, EvictedStores: make([]uint64, 0)} + conf := initEvictSlowStoreSchedulerConfig(storage) if err := decoder(conf); err != nil { return nil, err } diff --git a/pkg/schedule/schedulers/metrics.go b/pkg/schedule/schedulers/metrics.go index 2052bc923af..34e4606a7ce 100644 --- a/pkg/schedule/schedulers/metrics.go +++ b/pkg/schedule/schedulers/metrics.go @@ -134,10 +134,19 @@ var ( Name: "hot_pending_sum", Help: "Pending influence sum of store in hot region scheduler.", }, []string{"store", "rw", "dim"}) + + ruleStatusGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "pd", + Subsystem: "rule_manager", + Name: "status", + Help: "Status of the rule.", + }, []string{"type"}) ) func init() { prometheus.MustRegister(schedulerStatusGauge) + prometheus.MustRegister(ruleStatusGauge) prometheus.MustRegister(schedulerCounter) prometheus.MustRegister(balanceWitnessCounter) prometheus.MustRegister(hotSchedulerResultCounter) diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go index d58a78ca82f..0f2264392aa 100644 --- a/pkg/schedule/schedulers/scheduler_controller.go +++ b/pkg/schedule/schedulers/scheduler_controller.go @@ -36,7 +36,11 @@ import ( const maxScheduleRetries = 10 -var denySchedulersByLabelerCounter = 
labeler.LabelerEventCounter.WithLabelValues("schedulers", "deny") +var ( + denySchedulersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues("schedulers", "deny") + rulesCntStatusGauge = ruleStatusGauge.WithLabelValues("rule_count") + groupsCntStatusGauge = ruleStatusGauge.WithLabelValues("group_count") +) // Controller is used to manage all schedulers. type Controller struct { @@ -108,7 +112,6 @@ func (c *Controller) GetSchedulerHandlers() map[string]http.Handler { // CollectSchedulerMetrics collects metrics of all schedulers. func (c *Controller) CollectSchedulerMetrics() { c.RLock() - defer c.RUnlock() for _, s := range c.schedulers { var allowScheduler float64 // If the scheduler is not allowed to schedule, it will disappear in Grafana panel. @@ -118,6 +121,15 @@ func (c *Controller) CollectSchedulerMetrics() { } schedulerStatusGauge.WithLabelValues(s.Scheduler.GetName(), "allow").Set(allowScheduler) } + c.RUnlock() + ruleMgr := c.cluster.GetRuleManager() + if ruleMgr == nil { + return + } + ruleCnt := ruleMgr.GetRulesCount() + groupCnt := ruleMgr.GetGroupsCount() + rulesCntStatusGauge.Set(float64(ruleCnt)) + groupsCntStatusGauge.Set(float64(groupCnt)) } func (c *Controller) isSchedulingHalted() bool { @@ -127,6 +139,10 @@ func (c *Controller) isSchedulingHalted() bool { // ResetSchedulerMetrics resets metrics of all schedulers. func (c *Controller) ResetSchedulerMetrics() { schedulerStatusGauge.Reset() + ruleStatusGauge.Reset() + // create in map again + rulesCntStatusGauge = ruleStatusGauge.WithLabelValues("rule_count") + groupsCntStatusGauge = ruleStatusGauge.WithLabelValues("group_count") } // AddSchedulerHandler adds the HTTP handler for a scheduler. diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 7a0e99efa5f..25a47a7fca9 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1236,6 +1236,11 @@ func (c *RaftCluster) GetRegions() []*core.RegionInfo { return c.core.GetRegions() } +// ValidRegion is used to decide if the region is valid. +func (c *RaftCluster) ValidRegion(region *metapb.Region) error { + return c.core.ValidRegion(region) +} + // GetTotalRegionCount returns total count of regions func (c *RaftCluster) GetTotalRegionCount() int { return c.core.GetTotalRegionCount() diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index c1da97363b5..a38ae86123f 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -50,7 +50,7 @@ func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSp return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs() } reqRegion := request.GetRegion() - err := c.ValidRequestRegion(reqRegion) + err := c.ValidRegion(reqRegion) if err != nil { return nil, err } @@ -90,23 +90,6 @@ func (c *RaftCluster) isSchedulingHalted() bool { return c.opt.IsSchedulingHalted() } -// ValidRequestRegion is used to decide if the region is valid. -func (c *RaftCluster) ValidRequestRegion(reqRegion *metapb.Region) error { - startKey := reqRegion.GetStartKey() - region := c.GetRegionByKey(startKey) - if region == nil { - return errors.Errorf("region not found, request region: %v", logutil.RedactStringer(core.RegionToHexMeta(reqRegion))) - } - // If the request epoch is less than current region epoch, then returns an error. 
- reqRegionEpoch := reqRegion.GetRegionEpoch() - regionEpoch := region.GetMeta().GetRegionEpoch() - if reqRegionEpoch.GetVersion() < regionEpoch.GetVersion() || - reqRegionEpoch.GetConfVer() < regionEpoch.GetConfVer() { - return errors.Errorf("invalid region epoch, request: %v, current: %v", reqRegionEpoch, regionEpoch) - } - return nil -} - // HandleAskBatchSplit handles the batch split request. func (c *RaftCluster) HandleAskBatchSplit(request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) { if c.isSchedulingHalted() { @@ -117,7 +100,7 @@ func (c *RaftCluster) HandleAskBatchSplit(request *pdpb.AskBatchSplitRequest) (* } reqRegion := request.GetRegion() splitCount := request.GetSplitCount() - err := c.ValidRequestRegion(reqRegion) + err := c.ValidRegion(reqRegion) if err != nil { return nil, err } diff --git a/server/grpc_service.go b/server/grpc_service.go index 9ee9cad37df..14b3f72979d 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -69,6 +69,7 @@ var ( ErrNotStarted = status.Errorf(codes.Unavailable, "server not started") ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout") ErrNotFoundTSOAddr = status.Errorf(codes.NotFound, "not found tso address") + ErrNotFoundSchedulingAddr = status.Errorf(codes.NotFound, "not found scheduling address") ErrForwardTSOTimeout = status.Errorf(codes.DeadlineExceeded, "forward tso request timeout") ErrMaxCountTSOProxyRoutinesExceeded = status.Errorf(codes.ResourceExhausted, "max count of concurrent tso proxy routines exceeded") ErrTSOProxyRecvFromClientTimeout = status.Errorf(codes.DeadlineExceeded, "tso proxy timeout when receiving from client; stream closed by server") @@ -1002,8 +1003,8 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear s.handleDamagedStore(request.GetStats()) storeHeartbeatHandleDuration.WithLabelValues(storeAddress, storeLabel).Observe(time.Since(start).Seconds()) if s.IsAPIServiceMode() { - s.updateSchedulingClient(ctx) - if s.schedulingClient.Load() != nil { + forwardCli, _ := s.updateSchedulingClient(ctx) + if forwardCli != nil { req := &schedulingpb.StoreHeartbeatRequest{ Header: &schedulingpb.RequestHeader{ ClusterId: request.GetHeader().GetClusterId(), @@ -1011,7 +1012,7 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear }, Stats: request.GetStats(), } - if _, err := s.schedulingClient.Load().(*schedulingClient).getClient().StoreHeartbeat(ctx, req); err != nil { + if _, err := forwardCli.StoreHeartbeat(ctx, req); err != nil { // reset to let it be updated in the next request s.schedulingClient.Store(&schedulingClient{}) } @@ -1030,7 +1031,7 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear return resp, nil } -func (s *GrpcServer) updateSchedulingClient(ctx context.Context) { +func (s *GrpcServer) updateSchedulingClient(ctx context.Context) (schedulingpb.SchedulingClient, error) { forwardedHost, _ := s.GetServicePrimaryAddr(ctx, utils.SchedulingServiceName) pre := s.schedulingClient.Load() if forwardedHost != "" && ((pre == nil) || (pre != nil && forwardedHost != pre.(*schedulingClient).getPrimaryAddr())) { @@ -1038,11 +1039,14 @@ func (s *GrpcServer) updateSchedulingClient(ctx context.Context) { if err != nil { log.Error("get delegate client failed", zap.Error(err)) } - s.schedulingClient.Store(&schedulingClient{ + forwardCli := &schedulingClient{ client: schedulingpb.NewSchedulingClient(client), lastPrimary: forwardedHost, - }) + } + 
s.schedulingClient.Store(forwardCli) + return forwardCli.getClient(), nil } + return nil, ErrNotFoundSchedulingAddr } // bucketHeartbeatServer wraps PD_ReportBucketsServer to ensure when any error @@ -1622,6 +1626,26 @@ func (s *GrpcServer) AskSplit(ctx context.Context, request *pdpb.AskSplitRequest // AskBatchSplit implements gRPC PDServer. func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) { + if s.IsAPIServiceMode() { + s.updateSchedulingClient(ctx) + if s.schedulingClient.Load() != nil { + req := &schedulingpb.AskBatchSplitRequest{ + Header: &schedulingpb.RequestHeader{ + ClusterId: request.GetHeader().GetClusterId(), + SenderId: request.GetHeader().GetSenderId(), + }, + Region: request.GetRegion(), + SplitCount: request.GetSplitCount(), + } + resp, err := s.schedulingClient.Load().(*schedulingClient).getClient().AskBatchSplit(ctx, req) + if err != nil { + // reset to let it be updated in the next request + s.schedulingClient.Store(&schedulingClient{}) + return s.convertAskSplitResponse(resp), err + } + return s.convertAskSplitResponse(resp), nil + } + } fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { return pdpb.NewPDClient(client).AskBatchSplit(ctx, request) } @@ -1771,8 +1795,13 @@ func (s *GrpcServer) PutClusterConfig(ctx context.Context, request *pdpb.PutClus // ScatterRegion implements gRPC PDServer. func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterRegionRequest) (*pdpb.ScatterRegionResponse, error) { if s.IsAPIServiceMode() { - s.updateSchedulingClient(ctx) - if s.schedulingClient.Load() != nil { + forwardCli, err := s.updateSchedulingClient(ctx) + if err != nil { + return &pdpb.ScatterRegionResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + if forwardCli != nil { regionsID := request.GetRegionsId() if len(regionsID) == 0 { return &pdpb.ScatterRegionResponse{ @@ -1789,7 +1818,7 @@ func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterReg RetryLimit: request.GetRetryLimit(), SkipStoreLimit: request.GetSkipStoreLimit(), } - resp, err := s.schedulingClient.Load().(*schedulingClient).getClient().ScatterRegions(ctx, req) + resp, err := forwardCli.ScatterRegions(ctx, req) if err != nil { // reset to let it be updated in the next request s.schedulingClient.Store(&schedulingClient{}) @@ -1990,8 +2019,13 @@ func (s *GrpcServer) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb // GetOperator gets information about the operator belonging to the specify region. 
func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorRequest) (*pdpb.GetOperatorResponse, error) { if s.IsAPIServiceMode() { - s.updateSchedulingClient(ctx) - if s.schedulingClient.Load() != nil { + forwardCli, err := s.updateSchedulingClient(ctx) + if err != nil { + return &pdpb.GetOperatorResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + if forwardCli != nil { req := &schedulingpb.GetOperatorRequest{ Header: &schedulingpb.RequestHeader{ ClusterId: request.GetHeader().GetClusterId(), @@ -1999,7 +2033,7 @@ func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorR }, RegionId: request.GetRegionId(), } - resp, err := s.schedulingClient.Load().(*schedulingClient).getClient().GetOperator(ctx, req) + resp, err := forwardCli.GetOperator(ctx, req) if err != nil { // reset to let it be updated in the next request s.schedulingClient.Store(&schedulingClient{}) @@ -2136,6 +2170,13 @@ func (s *GrpcServer) convertOperatorResponse(resp *schedulingpb.GetOperatorRespo } } +func (s *GrpcServer) convertAskSplitResponse(resp *schedulingpb.AskBatchSplitResponse) *pdpb.AskBatchSplitResponse { + return &pdpb.AskBatchSplitResponse{ + Header: s.convertHeader(resp.GetHeader()), + Ids: resp.GetIds(), + } +} + // Only used for the TestLocalAllocatorLeaderChange. var mockLocalAllocatorLeaderChangeFlag = false @@ -2241,8 +2282,13 @@ func (s *GrpcServer) SyncMaxTS(_ context.Context, request *pdpb.SyncMaxTSRequest // SplitRegions split regions by the given split keys func (s *GrpcServer) SplitRegions(ctx context.Context, request *pdpb.SplitRegionsRequest) (*pdpb.SplitRegionsResponse, error) { if s.IsAPIServiceMode() { - s.updateSchedulingClient(ctx) - if s.schedulingClient.Load() != nil { + forwardCli, err := s.updateSchedulingClient(ctx) + if err != nil { + return &pdpb.SplitRegionsResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + if forwardCli != nil { req := &schedulingpb.SplitRegionsRequest{ Header: &schedulingpb.RequestHeader{ ClusterId: request.GetHeader().GetClusterId(), @@ -2251,7 +2297,7 @@ func (s *GrpcServer) SplitRegions(ctx context.Context, request *pdpb.SplitRegion SplitKeys: request.GetSplitKeys(), RetryLimit: request.GetRetryLimit(), } - resp, err := s.schedulingClient.Load().(*schedulingClient).getClient().SplitRegions(ctx, req) + resp, err := forwardCli.SplitRegions(ctx, req) if err != nil { // reset to let it be updated in the next request s.schedulingClient.Store(&schedulingClient{}) diff --git a/tests/integrations/client/go.mod b/tests/integrations/client/go.mod index ea933ae0996..e38efbeb438 100644 --- a/tests/integrations/client/go.mod +++ b/tests/integrations/client/go.mod @@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/docker/go-units v0.4.0 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a + github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/stretchr/testify v1.8.2 github.com/tikv/pd v0.0.0-00010101000000-000000000000 diff --git a/tests/integrations/client/go.sum b/tests/integrations/client/go.sum index 785d69bbc33..c745c4fa518 100644 --- a/tests/integrations/client/go.sum +++ b/tests/integrations/client/go.sum @@ -402,8 +402,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint 
v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a h1:xbak0nFiTkinvdtr/EUK0dvdkX32twKRpIgsMRiiVQA= -github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c h1:5cpE29kMHjc8fv+mRiXbLTDfoZHiX5BTK6knVWlWvqk= +github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= diff --git a/tests/integrations/mcs/go.mod b/tests/integrations/mcs/go.mod index 536f34edbcf..000bfdc8312 100644 --- a/tests/integrations/mcs/go.mod +++ b/tests/integrations/mcs/go.mod @@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/docker/go-units v0.4.0 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a + github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/stretchr/testify v1.8.2 github.com/tikv/pd v0.0.0-00010101000000-000000000000 diff --git a/tests/integrations/mcs/go.sum b/tests/integrations/mcs/go.sum index ad36f60c3ce..0da75329284 100644 --- a/tests/integrations/mcs/go.sum +++ b/tests/integrations/mcs/go.sum @@ -407,8 +407,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a h1:xbak0nFiTkinvdtr/EUK0dvdkX32twKRpIgsMRiiVQA= -github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c h1:5cpE29kMHjc8fv+mRiXbLTDfoZHiX5BTK6knVWlWvqk= +github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index 324c8e5cad5..85cf84361b4 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -265,18 +265,32 @@ func (suite *serverTestSuite) TestSchedulerSync() { 
 	api.MustDeleteScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName)
 	checkEvictLeaderSchedulerExist(re, schedulersController, false)
 
-	// TODO: test more schedulers.
-	// Fixme: the following code will fail because the scheduler is not removed but not synced.
-	// checkDelete := func(schedulerName string) {
-	// 	re.NotNil(schedulersController.GetScheduler(schedulers.BalanceLeaderName) != nil)
-	// 	api.MustDeleteScheduler(re, suite.backendEndpoints, schedulers.BalanceLeaderName)
-	// 	testutil.Eventually(re, func() bool {
-	// 		return schedulersController.GetScheduler(schedulers.BalanceLeaderName) == nil
-	// 	})
-	// }
-	// checkDelete(schedulers.BalanceLeaderName)
-	// checkDelete(schedulers.BalanceRegionName)
-	// checkDelete(schedulers.HotRegionName)
+	// The default schedulers cannot be deleted; they can only be disabled.
+	defaultSchedulerNames := []string{
+		schedulers.BalanceLeaderName,
+		schedulers.BalanceRegionName,
+		schedulers.BalanceWitnessName,
+		schedulers.HotRegionName,
+		schedulers.TransferWitnessLeaderName,
+	}
+	checkDisabled := func(name string, shouldDisabled bool) {
+		re.NotNil(schedulersController.GetScheduler(name), name)
+		testutil.Eventually(re, func() bool {
+			disabled, err := schedulersController.IsSchedulerDisabled(name)
+			re.NoError(err, name)
+			return disabled == shouldDisabled
+		})
+	}
+	for _, name := range defaultSchedulerNames {
+		checkDisabled(name, false)
+		api.MustDeleteScheduler(re, suite.backendEndpoints, name)
+		checkDisabled(name, true)
+	}
+	for _, name := range defaultSchedulerNames {
+		checkDisabled(name, true)
+		api.MustAddScheduler(re, suite.backendEndpoints, name, nil)
+		checkDisabled(name, false)
+	}
 }
 
 func checkEvictLeaderSchedulerExist(re *require.Assertions, sc *schedulers.Controller, exist bool) {
diff --git a/tests/integrations/tso/go.mod b/tests/integrations/tso/go.mod
index f8a742362c9..f8a5cfac75f 100644
--- a/tests/integrations/tso/go.mod
+++ b/tests/integrations/tso/go.mod
@@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0
 
 require (
 	github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
-	github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a
+	github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c
 	github.com/stretchr/testify v1.8.4
 	github.com/tikv/pd v0.0.0-00010101000000-000000000000
 	github.com/tikv/pd/client v0.0.0-00010101000000-000000000000
diff --git a/tests/integrations/tso/go.sum b/tests/integrations/tso/go.sum
index 7ae1d41a823..63327985f0d 100644
--- a/tests/integrations/tso/go.sum
+++ b/tests/integrations/tso/go.sum
@@ -401,8 +401,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
 github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgWM9fSBIvaxsJHuGP0uM74HXtv3MyyGQ=
 github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
 github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
-github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a h1:xbak0nFiTkinvdtr/EUK0dvdkX32twKRpIgsMRiiVQA=
-github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
+github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c h1:5cpE29kMHjc8fv+mRiXbLTDfoZHiX5BTK6knVWlWvqk=
+github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
 github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go
index c5b118a9f5e..b3d9f356ad1 100644
--- a/tests/pdctl/scheduler/scheduler_test.go
+++ b/tests/pdctl/scheduler/scheduler_test.go
@@ -407,20 +407,23 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) {
 	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-leader-scheduler"}, nil)
 	re.Contains(echo, "Success!")
 
-	// test evict-slow-trend scheduler config
-	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-slow-trend-scheduler"}, nil)
-	re.Contains(echo, "Success!")
-	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil)
-	re.Contains(echo, "evict-slow-trend-scheduler")
-	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-slow-trend-scheduler", "set", "recovery-duration", "100"}, nil)
-	re.Contains(echo, "Success!")
-	conf = make(map[string]interface{})
-	mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-slow-trend-scheduler", "show"}, &conf)
-	re.Equal(100., conf["recovery-duration"])
-	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-slow-trend-scheduler"}, nil)
-	re.Contains(echo, "Success!")
-	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil)
-	re.NotContains(echo, "evict-slow-trend-scheduler")
+	// test evict-slow-store && evict-slow-trend schedulers config
+	evictSlownessSchedulers := []string{"evict-slow-store-scheduler", "evict-slow-trend-scheduler"}
+	for _, schedulerName := range evictSlownessSchedulers {
+		echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", schedulerName}, nil)
+		re.Contains(echo, "Success!")
+		echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil)
+		re.Contains(echo, schedulerName)
+		echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName, "set", "recovery-duration", "100"}, nil)
+		re.Contains(echo, "Success!")
+		conf = make(map[string]interface{})
+		mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName, "show"}, &conf)
+		re.Equal(100., conf["recovery-duration"])
+		echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", schedulerName}, nil)
+		re.Contains(echo, "Success!")
+		echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil)
+		re.NotContains(echo, schedulerName)
+	}
 
 	// test show scheduler with paused and disabled status.
 	checkSchedulerWithStatusCommand := func(status string, expected []string) {
diff --git a/tests/server/cluster/cluster_work_test.go b/tests/server/cluster/cluster_work_test.go
index ef09e522305..eabecf8e29b 100644
--- a/tests/server/cluster/cluster_work_test.go
+++ b/tests/server/cluster/cluster_work_test.go
@@ -64,13 +64,13 @@ func TestValidRequestRegion(t *testing.T) {
 	err = rc.HandleRegionHeartbeat(r1)
 	re.NoError(err)
 	r2 := &metapb.Region{Id: 2, StartKey: []byte("a"), EndKey: []byte("b")}
-	re.Error(rc.ValidRequestRegion(r2))
+	re.Error(rc.ValidRegion(r2))
 	r3 := &metapb.Region{Id: 1, StartKey: []byte(""), EndKey: []byte("a"), RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}}
-	re.Error(rc.ValidRequestRegion(r3))
+	re.Error(rc.ValidRegion(r3))
 	r4 := &metapb.Region{Id: 1, StartKey: []byte(""), EndKey: []byte("a"), RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 1}}
-	re.Error(rc.ValidRequestRegion(r4))
+	re.Error(rc.ValidRegion(r4))
 	r5 := &metapb.Region{Id: 1, StartKey: []byte(""), EndKey: []byte("a"), RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 2}}
-	re.NoError(rc.ValidRequestRegion(r5))
+	re.NoError(rc.ValidRegion(r5))
 	rc.Stop()
 }
 
diff --git a/tools/pd-api-bench/go.mod b/tools/pd-api-bench/go.mod
index 94aa1ff0dc9..e6e896a0797 100644
--- a/tools/pd-api-bench/go.mod
+++ b/tools/pd-api-bench/go.mod
@@ -69,7 +69,7 @@ require (
 	github.com/pingcap/errcode v0.3.0 // indirect
 	github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
 	github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 // indirect
-	github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a // indirect
+	github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c // indirect
 	github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/prometheus/client_golang v1.11.1 // indirect
diff --git a/tools/pd-api-bench/go.sum b/tools/pd-api-bench/go.sum
index aaa400df5fd..f40a4fe2f5a 100644
--- a/tools/pd-api-bench/go.sum
+++ b/tools/pd-api-bench/go.sum
@@ -238,8 +238,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
-github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a h1:xbak0nFiTkinvdtr/EUK0dvdkX32twKRpIgsMRiiVQA=
-github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
+github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c h1:5cpE29kMHjc8fv+mRiXbLTDfoZHiX5BTK6knVWlWvqk=
+github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I=
diff --git a/tools/pd-ctl/pdctl/command/scheduler.go b/tools/pd-ctl/pdctl/command/scheduler.go
index 40bf9a07a4d..4349735f06d 100644
--- a/tools/pd-ctl/pdctl/command/scheduler.go
+++ b/tools/pd-ctl/pdctl/command/scheduler.go
@@ -499,6 +499,7 @@ func NewConfigSchedulerCommand() *cobra.Command {
 		newConfigGrantHotRegionCommand(),
 		newConfigBalanceLeaderCommand(),
 		newSplitBucketCommand(),
+		newConfigEvictSlowStoreCommand(),
 		newConfigEvictSlowTrendCommand(),
 	)
 	return c
@@ -776,6 +777,25 @@ func setShuffleRegionSchedulerRolesCommandFunc(cmd *cobra.Command, args []string
 	cmd.Println("Success!")
 }
 
+func newConfigEvictSlowStoreCommand() *cobra.Command {
+	c := &cobra.Command{
+		Use:   "evict-slow-store-scheduler",
+		Short: "evict-slow-store-scheduler config",
+		Run:   listSchedulerConfigCommandFunc,
+	}
+
+	c.AddCommand(&cobra.Command{
+		Use:   "show",
+		Short: "list the config item",
+		Run:   listSchedulerConfigCommandFunc,
+	}, &cobra.Command{
+		Use:   "set <key> <value>",
+		Short: "set the config item",
+		Run:   func(cmd *cobra.Command, args []string) { postSchedulerConfigCommandFunc(cmd, c.Name(), args) },
+	})
+	return c
+}
+
 func newConfigEvictSlowTrendCommand() *cobra.Command {
 	c := &cobra.Command{
 		Use: "evict-slow-trend-scheduler",
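With newConfigEvictSlowStoreCommand registered, the new scheduler's config is reachable from pd-ctl in the same form the pdctl test above drives through mustExec: "scheduler config evict-slow-store-scheduler show" lists the current config, and "scheduler config evict-slow-store-scheduler set recovery-duration 100" updates it, both issued against the target PD (the test passes the address with "-u"). These invocations are taken from the test; they are a usage summary, not additional behavior introduced by this change.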