diff --git a/go.mod b/go.mod index 73f3d783db2..86f56089347 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,7 @@ require ( github.com/pingcap/errcode v0.3.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a + github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511 diff --git a/go.sum b/go.sum index 6f73b806860..9392644f181 100644 --- a/go.sum +++ b/go.sum @@ -439,8 +439,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a h1:xbak0nFiTkinvdtr/EUK0dvdkX32twKRpIgsMRiiVQA= -github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c h1:5cpE29kMHjc8fv+mRiXbLTDfoZHiX5BTK6knVWlWvqk= +github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index c811303d1c4..809244771b7 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -1139,6 +1139,111 @@ "timeFrom": null, "timeShift": null }, + { + "bars": false, + "cacheTimeout": null, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The current count of placement rules and rule groups in the cluster", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 6, + "w": 4, + "x": 16, + "y": 13 + }, + "hiddenSeries": false, + "id": 22, + "interval": null, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "maxDataPoints": 100, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.10", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(pd_rule_manager_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}) by (type)", + "format": "time_series", + "interval": "", + "intervalFactor": 2, + "legendFormat": "{{type}}", + "refId": "A", + "step": 4 + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Placement Rules Status", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": 
"individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:192", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:193", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, { "collapsed": true, "gridPos": { diff --git a/pkg/core/region.go b/pkg/core/region.go index 6631005a608..6875844b7a6 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -1675,6 +1675,23 @@ func (r *RegionsInfo) GetAverageRegionSize() int64 { return r.tree.TotalSize() / int64(r.tree.length()) } +// ValidRegion is used to decide if the region is valid. +func (r *RegionsInfo) ValidRegion(region *metapb.Region) error { + startKey := region.GetStartKey() + currnetRegion := r.GetRegionByKey(startKey) + if currnetRegion == nil { + return errors.Errorf("region not found, request region: %v", logutil.RedactStringer(RegionToHexMeta(region))) + } + // If the request epoch is less than current region epoch, then returns an error. + regionEpoch := region.GetRegionEpoch() + currnetEpoch := currnetRegion.GetMeta().GetRegionEpoch() + if regionEpoch.GetVersion() < currnetEpoch.GetVersion() || + regionEpoch.GetConfVer() < currnetEpoch.GetConfVer() { + return errors.Errorf("invalid region epoch, request: %v, current: %v", regionEpoch, currnetEpoch) + } + return nil +} + // DiffRegionPeersInfo return the difference of peers info between two RegionInfo func DiffRegionPeersInfo(origin *RegionInfo, other *RegionInfo) string { var ret []string diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index d462a9a58b5..4f9caca41e6 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -18,6 +18,7 @@ import ( "fmt" "os" "path/filepath" + "reflect" "strings" "sync/atomic" "time" @@ -232,6 +233,14 @@ func (o *PersistConfig) getSchedulersUpdatingNotifier() chan<- struct{} { return v.(chan<- struct{}) } +func (o *PersistConfig) tryNotifySchedulersUpdating() { + notifier := o.getSchedulersUpdatingNotifier() + if notifier == nil { + return + } + notifier <- struct{}{} +} + // GetClusterVersion returns the cluster version. func (o *PersistConfig) GetClusterVersion() *semver.Version { return (*semver.Version)(atomic.LoadPointer(&o.clusterVersion)) @@ -251,11 +260,10 @@ func (o *PersistConfig) GetScheduleConfig() *sc.ScheduleConfig { func (o *PersistConfig) SetScheduleConfig(cfg *sc.ScheduleConfig) { old := o.GetScheduleConfig() o.schedule.Store(cfg) - // The coordinator is not aware of the underlying scheduler config changes, however, it - // should react on the scheduler number changes to handle the add/remove scheduler events. - if notifier := o.getSchedulersUpdatingNotifier(); notifier != nil && - len(old.Schedulers) != len(cfg.Schedulers) { - notifier <- struct{}{} + // The coordinator is not aware of the underlying scheduler config changes, + // we should notify it to update the schedulers proactively. + if !reflect.DeepEqual(old.Schedulers, cfg.Schedulers) { + o.tryNotifySchedulersUpdating() } } @@ -650,6 +658,11 @@ func (o *PersistConfig) IsRaftKV2() bool { return o.GetStoreConfig().IsRaftKV2() } +// IsTikvRegionSplitEnabled returns whether tikv split region is disabled. 
+func (o *PersistConfig) IsTikvRegionSplitEnabled() bool { + return o.GetScheduleConfig().EnableTiKVSplitRegion +} + // TODO: implement the following methods // AddSchedulerCfg adds the scheduler configurations. diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go index e2e2c8af32d..8e47e7380f9 100644 --- a/pkg/mcs/scheduling/server/grpc_service.go +++ b/pkg/mcs/scheduling/server/grpc_service.go @@ -22,13 +22,16 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/kvproto/pkg/schedulingpb" "github.com/pingcap/log" bs "github.com/tikv/pd/pkg/basicserver" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/registry" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/versioninfo" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -254,6 +257,74 @@ func (s *Service) GetOperator(ctx context.Context, request *schedulingpb.GetOper }, nil } +// AskBatchSplit implements gRPC PDServer. +func (s *Service) AskBatchSplit(ctx context.Context, request *schedulingpb.AskBatchSplitRequest) (*schedulingpb.AskBatchSplitResponse, error) { + c := s.GetCluster() + if c == nil { + return &schedulingpb.AskBatchSplitResponse{Header: s.notBootstrappedHeader()}, nil + } + + if request.GetRegion() == nil { + return &schedulingpb.AskBatchSplitResponse{ + Header: s.wrapErrorToHeader(schedulingpb.ErrorType_UNKNOWN, + "missing region for split"), + }, nil + } + + if c.persistConfig.IsSchedulingHalted() { + return nil, errs.ErrSchedulingIsHalted.FastGenByArgs() + } + if !c.persistConfig.IsTikvRegionSplitEnabled() { + return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs() + } + reqRegion := request.GetRegion() + splitCount := request.GetSplitCount() + err := c.ValidRegion(reqRegion) + if err != nil { + return nil, err + } + splitIDs := make([]*pdpb.SplitID, 0, splitCount) + recordRegions := make([]uint64, 0, splitCount+1) + + for i := 0; i < int(splitCount); i++ { + newRegionID, err := c.AllocID() + if err != nil { + return nil, errs.ErrSchedulerNotFound.FastGenByArgs() + } + + peerIDs := make([]uint64, len(request.Region.Peers)) + for i := 0; i < len(peerIDs); i++ { + if peerIDs[i], err = c.AllocID(); err != nil { + return nil, err + } + } + + recordRegions = append(recordRegions, newRegionID) + splitIDs = append(splitIDs, &pdpb.SplitID{ + NewRegionId: newRegionID, + NewPeerIds: peerIDs, + }) + + log.Info("alloc ids for region split", zap.Uint64("region-id", newRegionID), zap.Uint64s("peer-ids", peerIDs)) + } + + recordRegions = append(recordRegions, reqRegion.GetId()) + if versioninfo.IsFeatureSupported(c.persistConfig.GetClusterVersion(), versioninfo.RegionMerge) { + // Disable merge the regions in a period of time. + c.GetCoordinator().GetMergeChecker().RecordRegionSplit(recordRegions) + } + + // If region splits during the scheduling process, regions with abnormal + // status may be left, and these regions need to be checked with higher + // priority. + c.GetCoordinator().GetCheckerController().AddSuspectRegions(recordRegions...) + + return &schedulingpb.AskBatchSplitResponse{ + Header: s.header(), + Ids: splitIDs, + }, nil +} + // RegisterGRPCService registers the service to gRPC server. 
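For illustration only, here is a minimal, hedged sketch (not part of this patch) of how a forwarding caller might invoke the AskBatchSplit endpoint implemented above. The package and helper names, the generated schedulingpb.SchedulingClient interface, and the split count of 3 are assumptions; the request and response fields follow the schedulingpb/pdpb types used by the handler.

package example

import (
	"context"

	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/pingcap/kvproto/pkg/pdpb"
	"github.com/pingcap/kvproto/pkg/schedulingpb"
)

// askBatchSplitSketch asks the scheduling service to pre-allocate IDs for splitting
// a region into several new regions. The server side rejects the call when scheduling
// is halted, TiKV region split is disabled, or the region epoch is stale (ValidRegion).
func askBatchSplitSketch(ctx context.Context, cli schedulingpb.SchedulingClient, clusterID uint64, region *metapb.Region) ([]*pdpb.SplitID, error) {
	resp, err := cli.AskBatchSplit(ctx, &schedulingpb.AskBatchSplitRequest{
		Header:     &schedulingpb.RequestHeader{ClusterId: clusterID},
		Region:     region,
		SplitCount: 3, // one SplitID (new region ID + peer IDs) is allocated per requested split
	})
	if err != nil {
		return nil, err
	}
	return resp.GetIds(), nil
}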
func (s *Service) RegisterGRPCService(g *grpc.Server) { schedulingpb.RegisterSchedulingServer(g, s) diff --git a/pkg/replication/replication_mode.go b/pkg/replication/replication_mode.go index faa06822127..30b34e4596a 100644 --- a/pkg/replication/replication_mode.go +++ b/pkg/replication/replication_mode.go @@ -371,14 +371,6 @@ func (m *ModeManager) Run(ctx context.Context) { wg.Wait() } -func storeIDs(stores []*core.StoreInfo) []uint64 { - ids := make([]uint64, len(stores)) - for i, s := range stores { - ids[i] = s.GetID() - } - return ids -} - func minimalUpVoters(rule *placement.Rule, upStores, downStores []*core.StoreInfo) int { if rule.Role == placement.Learner { return 0 @@ -411,7 +403,7 @@ func (m *ModeManager) tickUpdateState() { drTickCounter.Inc() - stores := m.checkStoreStatus() + stores, storeIDs := m.checkStoreStatus() var primaryHasVoter, drHasVoter bool var totalVoter, totalUpVoter int @@ -440,10 +432,10 @@ func (m *ModeManager) tickUpdateState() { hasMajority := totalUpVoter*2 > totalVoter log.Debug("replication store status", - zap.Uint64s("up-primary", storeIDs(stores[primaryUp])), - zap.Uint64s("up-dr", storeIDs(stores[drUp])), - zap.Uint64s("down-primary", storeIDs(stores[primaryDown])), - zap.Uint64s("down-dr", storeIDs(stores[drDown])), + zap.Uint64s("up-primary", storeIDs[primaryUp]), + zap.Uint64s("up-dr", storeIDs[drUp]), + zap.Uint64s("down-primary", storeIDs[primaryDown]), + zap.Uint64s("down-dr", storeIDs[drDown]), zap.Bool("can-sync", canSync), zap.Bool("has-majority", hasMajority), ) @@ -470,31 +462,31 @@ func (m *ModeManager) tickUpdateState() { case drStateSync: // If hasMajority is false, the cluster is always unavailable. Switch to async won't help. if !canSync && hasMajority { - m.drSwitchToAsyncWait(storeIDs(stores[primaryUp])) + m.drSwitchToAsyncWait(storeIDs[primaryUp]) } case drStateAsyncWait: if canSync { m.drSwitchToSync() break } - if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, storeIDs(stores[primaryUp])) { - m.drSwitchToAsyncWait(storeIDs(stores[primaryUp])) + if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, storeIDs[primaryUp]) { + m.drSwitchToAsyncWait(storeIDs[primaryUp]) break } - if m.drCheckStoreStateUpdated(storeIDs(stores[primaryUp])) { - m.drSwitchToAsync(storeIDs(stores[primaryUp])) + if m.drCheckStoreStateUpdated(storeIDs[primaryUp]) { + m.drSwitchToAsync(storeIDs[primaryUp]) } case drStateAsync: if canSync { m.drSwitchToSyncRecover() break } - if !reflect.DeepEqual(m.drGetAvailableStores(), stores[primaryUp]) && m.drCheckStoreStateUpdated(storeIDs(stores[primaryUp])) { - m.drSwitchToAsync(storeIDs(stores[primaryUp])) + if !reflect.DeepEqual(m.drGetAvailableStores(), storeIDs[primaryUp]) && m.drCheckStoreStateUpdated(storeIDs[primaryUp]) { + m.drSwitchToAsync(storeIDs[primaryUp]) } case drStateSyncRecover: if !canSync && hasMajority { - m.drSwitchToAsync(storeIDs(stores[primaryUp])) + m.drSwitchToAsync(storeIDs[primaryUp]) } else { m.updateProgress() progress := m.estimateProgress() @@ -569,39 +561,40 @@ const ( storeStatusTypeCount ) -func (m *ModeManager) checkStoreStatus() [][]*core.StoreInfo { +func (m *ModeManager) checkStoreStatus() ([][]*core.StoreInfo, [][]uint64) { m.RLock() defer m.RUnlock() - stores := make([][]*core.StoreInfo, storeStatusTypeCount) + stores, storeIDs := make([][]*core.StoreInfo, storeStatusTypeCount), make([][]uint64, storeStatusTypeCount) for _, s := range m.cluster.GetStores() { if s.IsRemoved() { continue } - // learner 
peers do not participate in major commit or vote, so it should not count in primary/dr as a normal store. - if s.GetRegionCount() == s.GetLearnerCount() { - continue - } down := s.DownTime() >= m.config.DRAutoSync.WaitStoreTimeout.Duration labelValue := s.GetLabelValue(m.config.DRAutoSync.LabelKey) if labelValue == m.config.DRAutoSync.Primary { if down { stores[primaryDown] = append(stores[primaryDown], s) + storeIDs[primaryDown] = append(storeIDs[primaryDown], s.GetID()) } else { stores[primaryUp] = append(stores[primaryUp], s) + storeIDs[primaryUp] = append(storeIDs[primaryUp], s.GetID()) } } if labelValue == m.config.DRAutoSync.DR { if down { stores[drDown] = append(stores[drDown], s) + storeIDs[drDown] = append(storeIDs[drDown], s.GetID()) } else { stores[drUp] = append(stores[drUp], s) + storeIDs[drUp] = append(storeIDs[drUp], s.GetID()) } } } for i := range stores { sort.Slice(stores[i], func(a, b int) bool { return stores[i][a].GetID() < stores[i][b].GetID() }) + sort.Slice(storeIDs[i], func(a, b int) bool { return storeIDs[i][a] < storeIDs[i][b] }) } - return stores + return stores, storeIDs } // UpdateStoreDRStatus saves the dr-autosync status of a store. diff --git a/pkg/replication/replication_mode_test.go b/pkg/replication/replication_mode_test.go index bb9b6559cf6..e01fb7a0b9a 100644 --- a/pkg/replication/replication_mode_test.go +++ b/pkg/replication/replication_mode_test.go @@ -245,8 +245,8 @@ func TestStateSwitch(t *testing.T) { rep.tickUpdateState() re.Equal(drStateSync, rep.drGetState()) - // once the voter node down, even learner node up, swith to async state. - setStoreState(cluster, "up", "up", "up", "up", "down", "up") + // once zone2 down, swith to async state. + setStoreState(cluster, "up", "up", "up", "up", "down", "down") rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) @@ -264,18 +264,18 @@ func TestStateSwitch(t *testing.T) { re.False(rep.GetReplicationStatus().GetDrAutoSync().GetPauseRegionSplit()) // async_wait -> async_wait - setStoreState(cluster, "up", "up", "up", "up", "down", "up") + setStoreState(cluster, "up", "up", "up", "up", "down", "down") rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) assertStateIDUpdate() rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1]) - setStoreState(cluster, "down", "up", "up", "up", "down", "up") + setStoreState(cluster, "down", "up", "up", "up", "down", "down") rep.tickUpdateState() assertStateIDUpdate() rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[2,3,4]}`, stateID), replicator.lastData[1]) - setStoreState(cluster, "up", "down", "up", "up", "down", "up") + setStoreState(cluster, "up", "down", "up", "up", "down", "down") rep.tickUpdateState() assertStateIDUpdate() rep.tickReplicateStatus() @@ -294,7 +294,7 @@ func TestStateSwitch(t *testing.T) { re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1]) // async -> async - setStoreState(cluster, "up", "up", "up", "up", "down", "up") + setStoreState(cluster, "up", "up", "up", "up", "down", "down") rep.tickUpdateState() // store 2 won't be available before it syncs status. 
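As a brief aside on the checkStoreStatus refactor above: it now returns store IDs alongside the stores, index-aligned per status bucket, which is why the removed storeIDs() helper is no longer needed. A minimal caller-side sketch (editorial; the helper name is hypothetical and it assumes the ModeManager and status constants defined in replication_mode.go):

// upPrimaryStoreIDs shows the shape of the new two-value return.
func (m *ModeManager) upPrimaryStoreIDs() []uint64 {
	stores, storeIDs := m.checkStoreStatus()
	_ = stores[primaryUp]      // full StoreInfo objects, when label or state details are needed
	return storeIDs[primaryUp] // already sorted and aligned with stores[primaryUp]
}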
rep.tickReplicateStatus() @@ -319,14 +319,14 @@ func TestStateSwitch(t *testing.T) { // sync_recover -> async rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) - setStoreState(cluster, "up", "up", "up", "up", "down", "up") + setStoreState(cluster, "up", "up", "up", "up", "down", "down") rep.tickUpdateState() re.Equal(drStateAsync, rep.drGetState()) assertStateIDUpdate() // lost majority, does not switch to async. rep.drSwitchToSyncRecover() assertStateIDUpdate() - setStoreState(cluster, "down", "down", "up", "up", "down", "up") + setStoreState(cluster, "down", "down", "up", "up", "down", "down") rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) @@ -636,6 +636,8 @@ func TestComplexPlacementRules(t *testing.T) { setStoreState(cluster, "up", "up", "up", "up", "up", "up", "up", "down", "up", "down") rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) + rep.tickReplicateStatus() + re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4,5,6]}`, rep.drAutoSync.StateID), replicator.lastData[1]) // reset to sync setStoreState(cluster, "up", "up", "up", "up", "up", "up", "up", "up", "up", "up") @@ -695,6 +697,47 @@ func TestComplexPlacementRules2(t *testing.T) { setStoreState(cluster, "up", "up", "up", "up", "down", "down", "up") rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) + rep.tickReplicateStatus() + re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, rep.drAutoSync.StateID), replicator.lastData[1]) +} + +func TestComplexPlacementRules3(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + store := storage.NewStorageWithMemoryBackend() + conf := config.ReplicationModeConfig{ReplicationMode: modeDRAutoSync, DRAutoSync: config.DRAutoSyncReplicationConfig{ + LabelKey: "zone", + Primary: "zone1", + DR: "zone2", + WaitStoreTimeout: typeutil.Duration{Duration: time.Minute}, + }} + cluster := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions()) + replicator := newMockReplicator([]uint64{1}) + rep, err := NewReplicationModeManager(conf, store, cluster, replicator) + re.NoError(err) + cluster.GetRuleManager().SetAllGroupBundles( + genPlacementRuleConfig([]ruleConfig{ + {key: "logic", value: "logic1", role: placement.Voter, count: 2}, + {key: "logic", value: "logic2", role: placement.Learner, count: 1}, + {key: "logic", value: "logic3", role: placement.Voter, count: 1}, + }), true) + + cluster.AddLabelsStore(1, 1, map[string]string{"zone": "zone1", "logic": "logic1"}) + cluster.AddLabelsStore(2, 1, map[string]string{"zone": "zone1", "logic": "logic1"}) + cluster.AddLabelsStore(3, 1, map[string]string{"zone": "zone1", "logic": "logic2"}) + cluster.AddLabelsStore(4, 1, map[string]string{"zone": "zone1", "logic": "logic2"}) + cluster.AddLabelsStore(5, 1, map[string]string{"zone": "zone2", "logic": "logic3"}) + + // initial state is sync + re.Equal(drStateSync, rep.drGetState()) + + // zone2 down, switch state, available stores should contain logic2 (learner) + setStoreState(cluster, "up", "up", "up", "up", "down") + rep.tickUpdateState() + re.Equal(drStateAsyncWait, rep.drGetState()) + rep.tickReplicateStatus() + re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, rep.drAutoSync.StateID), replicator.lastData[1]) } func genRegions(cluster *mockcluster.Cluster, stateID uint64, n int) []*core.RegionInfo { diff --git a/pkg/schedule/coordinator.go 
b/pkg/schedule/coordinator.go index f35bd6d4de3..8fb9ec8b286 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -458,13 +458,16 @@ func (c *Coordinator) InitSchedulers(needRun bool) { log.Error("can not create scheduler with independent configuration", zap.String("scheduler-name", name), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err)) continue } - log.Info("create scheduler with independent configuration", zap.String("scheduler-name", s.GetName())) if needRun { + log.Info("create scheduler with independent configuration", zap.String("scheduler-name", s.GetName())) if err = c.schedulers.AddScheduler(s); err != nil { log.Error("can not add scheduler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err)) } - } else if err = c.schedulers.AddSchedulerHandler(s); err != nil { - log.Error("can not add scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err)) + } else { + log.Info("create scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName())) + if err = c.schedulers.AddSchedulerHandler(s); err != nil { + log.Error("can not add scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err)) + } } } @@ -484,8 +487,8 @@ func (c *Coordinator) InitSchedulers(needRun bool) { continue } - log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args)) if needRun { + log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args)) if err = c.schedulers.AddScheduler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) { log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err)) } else { @@ -493,8 +496,14 @@ func (c *Coordinator) InitSchedulers(needRun bool) { scheduleCfg.Schedulers[k] = schedulerCfg k++ } - } else if err = c.schedulers.AddSchedulerHandler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) { - log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err)) + } else { + log.Info("create scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args)) + if err = c.schedulers.AddSchedulerHandler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) { + log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err)) + } else { + scheduleCfg.Schedulers[k] = schedulerCfg + k++ + } } } diff --git a/pkg/schedule/placement/rule_manager.go b/pkg/schedule/placement/rule_manager.go index bdca4cc1b19..a7e169b74aa 100644 --- a/pkg/schedule/placement/rule_manager.go +++ b/pkg/schedule/placement/rule_manager.go @@ -321,6 +321,20 @@ func (m *RuleManager) GetAllRules() []*Rule { return rules } +// GetRulesCount returns the number of rules. 
+func (m *RuleManager) GetRulesCount() int { + m.RLock() + defer m.RUnlock() + return len(m.ruleConfig.rules) +} + +// GetGroupsCount returns the number of rule groups. +func (m *RuleManager) GetGroupsCount() int { + m.RLock() + defer m.RUnlock() + return len(m.ruleConfig.groups) +} + // GetRulesByGroup returns sorted rules of a group. func (m *RuleManager) GetRulesByGroup(group string) []*Rule { m.RLock() diff --git a/pkg/schedule/placement/rule_manager_test.go b/pkg/schedule/placement/rule_manager_test.go index a6454337aa8..dad50a2d881 100644 --- a/pkg/schedule/placement/rule_manager_test.go +++ b/pkg/schedule/placement/rule_manager_test.go @@ -163,6 +163,8 @@ func TestSaveLoad(t *testing.T) { re.Equal(rules[0].String(), m2.GetRule("pd", "default").String()) re.Equal(rules[1].String(), m2.GetRule("foo", "baz").String()) re.Equal(rules[2].String(), m2.GetRule("foo", "bar").String()) + re.Equal(manager.GetRulesCount(), 3) + re.Equal(manager.GetGroupsCount(), 2) } func TestSetAfterGet(t *testing.T) { diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go index a6665c3e5e7..cc1b16300c5 100644 --- a/pkg/schedule/schedulers/evict_slow_store.go +++ b/pkg/schedule/schedulers/evict_slow_store.go @@ -15,6 +15,11 @@ package schedulers import ( + "net/http" + "sync/atomic" + "time" + + "github.com/gorilla/mux" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" @@ -23,6 +28,8 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/apiutil" + "github.com/unrolled/render" "go.uber.org/zap" ) @@ -40,8 +47,27 @@ const ( var evictSlowStoreCounter = schedulerCounter.WithLabelValues(EvictSlowStoreName, "schedule") type evictSlowStoreSchedulerConfig struct { - storage endpoint.ConfigStorage - EvictedStores []uint64 `json:"evict-stores"` + storage endpoint.ConfigStorage + // Last timestamp of the chosen slow store for eviction. + lastSlowStoreCaptureTS time.Time + // Duration gap for recovering the candidate, unit: s. + RecoveryDurationGap uint64 `json:"recovery-duration"` + EvictedStores []uint64 `json:"evict-stores"` +} + +func initEvictSlowStoreSchedulerConfig(storage endpoint.ConfigStorage) *evictSlowStoreSchedulerConfig { + return &evictSlowStoreSchedulerConfig{ + storage: storage, + lastSlowStoreCaptureTS: time.Time{}, + RecoveryDurationGap: defaultRecoveryDurationGap, + EvictedStores: make([]uint64, 0), + } +} + +func (conf *evictSlowStoreSchedulerConfig) Clone() *evictSlowStoreSchedulerConfig { + return &evictSlowStoreSchedulerConfig{ + RecoveryDurationGap: atomic.LoadUint64(&conf.RecoveryDurationGap), + } } func (conf *evictSlowStoreSchedulerConfig) Persist() error { @@ -78,8 +104,18 @@ func (conf *evictSlowStoreSchedulerConfig) evictStore() uint64 { return conf.EvictedStores[0] } +// readyForRecovery checks whether the last cpatured candidate is ready for recovery. 
+func (conf *evictSlowStoreSchedulerConfig) readyForRecovery() bool { + recoveryDurationGap := atomic.LoadUint64(&conf.RecoveryDurationGap) + failpoint.Inject("transientRecoveryGap", func() { + recoveryDurationGap = 0 + }) + return uint64(time.Since(conf.lastSlowStoreCaptureTS).Seconds()) >= recoveryDurationGap +} + func (conf *evictSlowStoreSchedulerConfig) setStoreAndPersist(id uint64) error { conf.EvictedStores = []uint64{id} + conf.lastSlowStoreCaptureTS = time.Now() return conf.Persist() } @@ -87,14 +123,58 @@ func (conf *evictSlowStoreSchedulerConfig) clearAndPersist() (oldID uint64, err oldID = conf.evictStore() if oldID > 0 { conf.EvictedStores = []uint64{} + conf.lastSlowStoreCaptureTS = time.Time{} err = conf.Persist() } return } +type evictSlowStoreHandler struct { + rd *render.Render + config *evictSlowStoreSchedulerConfig +} + +func newEvictSlowStoreHandler(config *evictSlowStoreSchedulerConfig) http.Handler { + h := &evictSlowStoreHandler{ + config: config, + rd: render.New(render.Options{IndentJSON: true}), + } + router := mux.NewRouter() + router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost) + router.HandleFunc("/list", h.ListConfig).Methods(http.MethodGet) + return router +} + +func (handler *evictSlowStoreHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { + var input map[string]interface{} + if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil { + return + } + recoveryDurationGapFloat, ok := input["recovery-duration"].(float64) + if !ok { + handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error()) + return + } + recoveryDurationGap := (uint64)(recoveryDurationGapFloat) + prevRecoveryDurationGap := atomic.LoadUint64(&handler.config.RecoveryDurationGap) + atomic.StoreUint64(&handler.config.RecoveryDurationGap, recoveryDurationGap) + log.Info("evict-slow-store-scheduler update 'recovery-duration' - unit: s", zap.Uint64("prev", prevRecoveryDurationGap), zap.Uint64("cur", recoveryDurationGap)) + handler.rd.JSON(w, http.StatusOK, nil) +} + +func (handler *evictSlowStoreHandler) ListConfig(w http.ResponseWriter, r *http.Request) { + conf := handler.config.Clone() + handler.rd.JSON(w, http.StatusOK, conf) +} + type evictSlowStoreScheduler struct { *BaseScheduler - conf *evictSlowStoreSchedulerConfig + conf *evictSlowStoreSchedulerConfig + handler http.Handler +} + +func (s *evictSlowStoreScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.handler.ServeHTTP(w, r) } func (s *evictSlowStoreScheduler) GetName() string { @@ -168,7 +248,7 @@ func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, dryRun // slow node next time. log.Info("slow store has been removed", zap.Uint64("store-id", store.GetID())) - } else if store.GetSlowScore() <= slowStoreRecoverThreshold { + } else if store.GetSlowScore() <= slowStoreRecoverThreshold && s.conf.readyForRecovery() { log.Info("slow store has been recovered", zap.Uint64("store-id", store.GetID())) } else { @@ -211,11 +291,10 @@ func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, dryRun // newEvictSlowStoreScheduler creates a scheduler that detects and evicts slow stores. 
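For reference, a hedged sketch (not part of this patch) of exercising the new evict-slow-store handler: the /config and /list routes and the "recovery-duration" JSON field come from the code above, while the URL prefix and the helper name are assumptions based on how PD typically mounts scheduler configs under /pd/api/v1/scheduler-config/<scheduler-name>/.

package example

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// setRecoveryDuration posts a new "recovery-duration" (in seconds) to the
// evict-slow-store scheduler; the current value can be read back from .../list.
func setRecoveryDuration(pdAddr string, seconds uint64) error {
	body, err := json.Marshal(map[string]interface{}{"recovery-duration": seconds})
	if err != nil {
		return err
	}
	url := fmt.Sprintf("%s/pd/api/v1/scheduler-config/evict-slow-store-scheduler/config", pdAddr)
	resp, err := http.Post(url, "application/json", bytes.NewReader(body))
	if err != nil {
		return err
	}
	return resp.Body.Close()
}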
func newEvictSlowStoreScheduler(opController *operator.Controller, conf *evictSlowStoreSchedulerConfig) Scheduler { - base := NewBaseScheduler(opController) - - s := &evictSlowStoreScheduler{ - BaseScheduler: base, + handler := newEvictSlowStoreHandler(conf) + return &evictSlowStoreScheduler{ + BaseScheduler: NewBaseScheduler(opController), conf: conf, + handler: handler, } - return s } diff --git a/pkg/schedule/schedulers/evict_slow_store_test.go b/pkg/schedule/schedulers/evict_slow_store_test.go index 0b0c1d9ad39..813d17ae541 100644 --- a/pkg/schedule/schedulers/evict_slow_store_test.go +++ b/pkg/schedule/schedulers/evict_slow_store_test.go @@ -67,6 +67,7 @@ func (suite *evictSlowStoreTestSuite) TearDownTest() { } func (suite *evictSlowStoreTestSuite) TestEvictSlowStore() { + suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/schedulers/transientRecoveryGap", "return(true)")) storeInfo := suite.tc.GetStore(1) newStoreInfo := storeInfo.Clone(func(store *core.StoreInfo) { store.GetStoreStats().SlowScore = 100 @@ -113,6 +114,8 @@ func (suite *evictSlowStoreTestSuite) TestEvictSlowStore() { suite.NoError(err) suite.Equal(es2.conf.EvictedStores, persistValue.EvictedStores) suite.Zero(persistValue.evictStore()) + suite.True(persistValue.readyForRecovery()) + suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/schedulers/transientRecoveryGap")) } func (suite *evictSlowStoreTestSuite) TestEvictSlowStorePrepare() { @@ -124,6 +127,7 @@ func (suite *evictSlowStoreTestSuite) TestEvictSlowStorePrepare() { es2.conf.setStoreAndPersist(1) suite.Equal(uint64(1), es2.conf.evictStore()) + suite.False(es2.conf.readyForRecovery()) // prepare with evict store. suite.es.Prepare(suite.tc) } diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go index dde10610643..d45602b90e1 100644 --- a/pkg/schedule/schedulers/init.go +++ b/pkg/schedule/schedulers/init.go @@ -159,7 +159,7 @@ func schedulersRegister() { }) RegisterScheduler(EvictSlowStoreType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) { - conf := &evictSlowStoreSchedulerConfig{storage: storage, EvictedStores: make([]uint64, 0)} + conf := initEvictSlowStoreSchedulerConfig(storage) if err := decoder(conf); err != nil { return nil, err } diff --git a/pkg/schedule/schedulers/metrics.go b/pkg/schedule/schedulers/metrics.go index 2052bc923af..34e4606a7ce 100644 --- a/pkg/schedule/schedulers/metrics.go +++ b/pkg/schedule/schedulers/metrics.go @@ -134,10 +134,19 @@ var ( Name: "hot_pending_sum", Help: "Pending influence sum of store in hot region scheduler.", }, []string{"store", "rw", "dim"}) + + ruleStatusGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "pd", + Subsystem: "rule_manager", + Name: "status", + Help: "Status of the rule.", + }, []string{"type"}) ) func init() { prometheus.MustRegister(schedulerStatusGauge) + prometheus.MustRegister(ruleStatusGauge) prometheus.MustRegister(schedulerCounter) prometheus.MustRegister(balanceWitnessCounter) prometheus.MustRegister(hotSchedulerResultCounter) diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go index d58a78ca82f..0f2264392aa 100644 --- a/pkg/schedule/schedulers/scheduler_controller.go +++ b/pkg/schedule/schedulers/scheduler_controller.go @@ -36,7 +36,11 @@ import ( const maxScheduleRetries = 10 -var denySchedulersByLabelerCounter = 
labeler.LabelerEventCounter.WithLabelValues("schedulers", "deny") +var ( + denySchedulersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues("schedulers", "deny") + rulesCntStatusGauge = ruleStatusGauge.WithLabelValues("rule_count") + groupsCntStatusGauge = ruleStatusGauge.WithLabelValues("group_count") +) // Controller is used to manage all schedulers. type Controller struct { @@ -108,7 +112,6 @@ func (c *Controller) GetSchedulerHandlers() map[string]http.Handler { // CollectSchedulerMetrics collects metrics of all schedulers. func (c *Controller) CollectSchedulerMetrics() { c.RLock() - defer c.RUnlock() for _, s := range c.schedulers { var allowScheduler float64 // If the scheduler is not allowed to schedule, it will disappear in Grafana panel. @@ -118,6 +121,15 @@ func (c *Controller) CollectSchedulerMetrics() { } schedulerStatusGauge.WithLabelValues(s.Scheduler.GetName(), "allow").Set(allowScheduler) } + c.RUnlock() + ruleMgr := c.cluster.GetRuleManager() + if ruleMgr == nil { + return + } + ruleCnt := ruleMgr.GetRulesCount() + groupCnt := ruleMgr.GetGroupsCount() + rulesCntStatusGauge.Set(float64(ruleCnt)) + groupsCntStatusGauge.Set(float64(groupCnt)) } func (c *Controller) isSchedulingHalted() bool { @@ -127,6 +139,10 @@ func (c *Controller) isSchedulingHalted() bool { // ResetSchedulerMetrics resets metrics of all schedulers. func (c *Controller) ResetSchedulerMetrics() { schedulerStatusGauge.Reset() + ruleStatusGauge.Reset() + // create in map again + rulesCntStatusGauge = ruleStatusGauge.WithLabelValues("rule_count") + groupsCntStatusGauge = ruleStatusGauge.WithLabelValues("group_count") } // AddSchedulerHandler adds the HTTP handler for a scheduler. diff --git a/pkg/window/counter.go b/pkg/window/counter.go new file mode 100644 index 00000000000..8eaf164b7c0 --- /dev/null +++ b/pkg/window/counter.go @@ -0,0 +1,111 @@ +// The MIT License (MIT) +// Copyright (c) 2022 go-kratos Project Authors. +// +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS,g +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package window + +import ( + "time" +) + +// Metric is a sample interface. +// Implementations of Metrics in metric package are Counter, Gauge, +// PointGauge, RollingCounter and RollingGauge. +type Metric interface { + // Add adds the given value to the counter. + Add(int64) + // Value gets the current value. + // If the metric's type is PointGauge, RollingCounter, RollingGauge, + // it returns the sum value within the window. + Value() int64 +} + +// Aggregation contains some common aggregation function. +// Each aggregation can compute summary statistics of window. +type Aggregation interface { + // Min finds the min value within the window. + Min() float64 + // Max finds the max value within the window. + Max() float64 + // Avg computes average value within the window. + Avg() float64 + // Sum computes sum value within the window. + Sum() float64 +} + +// RollingCounter represents a ring window based on time duration. +// e.g. 
[[1], [3], [5]] +type RollingCounter interface { + Metric + Aggregation + + Timespan() int + // Reduce applies the reduction function to all buckets within the window. + Reduce(func(Iterator) float64) float64 +} + +// RollingCounterOpts contains the arguments for creating RollingCounter. +type RollingCounterOpts struct { + Size int + BucketDuration time.Duration +} + +type rollingCounter struct { + policy *RollingPolicy +} + +// NewRollingCounter creates a new RollingCounter bases on RollingCounterOpts. +func NewRollingCounter(opts RollingCounterOpts) RollingCounter { + window := NewWindow(Options{Size: opts.Size}) + policy := NewRollingPolicy(window, RollingPolicyOpts{BucketDuration: opts.BucketDuration}) + return &rollingCounter{ + policy: policy, + } +} + +func (r *rollingCounter) Add(val int64) { + r.policy.Add(float64(val)) +} + +func (r *rollingCounter) Reduce(f func(Iterator) float64) float64 { + return r.policy.Reduce(f) +} + +func (r *rollingCounter) Avg() float64 { + return r.policy.Reduce(Avg) +} + +func (r *rollingCounter) Min() float64 { + return r.policy.Reduce(Min) +} + +func (r *rollingCounter) Max() float64 { + return r.policy.Reduce(Max) +} + +func (r *rollingCounter) Sum() float64 { + return r.policy.Reduce(Sum) +} + +func (r *rollingCounter) Value() int64 { + return int64(r.Sum()) +} + +func (r *rollingCounter) Timespan() int { + r.policy.mu.RLock() + defer r.policy.mu.RUnlock() + return r.policy.timespan() +} diff --git a/pkg/window/counter_test.go b/pkg/window/counter_test.go new file mode 100644 index 00000000000..ce604acc27e --- /dev/null +++ b/pkg/window/counter_test.go @@ -0,0 +1,182 @@ +// The MIT License (MIT) +// Copyright (c) 2022 go-kratos Project Authors. +// +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS,g +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package window + +import ( + "math" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestRollingCounterAdd(t *testing.T) { + re := require.New(t) + size := 3 + bucketDuration := time.Second + opts := RollingCounterOpts{ + Size: size, + BucketDuration: bucketDuration, + } + r := NewRollingCounter(opts) + listBuckets := func() [][]float64 { + buckets := make([][]float64, 0) + r.Reduce(func(i Iterator) float64 { + for i.Next() { + bucket := i.Bucket() + buckets = append(buckets, bucket.Points) + } + return 0.0 + }) + return buckets + } + re.Equal([][]float64{{}, {}, {}}, listBuckets()) + r.Add(1) + re.Equal([][]float64{{}, {}, {1}}, listBuckets()) + time.Sleep(time.Second) + r.Add(2) + r.Add(3) + re.Equal([][]float64{{}, {1}, {5}}, listBuckets()) + time.Sleep(time.Second) + r.Add(4) + r.Add(5) + r.Add(6) + re.Equal([][]float64{{1}, {5}, {15}}, listBuckets()) + time.Sleep(time.Second) + r.Add(7) + re.Equal([][]float64{{5}, {15}, {7}}, listBuckets()) + + // test the given reduce methods. 
+ re.Less(math.Abs((r.Sum() - 27.)), 1e-7) + re.Less(math.Abs((r.Max() - 15.)), 1e-7) + re.Less(math.Abs((r.Min() - 5.)), 1e-7) + re.Less(math.Abs((r.Avg() - 9.)), 1e-7) +} + +func TestRollingCounterReduce(t *testing.T) { + re := require.New(t) + size := 3 + bucketDuration := time.Second + opts := RollingCounterOpts{ + Size: size, + BucketDuration: bucketDuration, + } + r := NewRollingCounter(opts) + for x := 0; x < size; x++ { + for i := 0; i <= x; i++ { + r.Add(1) + } + if x < size-1 { + time.Sleep(bucketDuration) + } + } + var result = r.Reduce(func(iterator Iterator) float64 { + var result float64 + for iterator.Next() { + bucket := iterator.Bucket() + result += bucket.Points[0] + } + return result + }) + re.Less(math.Abs(result-6.), 1e-7) + re.Less(math.Abs((r.Sum() - 6.)), 1e-7) + re.Less(math.Abs(float64(r.Value())-6), 1e-7) +} + +func TestRollingCounterDataRace(t *testing.T) { + size := 3 + bucketDuration := time.Millisecond * 10 + opts := RollingCounterOpts{ + Size: size, + BucketDuration: bucketDuration, + } + r := NewRollingCounter(opts) + var stop = make(chan bool) + go func() { + for { + select { + case <-stop: + return + default: + r.Add(1) + time.Sleep(time.Millisecond * 5) + } + } + }() + go func() { + for { + select { + case <-stop: + return + default: + _ = r.Reduce(func(i Iterator) float64 { + for i.Next() { + bucket := i.Bucket() + for range bucket.Points { + continue + } + } + return 0 + }) + } + } + }() + time.Sleep(time.Second * 3) + close(stop) +} + +func BenchmarkRollingCounterIncr(b *testing.B) { + size := 3 + bucketDuration := time.Millisecond * 100 + opts := RollingCounterOpts{ + Size: size, + BucketDuration: bucketDuration, + } + r := NewRollingCounter(opts) + b.ResetTimer() + for i := 0; i <= b.N; i++ { + r.Add(1) + } +} + +func BenchmarkRollingCounterReduce(b *testing.B) { + size := 3 + bucketDuration := time.Second + opts := RollingCounterOpts{ + Size: size, + BucketDuration: bucketDuration, + } + r := NewRollingCounter(opts) + for i := 0; i <= 10; i++ { + r.Add(1) + time.Sleep(time.Millisecond * 500) + } + b.ResetTimer() + for i := 0; i <= b.N; i++ { + var _ = r.Reduce(func(i Iterator) float64 { + var result float64 + for i.Next() { + bucket := i.Bucket() + if len(bucket.Points) != 0 { + result += bucket.Points[0] + } + } + return result + }) + } +} diff --git a/pkg/window/policy.go b/pkg/window/policy.go new file mode 100644 index 00000000000..d67a8aa6e59 --- /dev/null +++ b/pkg/window/policy.go @@ -0,0 +1,109 @@ +// The MIT License (MIT) +// Copyright (c) 2022 go-kratos Project Authors. +// +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS,g +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package window + +import ( + "sync" + "time" +) + +// RollingPolicy is a policy for ring window based on time duration. +// RollingPolicy moves bucket offset with time duration. +// e.g. If the last point is appended one bucket duration ago, +// RollingPolicy will increment current offset. 
+type RollingPolicy struct { + mu sync.RWMutex + size int + window *Window + offset int + + bucketDuration time.Duration + lastAppendTime time.Time +} + +// RollingPolicyOpts contains the arguments for creating RollingPolicy. +type RollingPolicyOpts struct { + BucketDuration time.Duration +} + +// NewRollingPolicy creates a new RollingPolicy based on the given window and RollingPolicyOpts. +func NewRollingPolicy(window *Window, opts RollingPolicyOpts) *RollingPolicy { + return &RollingPolicy{ + window: window, + size: window.Size(), + offset: 0, + + bucketDuration: opts.BucketDuration, + lastAppendTime: time.Now(), + } +} + +// timespan returns passed bucket number since lastAppendTime, +// if it is one bucket duration earlier than the last recorded +// time, it will return the size. +func (r *RollingPolicy) timespan() int { + v := int(time.Since(r.lastAppendTime) / r.bucketDuration) + if v > -1 { // maybe time backwards + return v + } + return r.size +} + +// apply applies function f with value val on +// current offset bucket, expired bucket will be reset +func (r *RollingPolicy) apply(f func(offset int, val float64), val float64) { + r.mu.Lock() + defer r.mu.Unlock() + + // calculate current offset + timespan := r.timespan() + oriTimespan := timespan + if timespan > 0 { + start := (r.offset + 1) % r.size + end := (r.offset + timespan) % r.size + if timespan > r.size { + timespan = r.size + } + // reset the expired buckets + r.window.ResetBuckets(start, timespan) + r.offset = end + r.lastAppendTime = r.lastAppendTime.Add(time.Duration(oriTimespan * int(r.bucketDuration))) + } + f(r.offset, val) +} + +// Add adds the given value to the latest point within bucket. +func (r *RollingPolicy) Add(val float64) { + r.apply(r.window.Add, val) +} + +// Reduce applies the reduction function to all buckets within the window. +func (r *RollingPolicy) Reduce(f func(Iterator) float64) (val float64) { + r.mu.RLock() + defer r.mu.RUnlock() + + timespan := r.timespan() + if count := r.size - timespan; count > 0 { + offset := r.offset + timespan + 1 + if offset >= r.size { + offset -= r.size + } + val = f(r.window.Iterator(offset, count)) + } + return val +} diff --git a/pkg/window/policy_test.go b/pkg/window/policy_test.go new file mode 100644 index 00000000000..14b3b326192 --- /dev/null +++ b/pkg/window/policy_test.go @@ -0,0 +1,129 @@ +// The MIT License (MIT) +// Copyright (c) 2022 go-kratos Project Authors. +// +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS,g +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
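To make the rolling-window semantics above concrete, here is a small usage sketch (editorial, not part of the patch): a counter with 3 one-second buckets keeps roughly the last three seconds of samples, and the RollingPolicy rotates and resets expired buckets on each Add/Reduce. The import path follows the pkg/window package added by this patch.

package main

import (
	"fmt"
	"time"

	"github.com/tikv/pd/pkg/window"
)

func main() {
	c := window.NewRollingCounter(window.RollingCounterOpts{
		Size:           3,
		BucketDuration: time.Second,
	})
	c.Add(1)
	time.Sleep(time.Second)
	c.Add(2)
	c.Add(3)
	// While all samples are still inside the window: Value()=6, Sum()=6, Avg()=2.
	fmt.Println(c.Value(), c.Sum(), c.Avg())
}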
+ +package window + +import ( + "fmt" + "math" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func GetRollingPolicy() *RollingPolicy { + w := NewWindow(Options{Size: 3}) + return NewRollingPolicy(w, RollingPolicyOpts{BucketDuration: 100 * time.Millisecond}) +} + +func TestRollingPolicy_Add(t *testing.T) { + re := require.New(t) + // test func timespan return real span + tests := []struct { + timeSleep []int + offset []int + points []float64 + }{ + { + timeSleep: []int{150, 51}, + offset: []int{1, 2}, + points: []float64{1, 1}, + }, + { + timeSleep: []int{94, 250}, + offset: []int{0, 0}, + points: []float64{1, 1}, + }, + { + timeSleep: []int{150, 300, 600}, + offset: []int{1, 1, 1}, + points: []float64{1, 1, 1}, + }, + } + + for _, test := range tests { + t.Run("test policy add", func(t *testing.T) { + var totalTS, lastOffset int + timeSleep := test.timeSleep + policy := GetRollingPolicy() + for i, n := range timeSleep { + totalTS += n + time.Sleep(time.Duration(n) * time.Millisecond) + offset, point := test.offset[i], test.points[i] + policy.Add(point) + + re.Less(math.Abs(point-policy.window.buckets[offset].Points[0]), 1e-6, + fmt.Sprintf("error, time since last append: %vms, last offset: %v", totalTS, lastOffset)) + lastOffset = offset + } + }) + } +} + +func TestRollingPolicy_AddWithTimespan(t *testing.T) { + re := require.New(t) + t.Run("timespan < bucket number", func(t *testing.T) { + policy := GetRollingPolicy() + // bucket 0 + policy.Add(0) + // bucket 1 + time.Sleep(101 * time.Millisecond) + policy.Add(1) + re.Equal(1, int(policy.window.buckets[1].Points[0])) + // bucket 2 + time.Sleep(101 * time.Millisecond) + policy.Add(2) + // bucket 1 + time.Sleep(201 * time.Millisecond) + policy.Add(4) + + for _, bkt := range policy.window.buckets { + t.Logf("%+v", bkt) + } + + re.Equal(0, len(policy.window.buckets[0].Points)) + re.Equal(4, int(policy.window.buckets[1].Points[0])) + re.Equal(2, int(policy.window.buckets[2].Points[0])) + }) + + t.Run("timespan > bucket number", func(t *testing.T) { + policy := GetRollingPolicy() + + // bucket 0 + policy.Add(0) + // bucket 1 + time.Sleep(101 * time.Millisecond) + policy.Add(1) + policy.Add(1) + re.Equal(2, int(policy.window.buckets[1].Points[0])) + // bucket 2 + time.Sleep(101 * time.Millisecond) + policy.Add(2) + // bucket 1 + time.Sleep(501 * time.Millisecond) + policy.Add(4) + + for _, bkt := range policy.window.buckets { + t.Logf("%+v", bkt) + } + + re.Equal(0, len(policy.window.buckets[0].Points)) + re.Equal(4, int(policy.window.buckets[1].Points[0])) + re.Equal(0, len(policy.window.buckets[2].Points)) + }) +} diff --git a/pkg/window/reduce.go b/pkg/window/reduce.go new file mode 100644 index 00000000000..23fa87177f2 --- /dev/null +++ b/pkg/window/reduce.go @@ -0,0 +1,94 @@ +// The MIT License (MIT) +// Copyright (c) 2022 go-kratos Project Authors. +// +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS,g +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package window + +// Sum the values within the window. 
+func Sum(iterator Iterator) float64 { + var result = 0.0 + for iterator.Next() { + bucket := iterator.Bucket() + for _, p := range bucket.Points { + result += p + } + } + return result +} + +// Avg the values within the window. +func Avg(iterator Iterator) float64 { + var result = 0.0 + var count = 0.0 + for iterator.Next() { + bucket := iterator.Bucket() + for _, p := range bucket.Points { + result += p + count++ + } + } + return result / count +} + +// Min the values within the window. +func Min(iterator Iterator) float64 { + var result = 0.0 + var started = false + for iterator.Next() { + bucket := iterator.Bucket() + for _, p := range bucket.Points { + if !started { + result = p + started = true + continue + } + if p < result { + result = p + } + } + } + return result +} + +// Max the values within the window. +func Max(iterator Iterator) float64 { + var result = 0.0 + var started = false + for iterator.Next() { + bucket := iterator.Bucket() + for _, p := range bucket.Points { + if !started { + result = p + started = true + continue + } + if p > result { + result = p + } + } + } + return result +} + +// Count sums the count value within the window. +func Count(iterator Iterator) float64 { + var result int64 + for iterator.Next() { + bucket := iterator.Bucket() + result += bucket.Count + } + return float64(result) +} diff --git a/pkg/window/window.go b/pkg/window/window.go new file mode 100644 index 00000000000..a5c4b0dfe3c --- /dev/null +++ b/pkg/window/window.go @@ -0,0 +1,150 @@ +// The MIT License (MIT) +// Copyright (c) 2022 go-kratos Project Authors. +// +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS,g +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package window + +import "fmt" + +// Bucket contains multiple float64 points. +type Bucket struct { + Points []float64 + Count int64 + next *Bucket +} + +// Append appends the given value to the bucket. +func (b *Bucket) Append(val float64) { + b.Points = append(b.Points, val) + b.Count++ +} + +// Add adds the given value to the point. +func (b *Bucket) Add(offset int, val float64) { + b.Points[offset] += val + b.Count++ +} + +// Reset empties the bucket. +func (b *Bucket) Reset() { + b.Points = b.Points[:0] + b.Count = 0 +} + +// Next returns the next bucket. +func (b *Bucket) Next() *Bucket { + return b.next +} + +// Iterator iterates the buckets within the window. +type Iterator struct { + count int + iteratedCount int + cur *Bucket +} + +// Next returns true util all of the buckets has been iterated. +func (i *Iterator) Next() bool { + return i.count != i.iteratedCount +} + +// Bucket gets current bucket. +func (i *Iterator) Bucket() Bucket { + if !(i.Next()) { + panic(fmt.Errorf("stat/metric: iteration out of range iteratedCount: %d count: %d", i.iteratedCount, i.count)) + } + bucket := *i.cur + i.iteratedCount++ + i.cur = i.cur.Next() + return bucket +} + +// Window contains multiple buckets. +type Window struct { + buckets []Bucket + size int +} + +// Options contains the arguments for creating Window. 
+type Options struct { + Size int +} + +// NewWindow creates a new Window based on WindowOpts. +func NewWindow(opts Options) *Window { + buckets := make([]Bucket, opts.Size) + for offset := range buckets { + buckets[offset].Points = make([]float64, 0) + nextOffset := offset + 1 + if nextOffset == opts.Size { + nextOffset = 0 + } + buckets[offset].next = &buckets[nextOffset] + } + return &Window{buckets: buckets, size: opts.Size} +} + +// ResetWindow empties all buckets within the window. +func (w *Window) ResetWindow() { + for offset := range w.buckets { + w.ResetBucket(offset) + } +} + +// ResetBucket empties the bucket based on the given offset. +func (w *Window) ResetBucket(offset int) { + w.buckets[offset%w.size].Reset() +} + +// ResetBuckets empties the buckets based on the given offsets. +func (w *Window) ResetBuckets(offset int, count int) { + for i := 0; i < count; i++ { + w.ResetBucket(offset + i) + } +} + +// Append appends the given value to the bucket where index equals the given offset. +func (w *Window) Append(offset int, val float64) { + w.buckets[offset%w.size].Append(val) +} + +// Add adds the given value to the latest point within bucket where index equals the given offset. +func (w *Window) Add(offset int, val float64) { + offset %= w.size + if w.buckets[offset].Count == 0 { + w.buckets[offset].Append(val) + return + } + w.buckets[offset].Add(0, val) +} + +// Bucket returns the bucket where index equals the given offset. +func (w *Window) Bucket(offset int) Bucket { + return w.buckets[offset%w.size] +} + +// Size returns the size of the window. +func (w *Window) Size() int { + return w.size +} + +// Iterator returns the count number buckets iterator from offset. +func (w *Window) Iterator(offset int, count int) Iterator { + return Iterator{ + count: count, + cur: &w.buckets[offset%w.size], + } +} diff --git a/pkg/window/window_test.go b/pkg/window/window_test.go new file mode 100644 index 00000000000..0205aae47a3 --- /dev/null +++ b/pkg/window/window_test.go @@ -0,0 +1,101 @@ +// The MIT License (MIT) +// Copyright (c) 2022 go-kratos Project Authors. +// +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS,g +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
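And a short sketch (editorial, not part of the patch) of the underlying ring defined in window.go: offsets wrap modulo Size, Append grows a bucket's point slice, and Add accumulates into the bucket's first point.

package main

import (
	"fmt"

	"github.com/tikv/pd/pkg/window"
)

func main() {
	w := window.NewWindow(window.Options{Size: 3})
	w.Append(0, 1.0)
	w.Add(0, 2.0)    // same bucket: its first point becomes 3
	w.Append(4, 5.0) // offset 4 wraps around to bucket 1
	fmt.Println(w.Bucket(0).Points, w.Bucket(1).Points) // [3] [5]
}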
+ +package window + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestWindowResetWindow(t *testing.T) { + re := require.New(t) + opts := Options{Size: 3} + window := NewWindow(opts) + for i := 0; i < opts.Size; i++ { + window.Append(i, 1.0) + } + window.ResetWindow() + for i := 0; i < opts.Size; i++ { + re.Equal(len(window.Bucket(i).Points), 0) + } +} + +func TestWindowResetBucket(t *testing.T) { + re := require.New(t) + opts := Options{Size: 3} + window := NewWindow(opts) + for i := 0; i < opts.Size; i++ { + window.Append(i, 1.0) + } + window.ResetBucket(1) + re.Equal(len(window.Bucket(1).Points), 0) + re.Equal(window.Bucket(0).Points[0], float64(1.0)) + re.Equal(window.Bucket(2).Points[0], float64(1.0)) +} + +func TestWindowResetBuckets(t *testing.T) { + re := require.New(t) + opts := Options{Size: 3} + window := NewWindow(opts) + for i := 0; i < opts.Size; i++ { + window.Append(i, 1.0) + } + window.ResetBuckets(0, 3) + for i := 0; i < opts.Size; i++ { + re.Equal(len(window.Bucket(i).Points), 0) + } +} + +func TestWindowAppend(t *testing.T) { + re := require.New(t) + opts := Options{Size: 3} + window := NewWindow(opts) + for i := 0; i < opts.Size; i++ { + window.Append(i, 1.0) + } + for i := 1; i < opts.Size; i++ { + window.Append(i, 2.0) + } + for i := 0; i < opts.Size; i++ { + re.Equal(window.Bucket(i).Points[0], float64(1.0)) + } + for i := 1; i < opts.Size; i++ { + re.Equal(window.Bucket(i).Points[1], float64(2.0)) + } +} + +func TestWindowAdd(t *testing.T) { + opts := Options{Size: 3} + window := NewWindow(opts) + window.Append(0, 1.0) + window.Add(0, 1.0) + assert.Equal(t, window.Bucket(0).Points[0], float64(2.0)) + + window = NewWindow(opts) + window.Add(0, 1.0) + window.Add(0, 1.0) + assert.Equal(t, window.Bucket(0).Points[0], float64(2.0)) +} + +func TestWindowSize(t *testing.T) { + opts := Options{Size: 3} + window := NewWindow(opts) + assert.Equal(t, window.Size(), 3) +} diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 7a0e99efa5f..25a47a7fca9 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1236,6 +1236,11 @@ func (c *RaftCluster) GetRegions() []*core.RegionInfo { return c.core.GetRegions() } +// ValidRegion is used to decide if the region is valid. +func (c *RaftCluster) ValidRegion(region *metapb.Region) error { + return c.core.ValidRegion(region) +} + // GetTotalRegionCount returns total count of regions func (c *RaftCluster) GetTotalRegionCount() int { return c.core.GetTotalRegionCount() diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index c1da97363b5..a38ae86123f 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -50,7 +50,7 @@ func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSp return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs() } reqRegion := request.GetRegion() - err := c.ValidRequestRegion(reqRegion) + err := c.ValidRegion(reqRegion) if err != nil { return nil, err } @@ -90,23 +90,6 @@ func (c *RaftCluster) isSchedulingHalted() bool { return c.opt.IsSchedulingHalted() } -// ValidRequestRegion is used to decide if the region is valid. 
-func (c *RaftCluster) ValidRequestRegion(reqRegion *metapb.Region) error { - startKey := reqRegion.GetStartKey() - region := c.GetRegionByKey(startKey) - if region == nil { - return errors.Errorf("region not found, request region: %v", logutil.RedactStringer(core.RegionToHexMeta(reqRegion))) - } - // If the request epoch is less than current region epoch, then returns an error. - reqRegionEpoch := reqRegion.GetRegionEpoch() - regionEpoch := region.GetMeta().GetRegionEpoch() - if reqRegionEpoch.GetVersion() < regionEpoch.GetVersion() || - reqRegionEpoch.GetConfVer() < regionEpoch.GetConfVer() { - return errors.Errorf("invalid region epoch, request: %v, current: %v", reqRegionEpoch, regionEpoch) - } - return nil -} - // HandleAskBatchSplit handles the batch split request. func (c *RaftCluster) HandleAskBatchSplit(request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) { if c.isSchedulingHalted() { @@ -117,7 +100,7 @@ func (c *RaftCluster) HandleAskBatchSplit(request *pdpb.AskBatchSplitRequest) (* } reqRegion := request.GetRegion() splitCount := request.GetSplitCount() - err := c.ValidRequestRegion(reqRegion) + err := c.ValidRegion(reqRegion) if err != nil { return nil, err } diff --git a/server/grpc_service.go b/server/grpc_service.go index 4fbfaa0f407..da9f71170c3 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -69,6 +69,7 @@ var ( ErrNotStarted = status.Errorf(codes.Unavailable, "server not started") ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout") ErrNotFoundTSOAddr = status.Errorf(codes.NotFound, "not found tso address") + ErrNotFoundSchedulingAddr = status.Errorf(codes.NotFound, "not found scheduling address") ErrForwardTSOTimeout = status.Errorf(codes.DeadlineExceeded, "forward tso request timeout") ErrMaxCountTSOProxyRoutinesExceeded = status.Errorf(codes.ResourceExhausted, "max count of concurrent tso proxy routines exceeded") ErrTSOProxyRecvFromClientTimeout = status.Errorf(codes.DeadlineExceeded, "tso proxy timeout when receiving from client; stream closed by server") @@ -1002,8 +1003,8 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear s.handleDamagedStore(request.GetStats()) storeHeartbeatHandleDuration.WithLabelValues(storeAddress, storeLabel).Observe(time.Since(start).Seconds()) if s.IsAPIServiceMode() { - s.updateSchedulingClient(ctx) - if s.schedulingClient.Load() != nil { + forwardCli, _ := s.updateSchedulingClient(ctx) + if forwardCli != nil { req := &schedulingpb.StoreHeartbeatRequest{ Header: &schedulingpb.RequestHeader{ ClusterId: request.GetHeader().GetClusterId(), @@ -1011,7 +1012,7 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear }, Stats: request.GetStats(), } - if _, err := s.schedulingClient.Load().(*schedulingClient).getClient().StoreHeartbeat(ctx, req); err != nil { + if _, err := forwardCli.StoreHeartbeat(ctx, req); err != nil { // reset to let it be updated in the next request s.schedulingClient.Store(&schedulingClient{}) } @@ -1030,7 +1031,7 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear return resp, nil } -func (s *GrpcServer) updateSchedulingClient(ctx context.Context) { +func (s *GrpcServer) updateSchedulingClient(ctx context.Context) (schedulingpb.SchedulingClient, error) { forwardedHost, _ := s.GetServicePrimaryAddr(ctx, utils.SchedulingServiceName) pre := s.schedulingClient.Load() if forwardedHost != "" && ((pre == nil) || (pre != nil && forwardedHost != 
pre.(*schedulingClient).getPrimaryAddr())) { @@ -1038,11 +1039,14 @@ func (s *GrpcServer) updateSchedulingClient(ctx context.Context) { if err != nil { log.Error("get delegate client failed", zap.Error(err)) } - s.schedulingClient.Store(&schedulingClient{ + forwardCli := &schedulingClient{ client: schedulingpb.NewSchedulingClient(client), lastPrimary: forwardedHost, - }) + } + s.schedulingClient.Store(forwardCli) + return forwardCli.getClient(), nil } + return nil, ErrNotFoundSchedulingAddr } // bucketHeartbeatServer wraps PD_ReportBucketsServer to ensure when any error @@ -1622,6 +1626,26 @@ func (s *GrpcServer) AskSplit(ctx context.Context, request *pdpb.AskSplitRequest // AskBatchSplit implements gRPC PDServer. func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) { + if s.IsAPIServiceMode() { + s.updateSchedulingClient(ctx) + if s.schedulingClient.Load() != nil { + req := &schedulingpb.AskBatchSplitRequest{ + Header: &schedulingpb.RequestHeader{ + ClusterId: request.GetHeader().GetClusterId(), + SenderId: request.GetHeader().GetSenderId(), + }, + Region: request.GetRegion(), + SplitCount: request.GetSplitCount(), + } + resp, err := s.schedulingClient.Load().(*schedulingClient).getClient().AskBatchSplit(ctx, req) + if err != nil { + // reset to let it be updated in the next request + s.schedulingClient.Store(&schedulingClient{}) + return s.convertAskSplitResponse(resp), err + } + return s.convertAskSplitResponse(resp), nil + } + } fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { return pdpb.NewPDClient(client).AskBatchSplit(ctx, request) } @@ -1771,8 +1795,13 @@ func (s *GrpcServer) PutClusterConfig(ctx context.Context, request *pdpb.PutClus // ScatterRegion implements gRPC PDServer. func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterRegionRequest) (*pdpb.ScatterRegionResponse, error) { if s.IsAPIServiceMode() { - s.updateSchedulingClient(ctx) - if s.schedulingClient.Load() != nil { + forwardCli, err := s.updateSchedulingClient(ctx) + if err != nil { + return &pdpb.ScatterRegionResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + if forwardCli != nil { regionsID := request.GetRegionsId() if len(regionsID) == 0 { return &pdpb.ScatterRegionResponse{ @@ -1789,7 +1818,7 @@ func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterReg RetryLimit: request.GetRetryLimit(), SkipStoreLimit: request.GetSkipStoreLimit(), } - resp, err := s.schedulingClient.Load().(*schedulingClient).getClient().ScatterRegions(ctx, req) + resp, err := forwardCli.ScatterRegions(ctx, req) if err != nil { // reset to let it be updated in the next request s.schedulingClient.Store(&schedulingClient{}) @@ -1990,8 +2019,13 @@ func (s *GrpcServer) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb // GetOperator gets information about the operator belonging to the specify region. 
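
The GetOperator and SplitRegions hunks that follow apply the same forward-or-reset pattern that StoreHeartbeat and ScatterRegion adopt above: updateSchedulingClient now returns the forwarding client (or ErrNotFoundSchedulingAddr when no scheduling primary is known), the handler forwards the request through that client, and on an RPC error it clears the cached schedulingClient so the next request re-resolves the primary address. A condensed sketch of that control flow, with resolve and reset as stand-ins for updateSchedulingClient and the cache reset rather than the actual grpc_service.go helpers:

package sketch

import "context"

// forwardOrReset sketches the request flow shared by the converted handlers.
// resolve stands in for updateSchedulingClient, the returned call for the
// schedulingpb client method, and reset for storing an empty schedulingClient.
func forwardOrReset(
	ctx context.Context,
	resolve func(context.Context) (func(context.Context) (interface{}, error), error),
	reset func(),
) (interface{}, error) {
	call, err := resolve(ctx)
	if err != nil {
		// No scheduling primary is known yet; the real handlers wrap this
		// error into the pdpb response header and return early.
		return nil, err
	}
	resp, err := call(ctx)
	if err != nil {
		// Drop the cached client so the next request re-resolves the
		// scheduling service primary address.
		reset()
		return nil, err
	}
	return resp, nil
}
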
func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorRequest) (*pdpb.GetOperatorResponse, error) { if s.IsAPIServiceMode() { - s.updateSchedulingClient(ctx) - if s.schedulingClient.Load() != nil { + forwardCli, err := s.updateSchedulingClient(ctx) + if err != nil { + return &pdpb.GetOperatorResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + if forwardCli != nil { req := &schedulingpb.GetOperatorRequest{ Header: &schedulingpb.RequestHeader{ ClusterId: request.GetHeader().GetClusterId(), @@ -1999,7 +2033,7 @@ func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorR }, RegionId: request.GetRegionId(), } - resp, err := s.schedulingClient.Load().(*schedulingClient).getClient().GetOperator(ctx, req) + resp, err := forwardCli.GetOperator(ctx, req) if err != nil { // reset to let it be updated in the next request s.schedulingClient.Store(&schedulingClient{}) @@ -2136,6 +2170,13 @@ func (s *GrpcServer) convertOperatorResponse(resp *schedulingpb.GetOperatorRespo } } +func (s *GrpcServer) convertAskSplitResponse(resp *schedulingpb.AskBatchSplitResponse) *pdpb.AskBatchSplitResponse { + return &pdpb.AskBatchSplitResponse{ + Header: s.convertHeader(resp.GetHeader()), + Ids: resp.GetIds(), + } +} + // Only used for the TestLocalAllocatorLeaderChange. var mockLocalAllocatorLeaderChangeFlag = false @@ -2241,8 +2282,13 @@ func (s *GrpcServer) SyncMaxTS(_ context.Context, request *pdpb.SyncMaxTSRequest // SplitRegions split regions by the given split keys func (s *GrpcServer) SplitRegions(ctx context.Context, request *pdpb.SplitRegionsRequest) (*pdpb.SplitRegionsResponse, error) { if s.IsAPIServiceMode() { - s.updateSchedulingClient(ctx) - if s.schedulingClient.Load() != nil { + forwardCli, err := s.updateSchedulingClient(ctx) + if err != nil { + return &pdpb.SplitRegionsResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + if forwardCli != nil { req := &schedulingpb.SplitRegionsRequest{ Header: &schedulingpb.RequestHeader{ ClusterId: request.GetHeader().GetClusterId(), @@ -2251,7 +2297,7 @@ func (s *GrpcServer) SplitRegions(ctx context.Context, request *pdpb.SplitRegion SplitKeys: request.GetSplitKeys(), RetryLimit: request.GetRetryLimit(), } - resp, err := s.schedulingClient.Load().(*schedulingClient).getClient().SplitRegions(ctx, req) + resp, err := forwardCli.SplitRegions(ctx, req) if err != nil { // reset to let it be updated in the next request s.schedulingClient.Store(&schedulingClient{}) diff --git a/tests/integrations/client/go.mod b/tests/integrations/client/go.mod index ea933ae0996..e38efbeb438 100644 --- a/tests/integrations/client/go.mod +++ b/tests/integrations/client/go.mod @@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/docker/go-units v0.4.0 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a + github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/stretchr/testify v1.8.2 github.com/tikv/pd v0.0.0-00010101000000-000000000000 diff --git a/tests/integrations/client/go.sum b/tests/integrations/client/go.sum index 785d69bbc33..c745c4fa518 100644 --- a/tests/integrations/client/go.sum +++ b/tests/integrations/client/go.sum @@ -402,8 +402,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint 
v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a h1:xbak0nFiTkinvdtr/EUK0dvdkX32twKRpIgsMRiiVQA= -github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c h1:5cpE29kMHjc8fv+mRiXbLTDfoZHiX5BTK6knVWlWvqk= +github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= diff --git a/tests/integrations/mcs/go.mod b/tests/integrations/mcs/go.mod index 536f34edbcf..000bfdc8312 100644 --- a/tests/integrations/mcs/go.mod +++ b/tests/integrations/mcs/go.mod @@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/docker/go-units v0.4.0 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a + github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/stretchr/testify v1.8.2 github.com/tikv/pd v0.0.0-00010101000000-000000000000 diff --git a/tests/integrations/mcs/go.sum b/tests/integrations/mcs/go.sum index ad36f60c3ce..0da75329284 100644 --- a/tests/integrations/mcs/go.sum +++ b/tests/integrations/mcs/go.sum @@ -407,8 +407,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a h1:xbak0nFiTkinvdtr/EUK0dvdkX32twKRpIgsMRiiVQA= -github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c h1:5cpE29kMHjc8fv+mRiXbLTDfoZHiX5BTK6knVWlWvqk= +github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index 324c8e5cad5..85cf84361b4 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -265,18 +265,32 @@ func (suite *serverTestSuite) TestSchedulerSync() { 
api.MustDeleteScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName) checkEvictLeaderSchedulerExist(re, schedulersController, false) - // TODO: test more schedulers. - // Fixme: the following code will fail because the scheduler is not removed but not synced. - // checkDelete := func(schedulerName string) { - // re.NotNil(schedulersController.GetScheduler(schedulers.BalanceLeaderName) != nil) - // api.MustDeleteScheduler(re, suite.backendEndpoints, schedulers.BalanceLeaderName) - // testutil.Eventually(re, func() bool { - // return schedulersController.GetScheduler(schedulers.BalanceLeaderName) == nil - // }) - // } - // checkDelete(schedulers.BalanceLeaderName) - // checkDelete(schedulers.BalanceRegionName) - // checkDelete(schedulers.HotRegionName) + // The default scheduler could not be deleted, it could only be disabled. + defaultSchedulerNames := []string{ + schedulers.BalanceLeaderName, + schedulers.BalanceRegionName, + schedulers.BalanceWitnessName, + schedulers.HotRegionName, + schedulers.TransferWitnessLeaderName, + } + checkDisabled := func(name string, shouldDisabled bool) { + re.NotNil(schedulersController.GetScheduler(name), name) + testutil.Eventually(re, func() bool { + disabled, err := schedulersController.IsSchedulerDisabled(name) + re.NoError(err, name) + return disabled == shouldDisabled + }) + } + for _, name := range defaultSchedulerNames { + checkDisabled(name, false) + api.MustDeleteScheduler(re, suite.backendEndpoints, name) + checkDisabled(name, true) + } + for _, name := range defaultSchedulerNames { + checkDisabled(name, true) + api.MustAddScheduler(re, suite.backendEndpoints, name, nil) + checkDisabled(name, false) + } } func checkEvictLeaderSchedulerExist(re *require.Assertions, sc *schedulers.Controller, exist bool) { diff --git a/tests/integrations/tso/go.mod b/tests/integrations/tso/go.mod index f8a742362c9..f8a5cfac75f 100644 --- a/tests/integrations/tso/go.mod +++ b/tests/integrations/tso/go.mod @@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c - github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a + github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c github.com/stretchr/testify v1.8.4 github.com/tikv/pd v0.0.0-00010101000000-000000000000 github.com/tikv/pd/client v0.0.0-00010101000000-000000000000 diff --git a/tests/integrations/tso/go.sum b/tests/integrations/tso/go.sum index 7ae1d41a823..63327985f0d 100644 --- a/tests/integrations/tso/go.sum +++ b/tests/integrations/tso/go.sum @@ -401,8 +401,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgWM9fSBIvaxsJHuGP0uM74HXtv3MyyGQ= github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a h1:xbak0nFiTkinvdtr/EUK0dvdkX32twKRpIgsMRiiVQA= -github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c h1:5cpE29kMHjc8fv+mRiXbLTDfoZHiX5BTK6knVWlWvqk= +github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log 
v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index c5b118a9f5e..b3d9f356ad1 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -407,20 +407,23 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-leader-scheduler"}, nil) re.Contains(echo, "Success!") - // test evict-slow-trend scheduler config - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-slow-trend-scheduler"}, nil) - re.Contains(echo, "Success!") - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) - re.Contains(echo, "evict-slow-trend-scheduler") - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-slow-trend-scheduler", "set", "recovery-duration", "100"}, nil) - re.Contains(echo, "Success!") - conf = make(map[string]interface{}) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-slow-trend-scheduler", "show"}, &conf) - re.Equal(100., conf["recovery-duration"]) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-slow-trend-scheduler"}, nil) - re.Contains(echo, "Success!") - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) - re.NotContains(echo, "evict-slow-trend-scheduler") + // test evict-slow-store && evict-slow-trend schedulers config + evictSlownessSchedulers := []string{"evict-slow-store-scheduler", "evict-slow-trend-scheduler"} + for _, schedulerName := range evictSlownessSchedulers { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", schedulerName}, nil) + re.Contains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) + re.Contains(echo, schedulerName) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName, "set", "recovery-duration", "100"}, nil) + re.Contains(echo, "Success!") + conf = make(map[string]interface{}) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName, "show"}, &conf) + re.Equal(100., conf["recovery-duration"]) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", schedulerName}, nil) + re.Contains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) + re.NotContains(echo, schedulerName) + } // test show scheduler with paused and disabled status. 
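
The cluster_work_test.go hunk further below switches TestValidRequestRegion over to the renamed ValidRegion helper; the rule it exercises is the one from the removed ValidRequestRegion above: a split request is rejected when either field of its region epoch lags the epoch PD currently holds. A small illustrative restatement of that comparison (isEpochStale is a hypothetical helper, not a PD function):

package sketch

import "github.com/pingcap/kvproto/pkg/metapb"

// isEpochStale restates the epoch check used by ValidRegion: a request is
// stale if its version or its conf-ver is behind the current region epoch.
func isEpochStale(request, current *metapb.RegionEpoch) bool {
	return request.GetVersion() < current.GetVersion() ||
		request.GetConfVer() < current.GetConfVer()
}

This is why r4 in that test (ConfVer 2 but Version 1) is still rejected: one lagging field is enough, even though the other one matches.
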
checkSchedulerWithStatusCommand := func(status string, expected []string) { diff --git a/tests/server/cluster/cluster_work_test.go b/tests/server/cluster/cluster_work_test.go index ef09e522305..eabecf8e29b 100644 --- a/tests/server/cluster/cluster_work_test.go +++ b/tests/server/cluster/cluster_work_test.go @@ -64,13 +64,13 @@ func TestValidRequestRegion(t *testing.T) { err = rc.HandleRegionHeartbeat(r1) re.NoError(err) r2 := &metapb.Region{Id: 2, StartKey: []byte("a"), EndKey: []byte("b")} - re.Error(rc.ValidRequestRegion(r2)) + re.Error(rc.ValidRegion(r2)) r3 := &metapb.Region{Id: 1, StartKey: []byte(""), EndKey: []byte("a"), RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}} - re.Error(rc.ValidRequestRegion(r3)) + re.Error(rc.ValidRegion(r3)) r4 := &metapb.Region{Id: 1, StartKey: []byte(""), EndKey: []byte("a"), RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 1}} - re.Error(rc.ValidRequestRegion(r4)) + re.Error(rc.ValidRegion(r4)) r5 := &metapb.Region{Id: 1, StartKey: []byte(""), EndKey: []byte("a"), RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 2}} - re.NoError(rc.ValidRequestRegion(r5)) + re.NoError(rc.ValidRegion(r5)) rc.Stop() } diff --git a/tools/pd-api-bench/go.mod b/tools/pd-api-bench/go.mod index 94aa1ff0dc9..e6e896a0797 100644 --- a/tools/pd-api-bench/go.mod +++ b/tools/pd-api-bench/go.mod @@ -69,7 +69,7 @@ require ( github.com/pingcap/errcode v0.3.0 // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 // indirect - github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a // indirect + github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c // indirect github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.11.1 // indirect diff --git a/tools/pd-api-bench/go.sum b/tools/pd-api-bench/go.sum index aaa400df5fd..f40a4fe2f5a 100644 --- a/tools/pd-api-bench/go.sum +++ b/tools/pd-api-bench/go.sum @@ -238,8 +238,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= -github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a h1:xbak0nFiTkinvdtr/EUK0dvdkX32twKRpIgsMRiiVQA= -github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c h1:5cpE29kMHjc8fv+mRiXbLTDfoZHiX5BTK6knVWlWvqk= +github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= diff --git a/tools/pd-ctl/pdctl/command/scheduler.go b/tools/pd-ctl/pdctl/command/scheduler.go index 40bf9a07a4d..4349735f06d 100644 --- a/tools/pd-ctl/pdctl/command/scheduler.go +++ b/tools/pd-ctl/pdctl/command/scheduler.go @@ -499,6 +499,7 
@@ func NewConfigSchedulerCommand() *cobra.Command { newConfigGrantHotRegionCommand(), newConfigBalanceLeaderCommand(), newSplitBucketCommand(), + newConfigEvictSlowStoreCommand(), newConfigEvictSlowTrendCommand(), ) return c @@ -776,6 +777,25 @@ func setShuffleRegionSchedulerRolesCommandFunc(cmd *cobra.Command, args []string cmd.Println("Success!") } +func newConfigEvictSlowStoreCommand() *cobra.Command { + c := &cobra.Command{ + Use: "evict-slow-store-scheduler", + Short: "evict-slow-store-scheduler config", + Run: listSchedulerConfigCommandFunc, + } + + c.AddCommand(&cobra.Command{ + Use: "show", + Short: "list the config item", + Run: listSchedulerConfigCommandFunc, + }, &cobra.Command{ + Use: "set ", + Short: "set the config item", + Run: func(cmd *cobra.Command, args []string) { postSchedulerConfigCommandFunc(cmd, c.Name(), args) }, + }) + return c +} + func newConfigEvictSlowTrendCommand() *cobra.Command { c := &cobra.Command{ Use: "evict-slow-trend-scheduler",