Skip to content

Commit

Permalink
Merge branch 'master' into fix-label-leak
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Oct 16, 2024
2 parents 7075c58 + b70107e commit 5de15e8
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 120 deletions.
118 changes: 0 additions & 118 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/tikv/pd/client/tlsutil"
"github.com/tikv/pd/client/tsoutil"
"go.uber.org/zap"
"google.golang.org/grpc"
)

const (
Expand Down Expand Up @@ -174,69 +173,6 @@ type Client interface {
Close()
}

// GetStoreOp represents available options when getting stores.
type GetStoreOp struct {
excludeTombstone bool
}

// GetStoreOption configures GetStoreOp.
type GetStoreOption func(*GetStoreOp)

// WithExcludeTombstone excludes tombstone stores from the result.
func WithExcludeTombstone() GetStoreOption {
return func(op *GetStoreOp) { op.excludeTombstone = true }
}

// RegionsOp represents available options when operate regions
type RegionsOp struct {
group string
retryLimit uint64
skipStoreLimit bool
}

// RegionsOption configures RegionsOp
type RegionsOption func(op *RegionsOp)

// WithGroup specify the group during Scatter/Split Regions
func WithGroup(group string) RegionsOption {
return func(op *RegionsOp) { op.group = group }
}

// WithRetry specify the retry limit during Scatter/Split Regions
func WithRetry(retry uint64) RegionsOption {
return func(op *RegionsOp) { op.retryLimit = retry }
}

// WithSkipStoreLimit specify if skip the store limit check during Scatter/Split Regions
func WithSkipStoreLimit() RegionsOption {
return func(op *RegionsOp) { op.skipStoreLimit = true }
}

// GetRegionOp represents available options when getting regions.
type GetRegionOp struct {
needBuckets bool
allowFollowerHandle bool
outputMustContainAllKeyRange bool
}

// GetRegionOption configures GetRegionOp.
type GetRegionOption func(op *GetRegionOp)

// WithBuckets means getting region and its buckets.
func WithBuckets() GetRegionOption {
return func(op *GetRegionOp) { op.needBuckets = true }
}

// WithAllowFollowerHandle means that client can send request to follower and let it handle this request.
func WithAllowFollowerHandle() GetRegionOption {
return func(op *GetRegionOp) { op.allowFollowerHandle = true }
}

// WithOutputMustContainAllKeyRange means the output must contain all key ranges.
func WithOutputMustContainAllKeyRange() GetRegionOption {
return func(op *GetRegionOp) { op.outputMustContainAllKeyRange = true }
}

var (
// errUnmatchedClusterID is returned when found a PD with a different cluster ID.
errUnmatchedClusterID = errors.New("[pd] unmatched cluster id")
Expand All @@ -250,60 +186,6 @@ var (
errNoServiceModeReturned = errors.New("[pd] no service mode returned")
)

// ClientOption configures client.
type ClientOption func(c *client)

// WithGRPCDialOptions configures the client with gRPC dial options.
func WithGRPCDialOptions(opts ...grpc.DialOption) ClientOption {
return func(c *client) {
c.option.gRPCDialOptions = append(c.option.gRPCDialOptions, opts...)
}
}

// WithCustomTimeoutOption configures the client with timeout option.
func WithCustomTimeoutOption(timeout time.Duration) ClientOption {
return func(c *client) {
c.option.timeout = timeout
}
}

// WithForwardingOption configures the client with forwarding option.
func WithForwardingOption(enableForwarding bool) ClientOption {
return func(c *client) {
c.option.enableForwarding = enableForwarding
}
}

// WithTSOServerProxyOption configures the client to use TSO server proxy,
// i.e., the client will send TSO requests to the API leader (the TSO server
// proxy) which will forward the requests to the TSO servers.
func WithTSOServerProxyOption(useTSOServerProxy bool) ClientOption {
return func(c *client) {
c.option.useTSOServerProxy = useTSOServerProxy
}
}

// WithMaxErrorRetry configures the client max retry times when connect meets error.
func WithMaxErrorRetry(count int) ClientOption {
return func(c *client) {
c.option.maxRetryTimes = count
}
}

// WithMetricsLabels configures the client with metrics labels.
func WithMetricsLabels(labels prometheus.Labels) ClientOption {
return func(c *client) {
c.option.metricsLabels = labels
}
}

// WithInitMetricsOption configures the client with metrics labels.
func WithInitMetricsOption(initMetrics bool) ClientOption {
return func(c *client) {
c.option.initMetrics = initMetrics
}
}

var _ Client = (*client)(nil)

// serviceModeKeeper is for service mode switching.
Expand Down
117 changes: 117 additions & 0 deletions client/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,120 @@ func (o *option) setTSOClientRPCConcurrency(value int) {
func (o *option) getTSOClientRPCConcurrency() int {
return o.dynamicOptions[TSOClientRPCConcurrency].Load().(int)
}

// GetStoreOp represents available options when getting stores.
type GetStoreOp struct {
excludeTombstone bool
}

// GetStoreOption configures GetStoreOp.
type GetStoreOption func(*GetStoreOp)

// WithExcludeTombstone excludes tombstone stores from the result.
func WithExcludeTombstone() GetStoreOption {
return func(op *GetStoreOp) { op.excludeTombstone = true }
}

// RegionsOp represents available options when operate regions
type RegionsOp struct {
group string
retryLimit uint64
skipStoreLimit bool
}

// RegionsOption configures RegionsOp
type RegionsOption func(op *RegionsOp)

// WithGroup specify the group during Scatter/Split Regions
func WithGroup(group string) RegionsOption {
return func(op *RegionsOp) { op.group = group }
}

// WithRetry specify the retry limit during Scatter/Split Regions
func WithRetry(retry uint64) RegionsOption {
return func(op *RegionsOp) { op.retryLimit = retry }
}

// WithSkipStoreLimit specify if skip the store limit check during Scatter/Split Regions
func WithSkipStoreLimit() RegionsOption {
return func(op *RegionsOp) { op.skipStoreLimit = true }
}

// GetRegionOp represents available options when getting regions.
type GetRegionOp struct {
needBuckets bool
allowFollowerHandle bool
outputMustContainAllKeyRange bool
}

// GetRegionOption configures GetRegionOp.
type GetRegionOption func(op *GetRegionOp)

// WithBuckets means getting region and its buckets.
func WithBuckets() GetRegionOption {
return func(op *GetRegionOp) { op.needBuckets = true }
}

// WithAllowFollowerHandle means that client can send request to follower and let it handle this request.
func WithAllowFollowerHandle() GetRegionOption {
return func(op *GetRegionOp) { op.allowFollowerHandle = true }
}

// WithOutputMustContainAllKeyRange means the output must contain all key ranges.
func WithOutputMustContainAllKeyRange() GetRegionOption {
return func(op *GetRegionOp) { op.outputMustContainAllKeyRange = true }
}

// ClientOption configures client.
type ClientOption func(c *client)

// WithGRPCDialOptions configures the client with gRPC dial options.
func WithGRPCDialOptions(opts ...grpc.DialOption) ClientOption {
return func(c *client) {
c.option.gRPCDialOptions = append(c.option.gRPCDialOptions, opts...)
}
}

// WithCustomTimeoutOption configures the client with timeout option.
func WithCustomTimeoutOption(timeout time.Duration) ClientOption {
return func(c *client) {
c.option.timeout = timeout
}
}

// WithForwardingOption configures the client with forwarding option.
func WithForwardingOption(enableForwarding bool) ClientOption {
return func(c *client) {
c.option.enableForwarding = enableForwarding
}
}

// WithTSOServerProxyOption configures the client to use TSO server proxy,
// i.e., the client will send TSO requests to the API leader (the TSO server
// proxy) which will forward the requests to the TSO servers.
func WithTSOServerProxyOption(useTSOServerProxy bool) ClientOption {
return func(c *client) {
c.option.useTSOServerProxy = useTSOServerProxy
}
}

// WithMaxErrorRetry configures the client max retry times when connect meets error.
func WithMaxErrorRetry(count int) ClientOption {
return func(c *client) {
c.option.maxRetryTimes = count
}
}

// WithMetricsLabels configures the client with metrics labels.
func WithMetricsLabels(labels prometheus.Labels) ClientOption {
return func(c *client) {
c.option.metricsLabels = labels
}
}

// WithInitMetricsOption configures the client with metrics labels.
func WithInitMetricsOption(initMetrics bool) ClientOption {
return func(c *client) {
c.option.initMetrics = initMetrics
}
}
6 changes: 5 additions & 1 deletion client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,9 @@ func (c *tsoClient) tryConnectToTSO(
cc *grpc.ClientConn
updateAndClear = func(newURL string, connectionCtx *tsoConnectionContext) {
// Only store the `connectionCtx` if it does not exist before.
connectionCtxs.LoadOrStore(newURL, connectionCtx)
if connectionCtx != nil {
connectionCtxs.LoadOrStore(newURL, connectionCtx)
}
// Remove all other `connectionCtx`s.
connectionCtxs.Range(func(url, cc any) bool {
if url.(string) != newURL {
Expand All @@ -405,6 +407,8 @@ func (c *tsoClient) tryConnectToTSO(
c.svcDiscovery.ScheduleCheckMemberChanged()
cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc)
if _, ok := connectionCtxs.Load(url); ok {
// Just trigger the clean up of the stale connection contexts.
updateAndClear(url, nil)
return nil
}
if cc != nil {
Expand Down
36 changes: 35 additions & 1 deletion tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,8 @@ func TestTSOFollowerProxy(t *testing.T) {
defer cli1.Close()
cli2 := setupCli(ctx, re, endpoints)
defer cli2.Close()
cli2.UpdateOption(pd.EnableTSOFollowerProxy, true)
err = cli2.UpdateOption(pd.EnableTSOFollowerProxy, true)
re.NoError(err)

var wg sync.WaitGroup
wg.Add(tsoRequestConcurrencyNumber)
Expand All @@ -385,6 +386,39 @@ func TestTSOFollowerProxy(t *testing.T) {
}()
}
wg.Wait()

// Disable the follower proxy and check if the stream is updated.
err = cli2.UpdateOption(pd.EnableTSOFollowerProxy, false)
re.NoError(err)

wg.Add(tsoRequestConcurrencyNumber)
for i := 0; i < tsoRequestConcurrencyNumber; i++ {
go func() {
defer wg.Done()
var lastTS uint64
for i := 0; i < tsoRequestRound; i++ {
physical, logical, err := cli2.GetTS(context.Background())
if err != nil {
// It can only be the context canceled error caused by the stale stream cleanup.
re.ErrorContains(err, "context canceled")
continue
}
re.NoError(err)
ts := tsoutil.ComposeTS(physical, logical)
re.Less(lastTS, ts)
lastTS = ts
// After requesting with the follower proxy, request with the leader directly.
physical, logical, err = cli1.GetTS(context.Background())
re.NoError(err)
ts = tsoutil.ComposeTS(physical, logical)
re.Less(lastTS, ts)
lastTS = ts
}
// Ensure at least one request is successful.
re.NotEmpty(lastTS)
}()
}
wg.Wait()
}

func TestTSOFollowerProxyWithTSOService(t *testing.T) {
Expand Down

0 comments on commit 5de15e8

Please sign in to comment.