Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin' into allow-follower-getregions
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch committed Aug 8, 2023
2 parents dc60bb6 + ebd2cfe commit 5cbccf4
Show file tree
Hide file tree
Showing 162 changed files with 5,891 additions and 3,161 deletions.
9 changes: 7 additions & 2 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Require review from domain experts when the PR modified significant config files.
# Require review from domain experts when the PR modified significant config files

/conf/config.toml @tikv/pd-configuration-reviewer
/server/config/config.go @tikv/pd-configuration-reviewer
/pkg/schedule/config/config.go @tikv/pd-configuration-reviewer
/pkg/schedule/schedulers/hot_region_config.go @tikv/pd-configuration-reviewer
/conf/config.toml @tikv/pd-configuration-reviewer
/pkg/mcs/resourcemanager/server/config.go @tikv/pd-configuration-reviewer
/pkg/mcs/scheduling/server/config/config.go @tikv/pd-configuration-reviewer
/pkg/mcs/tso/server/config.go @tikv/pd-configuration-reviewer
/metrics/grafana/pd.json @tikv/pd-configuration-reviewer
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ install-tools:

#### Static checks ####

check: install-tools tidy static generate-errdoc check-plugin check-test
check: install-tools tidy static generate-errdoc check-test

static: install-tools
@ echo "gofmt ..."
Expand Down
36 changes: 30 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"math/rand"
"runtime/trace"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -174,8 +175,9 @@ func WithExcludeTombstone() GetStoreOption {

// RegionsOp represents available options when operate regions
type RegionsOp struct {
group string
retryLimit uint64
group string
retryLimit uint64
skipStoreLimit bool
}

// RegionsOption configures RegionsOp
Expand All @@ -191,6 +193,11 @@ func WithRetry(retry uint64) RegionsOption {
return func(op *RegionsOp) { op.retryLimit = retry }
}

// WithSkipStoreLimit specify if skip the store limit check during Scatter/Split Regions
func WithSkipStoreLimit() RegionsOption {
return func(op *RegionsOp) { op.skipStoreLimit = true }
}

// GetRegionOp represents available options when getting regions.
type GetRegionOp struct {
needBuckets bool
Expand Down Expand Up @@ -817,6 +824,7 @@ func (c *client) GetTSAsync(ctx context.Context) TSFuture {
}

func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFuture {
defer trace.StartRegion(ctx, "GetLocalTSAsync").End()
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("GetLocalTSAsync", opentracing.ChildOf(span.Context()))
ctx = opentracing.ContextWithSpan(ctx, span)
Expand Down Expand Up @@ -1393,10 +1401,11 @@ func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint
}
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &pdpb.ScatterRegionRequest{
Header: c.requestHeader(),
Group: options.group,
RegionsId: regionsID,
RetryLimit: options.retryLimit,
Header: c.requestHeader(),
Group: options.group,
RegionsId: regionsID,
RetryLimit: options.retryLimit,
SkipStoreLimit: options.skipStoreLimit,
}

ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
Expand Down Expand Up @@ -1448,6 +1457,9 @@ func trimHTTPPrefix(str string) string {
}

func (c *client) LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]GlobalConfigItem, int64, error) {
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
if protoClient == nil {
return nil, 0, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -1477,6 +1489,9 @@ func (c *client) StoreGlobalConfig(ctx context.Context, configPath string, items
for i, it := range items {
resArr[i] = &pdpb.GlobalConfigItem{Name: it.Name, Value: it.Value, Kind: it.EventType, Payload: it.PayLoad}
}
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
if protoClient == nil {
return errs.ErrClientGetProtoClient
Expand All @@ -1492,6 +1507,9 @@ func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revis
// TODO: Add retry mechanism
// register watch components there
globalConfigWatcherCh := make(chan []GlobalConfigItem, 16)
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -1538,6 +1556,9 @@ func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revis
}

func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) {
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
if protoClient == nil {
return 0, errs.ErrClientGetProtoClient
Expand All @@ -1556,6 +1577,9 @@ func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) {
}

func (c *client) SetExternalTimestamp(ctx context.Context, timestamp uint64) error {
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
if protoClient == nil {
return errs.ErrClientGetProtoClient
Expand Down
3 changes: 3 additions & 0 deletions client/gc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ func (c *client) WatchGCSafePointV2(ctx context.Context, revision int64) (chan [
Revision: revision,
}

ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.1
github.com/stretchr/testify v1.8.2
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333 h1:A6Wqgq0uMw51UiRAH27TVN0QlzVR5CVtV6fTQSAmvKM=
github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30 h1:EvqKcDT7ceGLW0mXqM8Cp5Z8DfgQRnwj2YTnlCLj2QI=
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
8 changes: 8 additions & 0 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"math/rand"
"runtime/trace"
"sync"
"time"

Expand Down Expand Up @@ -79,6 +80,8 @@ func (c *tsoClient) dispatchRequest(dcLocation string, request *tsoRequest) erro
c.svcDiscovery.ScheduleCheckMemberChanged()
return err
}

defer trace.StartRegion(request.requestCtx, "tsoReqEnqueue").End()
dispatcher.(*tsoDispatcher).tsoBatchController.tsoRequestCh <- request
return nil
}
Expand All @@ -96,6 +99,7 @@ func (req *tsoRequest) Wait() (physical int64, logical int64, err error) {
cmdDurationTSOAsyncWait.Observe(start.Sub(req.start).Seconds())
select {
case err = <-req.done:
defer trace.StartRegion(req.requestCtx, "tsoReqDone").End()
err = errors.WithStack(err)
defer tsoReqPool.Put(req)
if err != nil {
Expand Down Expand Up @@ -741,6 +745,9 @@ func (c *tsoClient) processRequests(
}

requests := tbc.getCollectedRequests()
for _, req := range requests {
defer trace.StartRegion(req.requestCtx, "tsoReqSend").End()
}
count := int64(len(requests))
reqKeyspaceGroupID := c.svcDiscovery.GetKeyspaceGroupID()
respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests(
Expand Down Expand Up @@ -830,6 +837,7 @@ func (c *tsoClient) finishRequest(requests []*tsoRequest, physical, firstLogical
span.Finish()
}
requests[i].physical, requests[i].logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits)
defer trace.StartRegion(requests[i].requestCtx, "tsoReqDequeue").End()
requests[i].done <- err
}
}
22 changes: 22 additions & 0 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/tikv/pd/pkg/dashboard"
"github.com/tikv/pd/pkg/errs"
resource_manager "github.com/tikv/pd/pkg/mcs/resourcemanager/server"
scheduling "github.com/tikv/pd/pkg/mcs/scheduling/server"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/swaggerserver"
Expand Down Expand Up @@ -67,6 +68,7 @@ func NewServiceCommand() *cobra.Command {
}
cmd.AddCommand(NewTSOServiceCommand())
cmd.AddCommand(NewResourceManagerServiceCommand())
cmd.AddCommand(NewSchedulingServiceCommand())
cmd.AddCommand(NewAPIServiceCommand())
return cmd
}
Expand All @@ -91,6 +93,26 @@ func NewTSOServiceCommand() *cobra.Command {
return cmd
}

// NewSchedulingServiceCommand returns the scheduling service command.
func NewSchedulingServiceCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "scheduling",
Short: "Run the scheduling service",
Run: scheduling.CreateServerWrapper,
}
cmd.Flags().BoolP("version", "V", false, "print version information and exit")
cmd.Flags().StringP("config", "", "", "config file")
cmd.Flags().StringP("backend-endpoints", "", "", "url for etcd client")
cmd.Flags().StringP("listen-addr", "", "", "listen address for tso service")
cmd.Flags().StringP("advertise-listen-addr", "", "", "advertise urls for listen address (default '${listen-addr}')")
cmd.Flags().StringP("cacert", "", "", "path of file that contains list of trusted TLS CAs")
cmd.Flags().StringP("cert", "", "", "path of file that contains X509 certificate in PEM format")
cmd.Flags().StringP("key", "", "", "path of file that contains X509 key in PEM format")
cmd.Flags().StringP("log-level", "L", "info", "log level: debug, info, warn, error, fatal (default 'info')")
cmd.Flags().StringP("log-file", "", "", "log file path")
return cmd
}

// NewResourceManagerServiceCommand returns the resource manager service command.
func NewResourceManagerServiceCommand() *cobra.Command {
cmd := &cobra.Command{
Expand Down
9 changes: 6 additions & 3 deletions codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ flag_management:
carryforward: true
statuses:
- type: project
target: 85%
target: 74% # increase it if you want to enforce higher coverage for project, current setting as 74% is for do not let the error be reported and lose the meaning of warning.
- type: patch
target: 85%
target: 74% # increase it if you want to enforce higher coverage for project, current setting as 74% is for do not let the error be reported and lose the meaning of warning.

ignore:
- tests/** # integration test cases or tools.
# Ignore the tool tests
- tests/dashboard
- tests/pdbackup
- tests/pdctl
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27
Expand Down Expand Up @@ -128,7 +128,7 @@ require (
github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/lufia/plan9stats v0.0.0-20230326075908-cb1d2100619a // indirect
github.com/mailru/easyjson v0.7.6 // indirect
github.com/mailru/easyjson v0.7.6
github.com/mattn/go-colorable v0.1.8 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/mattn/go-runewidth v0.0.8 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333 h1:A6Wqgq0uMw51UiRAH27TVN0QlzVR5CVtV6fTQSAmvKM=
github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30 h1:EvqKcDT7ceGLW0mXqM8Cp5Z8DfgQRnwj2YTnlCLj2QI=
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
9 changes: 9 additions & 0 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type RegionInfo struct {
readBytes uint64
readKeys uint64
approximateSize int64
approximateKvSize int64
approximateKeys int64
interval *pdpb.TimeInterval
replicationStatus *replication_modepb.RegionReplicationStatus
Expand Down Expand Up @@ -151,6 +152,7 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC
if heartbeat.GetApproximateSize() > 0 && regionSize < EmptyRegionApproximateSize {
regionSize = EmptyRegionApproximateSize
}
regionKvSize := heartbeat.GetApproximateKvSize() / units.MiB

region := &RegionInfo{
term: heartbeat.GetTerm(),
Expand All @@ -164,6 +166,7 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC
readBytes: heartbeat.GetBytesRead(),
readKeys: heartbeat.GetKeysRead(),
approximateSize: int64(regionSize),
approximateKvSize: int64(regionKvSize),
approximateKeys: int64(heartbeat.GetApproximateKeys()),
interval: heartbeat.GetInterval(),
replicationStatus: heartbeat.GetReplicationStatus(),
Expand Down Expand Up @@ -230,6 +233,7 @@ func (r *RegionInfo) Clone(opts ...RegionCreateOption) *RegionInfo {
readBytes: r.readBytes,
readKeys: r.readKeys,
approximateSize: r.approximateSize,
approximateKvSize: r.approximateKvSize,
approximateKeys: r.approximateKeys,
interval: typeutil.DeepClone(r.interval, TimeIntervalFactory),
replicationStatus: r.replicationStatus,
Expand Down Expand Up @@ -520,6 +524,11 @@ func (r *RegionInfo) GetStorePeerApproximateKeys(storeID uint64) int64 {
return r.approximateKeys
}

// GetApproximateKvSize returns the approximate kv size of the region.
func (r *RegionInfo) GetApproximateKvSize() int64 {
return r.approximateKvSize
}

// GetApproximateKeys returns the approximate keys of the region.
func (r *RegionInfo) GetApproximateKeys() int64 {
return r.approximateKeys
Expand Down
7 changes: 7 additions & 0 deletions pkg/core/region_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,13 @@ func SetApproximateSize(v int64) RegionCreateOption {
}
}

// SetApproximateKvSize sets the approximate size for the region.
func SetApproximateKvSize(v int64) RegionCreateOption {
return func(region *RegionInfo) {
region.approximateKvSize = v
}
}

// SetApproximateKeys sets the approximate keys for the region.
func SetApproximateKeys(v int64) RegionCreateOption {
return func(region *RegionInfo) {
Expand Down
31 changes: 31 additions & 0 deletions pkg/gc/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gc

import "github.com/prometheus/client_golang/prometheus"

var (
gcSafePointGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "gc",
Name: "gc_safepoint",
Help: "The ts of gc safepoint",
}, []string{"type"})
)

func init() {
prometheus.MustRegister(gcSafePointGauge)
}
Loading

0 comments on commit 5cbccf4

Please sign in to comment.