Skip to content

Commit

Permalink
merge master
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Jul 13, 2023
2 parents 4340515 + 42ce6a1 commit ef9532f
Show file tree
Hide file tree
Showing 107 changed files with 3,954 additions and 2,007 deletions.
6 changes: 3 additions & 3 deletions client/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ SHELL := env PATH='$(PATH)' GOBIN='$(GO_TOOLS_BIN_PATH)' $(shell which bash)
default: static tidy test

test:
CGO_ENABLE=1 go test -race -cover
CGO_ENABLE=1 go test ./... -race -cover

basic-test:
CGO_ENABLE=1 go test
CGO_ENABLE=1 go test ./...

ci-test-job:
CGO_ENABLED=1 go test -race -covermode=atomic -coverprofile=covprofile -coverpkg=../... github.com/tikv/pd/client
CGO_ENABLED=1 go test ./... -race -covermode=atomic -coverprofile=covprofile -coverpkg=../... github.com/tikv/pd/client

install-tools:
cd .. && $(MAKE) install-tools
Expand Down
8 changes: 8 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,14 @@ func WithInitMetricsOption(initMetrics bool) ClientOption {
}
}

// WithAllowTSOFallback configures the client with `allowTSOFallback` option.
// NOTICE: This should only be used for testing.
func WithAllowTSOFallback() ClientOption {
return func(c *client) {
c.option.allowTSOFallback = true
}
}

var _ Client = (*client)(nil)

// serviceModeKeeper is for service mode switching.
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230705080816-a48f4fe282a2
github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.1
github.com/stretchr/testify v1.8.2
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230705080816-a48f4fe282a2 h1:VM6INL8StTPYMKufyHRX2hPUMP7isHnkYvtRMA7Sdsc=
github.com/pingcap/kvproto v0.0.0-20230705080816-a48f4fe282a2/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1 h1:sC3XRNNBQNjFJGRtSzJRvqi2aDLFOsQoCHItr9rbbY8=
github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
86 changes: 44 additions & 42 deletions client/keyspace_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/log"
"github.com/tikv/pd/client/grpcutil"
"go.uber.org/zap"
)

// KeyspaceClient manages keyspace metadata.
type KeyspaceClient interface {
// LoadKeyspace load and return target keyspace's metadata.
LoadKeyspace(ctx context.Context, name string) (*keyspacepb.KeyspaceMeta, error)
// WatchKeyspaces watches keyspace meta changes.
WatchKeyspaces(ctx context.Context) (chan []*keyspacepb.KeyspaceMeta, error)
// UpdateKeyspaceState updates target keyspace's state.
UpdateKeyspaceState(ctx context.Context, id uint32, state keyspacepb.KeyspaceState) (*keyspacepb.KeyspaceMeta, error)
// WatchKeyspaces watches keyspace meta changes.
WatchKeyspaces(ctx context.Context) (chan []*keyspacepb.KeyspaceMeta, error)
// GetAllKeyspaces get all keyspace's metadata.
GetAllKeyspaces(ctx context.Context, startID uint32, limit uint32) ([]*keyspacepb.KeyspaceMeta, error)
}

// keyspaceClient returns the KeyspaceClient from current PD leader.
Expand Down Expand Up @@ -75,44 +75,6 @@ func (c *client) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.Key
return resp.Keyspace, nil
}

// WatchKeyspaces watches keyspace meta changes.
// It returns a stream of slices of keyspace metadata.
// The first message in stream contains all current keyspaceMeta,
// all subsequent messages contains new put events for all keyspaces.
func (c *client) WatchKeyspaces(ctx context.Context) (chan []*keyspacepb.KeyspaceMeta, error) {
keyspaceWatcherChan := make(chan []*keyspacepb.KeyspaceMeta)
req := &keyspacepb.WatchKeyspacesRequest{
Header: c.requestHeader(),
}
stream, err := c.keyspaceClient().WatchKeyspaces(ctx, req)
if err != nil {
close(keyspaceWatcherChan)
return nil, err
}
go func() {
defer func() {
close(keyspaceWatcherChan)
if r := recover(); r != nil {
log.Error("[pd] panic in keyspace client `WatchKeyspaces`", zap.Any("error", r))
return
}
}()
for {
select {
case <-ctx.Done():
return
default:
resp, err := stream.Recv()
if err != nil {
return
}
keyspaceWatcherChan <- resp.Keyspaces
}
}
}()
return keyspaceWatcherChan, err
}

// UpdateKeyspaceState attempts to update the keyspace specified by ID to the target state,
// it will also record StateChangedAt for the given keyspace if a state change took place.
// Currently, legal operations includes:
Expand Down Expand Up @@ -153,3 +115,43 @@ func (c *client) UpdateKeyspaceState(ctx context.Context, id uint32, state keysp

return resp.Keyspace, nil
}

// WatchKeyspaces watches keyspace meta changes.
// It returns a stream of slices of keyspace metadata.
// The first message in stream contains all current keyspaceMeta,
// all subsequent messages contains new put events for all keyspaces.
func (c *client) WatchKeyspaces(ctx context.Context) (chan []*keyspacepb.KeyspaceMeta, error) {
return nil, errors.Errorf("WatchKeyspaces unimplemented")
}

// GetAllKeyspaces get all keyspaces metadata.
func (c *client) GetAllKeyspaces(ctx context.Context, startID uint32, limit uint32) ([]*keyspacepb.KeyspaceMeta, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("keyspaceClient.GetAllKeyspaces", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &keyspacepb.GetAllKeyspacesRequest{
Header: c.requestHeader(),
StartId: startID,
Limit: limit,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.keyspaceClient().GetAllKeyspaces(ctx, req)
cancel()

if err != nil {
cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds())
c.pdSvcDiscovery.ScheduleCheckMemberChanged()
return nil, err
}

if resp.Header.GetError() != nil {
cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds())
return nil, errors.Errorf("Get all keyspaces metadata failed: %s", resp.Header.GetError().String())
}

return resp.Keyspaces, nil
}
2 changes: 2 additions & 0 deletions client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ var (
cmdDurationSplitAndScatterRegions prometheus.Observer
cmdDurationLoadKeyspace prometheus.Observer
cmdDurationUpdateKeyspaceState prometheus.Observer
cmdDurationGetAllKeyspaces prometheus.Observer
cmdDurationGet prometheus.Observer
cmdDurationPut prometheus.Observer
cmdDurationUpdateGCSafePointV2 prometheus.Observer
Expand Down Expand Up @@ -184,6 +185,7 @@ func initCmdDurations() {
cmdDurationSplitAndScatterRegions = cmdDuration.WithLabelValues("split_and_scatter_regions")
cmdDurationLoadKeyspace = cmdDuration.WithLabelValues("load_keyspace")
cmdDurationUpdateKeyspaceState = cmdDuration.WithLabelValues("update_keyspace_state")
cmdDurationGetAllKeyspaces = cmdDuration.WithLabelValues("get_all_keyspaces")
cmdDurationGet = cmdDuration.WithLabelValues("get")
cmdDurationPut = cmdDuration.WithLabelValues("put")
cmdDurationUpdateGCSafePointV2 = cmdDuration.WithLabelValues("update_gc_safe_point_v2")
Expand Down
1 change: 1 addition & 0 deletions client/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type option struct {
enableForwarding bool
metricsLabels prometheus.Labels
initMetrics bool
allowTSOFallback bool

// Dynamic options.
dynamicOptions [dynamicOptionCount]atomic.Value
Expand Down
10 changes: 9 additions & 1 deletion client/resource_group/controller/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,19 @@ func TestReconfig(t *testing.T) {
args := tokenBucketReconfigureArgs{
NewTokens: 6.,
NewRate: 2,
NewBurst: -1,
}
lim.Reconfigure(t1, args)
checkTokens(re, lim, t1, 5)
checkTokens(re, lim, t2, 7)

args = tokenBucketReconfigureArgs{
NewTokens: 6.,
NewRate: 2,
NewBurst: -1,
}
lim.Reconfigure(t1, args)
checkTokens(re, lim, t1, 6)
checkTokens(re, lim, t2, 6)
re.Equal(int64(-1), lim.GetBurst())
}

Expand Down
20 changes: 0 additions & 20 deletions client/resource_group/controller/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,6 @@ func TestGetRUValueFromConsumption(t *testing.T) {

result = getRUValueFromConsumption(custom, typ)
re.Equal(expected, result)

// When typ is not RU
custom = &rmpb.Consumption{RRU: 2.5, WRU: 3.5}
typ = rmpb.RequestUnitType_RU
expected = float64(0)

result = getRUValueFromConsumption(custom, typ)
re.Equal(expected, result)
}

func TestGetRUTokenBucketSetting(t *testing.T) {
Expand All @@ -69,18 +61,6 @@ func TestGetRUTokenBucketSetting(t *testing.T) {
if result != expected {
t.Errorf("Expected nil but got %v", result)
}

// When typ is not RU
group = &rmpb.ResourceGroup{
RUSettings: &rmpb.GroupRequestUnitSettings{
RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 100}},
},
}
typ = rmpb.RequestUnitType_RU
expected = nil

result = getRUTokenBucketSetting(group, typ)
re.Equal(expected, result)
}

func TestGetRawResourceValueFromConsumption(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion client/resource_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const (
groupSettingsPathPrefix = "resource_group/settings"
// errNotPrimary is returned when the requested server is not primary.
errNotPrimary = "not primary"
// errNotLeader is returned when the requested server is not pd leader.
errNotLeader = "not leader"
)

// GroupSettingsPathPrefixBytes is used to watch or get resource groups.
Expand Down Expand Up @@ -65,7 +67,7 @@ func (c *client) resourceManagerClient() (rmpb.ResourceManagerClient, error) {

// gRPCErrorHandler is used to handle the gRPC error returned by the resource manager service.
func (c *client) gRPCErrorHandler(err error) {
if strings.Contains(err.Error(), errNotPrimary) {
if strings.Contains(err.Error(), errNotPrimary) || strings.Contains(err.Error(), errNotLeader) {
c.pdSvcDiscovery.ScheduleCheckMemberChanged()
}
}
Expand Down
20 changes: 17 additions & 3 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,22 @@ func (c *tsoClient) compareAndSwapTS(
// all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned
// last time.
if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) {
log.Panic("[tso] timestamp fallback",
if !c.option.allowTSOFallback {
log.Panic("[tso] timestamp fallback",
zap.String("dc-location", dcLocation),
zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()),
zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)),
zap.String("cur-ts", fmt.Sprintf("(%d, %d)", physical, firstLogical)),
zap.String("last-tso-server", lastTSOInfo.tsoServer),
zap.String("cur-tso-server", curTSOInfo.tsoServer),
zap.Uint32("last-keyspace-group-in-request", lastTSOInfo.reqKeyspaceGroupID),
zap.Uint32("cur-keyspace-group-in-request", curTSOInfo.reqKeyspaceGroupID),
zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID),
zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID),
zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt),
zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt))
}
log.Error("[tso] timestamp fallback",
zap.String("dc-location", dcLocation),
zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()),
zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)),
Expand All @@ -799,8 +814,7 @@ func (c *tsoClient) compareAndSwapTS(
zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID),
zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID),
zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt),
zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt),
)
zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt))
}
lastTSOInfo.tsoServer = curTSOInfo.tsoServer
lastTSOInfo.reqKeyspaceGroupID = curTSOInfo.reqKeyspaceGroupID
Expand Down
9 changes: 8 additions & 1 deletion cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ func NewTSOServiceCommand() *cobra.Command {
cmd.Flags().StringP("cacert", "", "", "path of file that contains list of trusted TLS CAs")
cmd.Flags().StringP("cert", "", "", "path of file that contains X509 certificate in PEM format")
cmd.Flags().StringP("key", "", "", "path of file that contains X509 key in PEM format")
cmd.Flags().StringP("log-level", "L", "info", "log level: debug, info, warn, error, fatal (default 'info')")
cmd.Flags().StringP("log-file", "", "", "log file path")
return cmd
}

Expand All @@ -104,6 +106,8 @@ func NewResourceManagerServiceCommand() *cobra.Command {
cmd.Flags().StringP("cacert", "", "", "path of file that contains list of trusted TLS CAs")
cmd.Flags().StringP("cert", "", "", "path of file that contains X509 certificate in PEM format")
cmd.Flags().StringP("key", "", "", "path of file that contains X509 key in PEM format")
cmd.Flags().StringP("log-level", "L", "info", "log level: debug, info, warn, error, fatal (default 'info')")
cmd.Flags().StringP("log-file", "", "", "log file path")
return cmd
}

Expand Down Expand Up @@ -208,7 +212,10 @@ func start(cmd *cobra.Command, args []string, services ...string) {

// Creates server.
ctx, cancel := context.WithCancel(context.Background())
serviceBuilders := []server.HandlerBuilder{api.NewHandler, apiv2.NewV2Handler, swaggerserver.NewHandler, autoscaling.NewHandler}
serviceBuilders := []server.HandlerBuilder{api.NewHandler, apiv2.NewV2Handler, autoscaling.NewHandler}
if swaggerserver.Enabled() {
serviceBuilders = append(serviceBuilders, swaggerserver.NewHandler)
}
serviceBuilders = append(serviceBuilders, dashboard.GetServiceBuilders()...)
svr, err := server.CreateServer(ctx, cfg, services, serviceBuilders...)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230705080816-a48f4fe282a2
github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27
Expand Down
Loading

0 comments on commit ef9532f

Please sign in to comment.