Skip to content

Commit

Permalink
Merge branch 'master' into rate_limit/bbr_v1
Browse files Browse the repository at this point in the history
  • Loading branch information
CabinfeverB committed Oct 23, 2023
2 parents c9c186d + 3f1a688 commit 341c607
Show file tree
Hide file tree
Showing 34 changed files with 754 additions and 140 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a
github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a h1:xbak0nFiTkinvdtr/EUK0dvdkX32twKRpIgsMRiiVQA=
github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c h1:5cpE29kMHjc8fv+mRiXbLTDfoZHiX5BTK6knVWlWvqk=
github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
105 changes: 105 additions & 0 deletions metrics/grafana/pd.json
Original file line number Diff line number Diff line change
Expand Up @@ -1139,6 +1139,111 @@
"timeFrom": null,
"timeShift": null
},
{
"bars": false,
"cacheTimeout": null,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "The current peer count of the cluster",
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 6,
"w": 4,
"x": 16,
"y": 13
},
"hiddenSeries": false,
"id": 22,
"interval": null,
"legend": {
"alignAsTable": true,
"avg": false,
"current": true,
"max": false,
"min": false,
"rightSide": true,
"show": true,
"total": false,
"values": true
},
"lines": true,
"linewidth": 1,
"links": [],
"maxDataPoints": 100,
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.5.10",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "sum(pd_rule_manager_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}) by (type)",
"format": "time_series",
"interval": "",
"intervalFactor": 2,
"legendFormat": "{{type}}",
"refId": "A",
"step": 4
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Placement Rules Status",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"$$hashKey": "object:192",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"$$hashKey": "object:193",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"collapsed": true,
"gridPos": {
Expand Down
17 changes: 17 additions & 0 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1675,6 +1675,23 @@ func (r *RegionsInfo) GetAverageRegionSize() int64 {
return r.tree.TotalSize() / int64(r.tree.length())
}

// ValidRegion is used to decide if the region is valid.
func (r *RegionsInfo) ValidRegion(region *metapb.Region) error {
startKey := region.GetStartKey()
currnetRegion := r.GetRegionByKey(startKey)
if currnetRegion == nil {
return errors.Errorf("region not found, request region: %v", logutil.RedactStringer(RegionToHexMeta(region)))
}
// If the request epoch is less than current region epoch, then returns an error.
regionEpoch := region.GetRegionEpoch()
currnetEpoch := currnetRegion.GetMeta().GetRegionEpoch()
if regionEpoch.GetVersion() < currnetEpoch.GetVersion() ||
regionEpoch.GetConfVer() < currnetEpoch.GetConfVer() {
return errors.Errorf("invalid region epoch, request: %v, current: %v", regionEpoch, currnetEpoch)
}
return nil
}

// DiffRegionPeersInfo return the difference of peers info between two RegionInfo
func DiffRegionPeersInfo(origin *RegionInfo, other *RegionInfo) string {
var ret []string
Expand Down
23 changes: 18 additions & 5 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"os"
"path/filepath"
"reflect"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -232,6 +233,14 @@ func (o *PersistConfig) getSchedulersUpdatingNotifier() chan<- struct{} {
return v.(chan<- struct{})
}

func (o *PersistConfig) tryNotifySchedulersUpdating() {
notifier := o.getSchedulersUpdatingNotifier()
if notifier == nil {
return
}
notifier <- struct{}{}
}

// GetClusterVersion returns the cluster version.
func (o *PersistConfig) GetClusterVersion() *semver.Version {
return (*semver.Version)(atomic.LoadPointer(&o.clusterVersion))
Expand All @@ -251,11 +260,10 @@ func (o *PersistConfig) GetScheduleConfig() *sc.ScheduleConfig {
func (o *PersistConfig) SetScheduleConfig(cfg *sc.ScheduleConfig) {
old := o.GetScheduleConfig()
o.schedule.Store(cfg)
// The coordinator is not aware of the underlying scheduler config changes, however, it
// should react on the scheduler number changes to handle the add/remove scheduler events.
if notifier := o.getSchedulersUpdatingNotifier(); notifier != nil &&
len(old.Schedulers) != len(cfg.Schedulers) {
notifier <- struct{}{}
// The coordinator is not aware of the underlying scheduler config changes,
// we should notify it to update the schedulers proactively.
if !reflect.DeepEqual(old.Schedulers, cfg.Schedulers) {
o.tryNotifySchedulersUpdating()
}
}

Expand Down Expand Up @@ -650,6 +658,11 @@ func (o *PersistConfig) IsRaftKV2() bool {
return o.GetStoreConfig().IsRaftKV2()
}

// IsTikvRegionSplitEnabled returns whether tikv split region is disabled.
func (o *PersistConfig) IsTikvRegionSplitEnabled() bool {
return o.GetScheduleConfig().EnableTiKVSplitRegion
}

// TODO: implement the following methods

// AddSchedulerCfg adds the scheduler configurations.
Expand Down
71 changes: 71 additions & 0 deletions pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/schedulingpb"
"github.com/pingcap/log"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/versioninfo"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -254,6 +257,74 @@ func (s *Service) GetOperator(ctx context.Context, request *schedulingpb.GetOper
}, nil
}

// AskBatchSplit implements gRPC PDServer.
func (s *Service) AskBatchSplit(ctx context.Context, request *schedulingpb.AskBatchSplitRequest) (*schedulingpb.AskBatchSplitResponse, error) {
c := s.GetCluster()
if c == nil {
return &schedulingpb.AskBatchSplitResponse{Header: s.notBootstrappedHeader()}, nil
}

if request.GetRegion() == nil {
return &schedulingpb.AskBatchSplitResponse{
Header: s.wrapErrorToHeader(schedulingpb.ErrorType_UNKNOWN,
"missing region for split"),
}, nil
}

if c.persistConfig.IsSchedulingHalted() {
return nil, errs.ErrSchedulingIsHalted.FastGenByArgs()
}
if !c.persistConfig.IsTikvRegionSplitEnabled() {
return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs()
}
reqRegion := request.GetRegion()
splitCount := request.GetSplitCount()
err := c.ValidRegion(reqRegion)
if err != nil {
return nil, err
}
splitIDs := make([]*pdpb.SplitID, 0, splitCount)
recordRegions := make([]uint64, 0, splitCount+1)

for i := 0; i < int(splitCount); i++ {
newRegionID, err := c.AllocID()
if err != nil {
return nil, errs.ErrSchedulerNotFound.FastGenByArgs()
}

peerIDs := make([]uint64, len(request.Region.Peers))
for i := 0; i < len(peerIDs); i++ {
if peerIDs[i], err = c.AllocID(); err != nil {
return nil, err
}
}

recordRegions = append(recordRegions, newRegionID)
splitIDs = append(splitIDs, &pdpb.SplitID{
NewRegionId: newRegionID,
NewPeerIds: peerIDs,
})

log.Info("alloc ids for region split", zap.Uint64("region-id", newRegionID), zap.Uint64s("peer-ids", peerIDs))
}

recordRegions = append(recordRegions, reqRegion.GetId())
if versioninfo.IsFeatureSupported(c.persistConfig.GetClusterVersion(), versioninfo.RegionMerge) {
// Disable merge the regions in a period of time.
c.GetCoordinator().GetMergeChecker().RecordRegionSplit(recordRegions)
}

// If region splits during the scheduling process, regions with abnormal
// status may be left, and these regions need to be checked with higher
// priority.
c.GetCoordinator().GetCheckerController().AddSuspectRegions(recordRegions...)

return &schedulingpb.AskBatchSplitResponse{
Header: s.header(),
Ids: splitIDs,
}, nil
}

// RegisterGRPCService registers the service to gRPC server.
func (s *Service) RegisterGRPCService(g *grpc.Server) {
schedulingpb.RegisterSchedulingServer(g, s)
Expand Down
Loading

0 comments on commit 341c607

Please sign in to comment.