From 3d4c416d00c22e48a42e7f7e6e0abd150e76ffee Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Tue, 15 Oct 2024 10:44:01 +0800 Subject: [PATCH 1/2] client: move option to option.go (#8699) ref tikv/pd#4399 Signed-off-by: okJiang <819421878@qq.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/client.go | 118 ----------------------------------------------- client/option.go | 117 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 117 insertions(+), 118 deletions(-) diff --git a/client/client.go b/client/client.go index 3faa3a09215..27952df13cd 100644 --- a/client/client.go +++ b/client/client.go @@ -36,7 +36,6 @@ import ( "github.com/tikv/pd/client/tlsutil" "github.com/tikv/pd/client/tsoutil" "go.uber.org/zap" - "google.golang.org/grpc" ) const ( @@ -174,69 +173,6 @@ type Client interface { Close() } -// GetStoreOp represents available options when getting stores. -type GetStoreOp struct { - excludeTombstone bool -} - -// GetStoreOption configures GetStoreOp. -type GetStoreOption func(*GetStoreOp) - -// WithExcludeTombstone excludes tombstone stores from the result. -func WithExcludeTombstone() GetStoreOption { - return func(op *GetStoreOp) { op.excludeTombstone = true } -} - -// RegionsOp represents available options when operate regions -type RegionsOp struct { - group string - retryLimit uint64 - skipStoreLimit bool -} - -// RegionsOption configures RegionsOp -type RegionsOption func(op *RegionsOp) - -// WithGroup specify the group during Scatter/Split Regions -func WithGroup(group string) RegionsOption { - return func(op *RegionsOp) { op.group = group } -} - -// WithRetry specify the retry limit during Scatter/Split Regions -func WithRetry(retry uint64) RegionsOption { - return func(op *RegionsOp) { op.retryLimit = retry } -} - -// WithSkipStoreLimit specify if skip the store limit check during Scatter/Split Regions -func WithSkipStoreLimit() RegionsOption { - return func(op *RegionsOp) { op.skipStoreLimit = true } -} - -// GetRegionOp represents available options when getting regions. -type GetRegionOp struct { - needBuckets bool - allowFollowerHandle bool - outputMustContainAllKeyRange bool -} - -// GetRegionOption configures GetRegionOp. -type GetRegionOption func(op *GetRegionOp) - -// WithBuckets means getting region and its buckets. -func WithBuckets() GetRegionOption { - return func(op *GetRegionOp) { op.needBuckets = true } -} - -// WithAllowFollowerHandle means that client can send request to follower and let it handle this request. -func WithAllowFollowerHandle() GetRegionOption { - return func(op *GetRegionOp) { op.allowFollowerHandle = true } -} - -// WithOutputMustContainAllKeyRange means the output must contain all key ranges. -func WithOutputMustContainAllKeyRange() GetRegionOption { - return func(op *GetRegionOp) { op.outputMustContainAllKeyRange = true } -} - var ( // errUnmatchedClusterID is returned when found a PD with a different cluster ID. errUnmatchedClusterID = errors.New("[pd] unmatched cluster id") @@ -250,60 +186,6 @@ var ( errNoServiceModeReturned = errors.New("[pd] no service mode returned") ) -// ClientOption configures client. -type ClientOption func(c *client) - -// WithGRPCDialOptions configures the client with gRPC dial options. -func WithGRPCDialOptions(opts ...grpc.DialOption) ClientOption { - return func(c *client) { - c.option.gRPCDialOptions = append(c.option.gRPCDialOptions, opts...) - } -} - -// WithCustomTimeoutOption configures the client with timeout option. -func WithCustomTimeoutOption(timeout time.Duration) ClientOption { - return func(c *client) { - c.option.timeout = timeout - } -} - -// WithForwardingOption configures the client with forwarding option. -func WithForwardingOption(enableForwarding bool) ClientOption { - return func(c *client) { - c.option.enableForwarding = enableForwarding - } -} - -// WithTSOServerProxyOption configures the client to use TSO server proxy, -// i.e., the client will send TSO requests to the API leader (the TSO server -// proxy) which will forward the requests to the TSO servers. -func WithTSOServerProxyOption(useTSOServerProxy bool) ClientOption { - return func(c *client) { - c.option.useTSOServerProxy = useTSOServerProxy - } -} - -// WithMaxErrorRetry configures the client max retry times when connect meets error. -func WithMaxErrorRetry(count int) ClientOption { - return func(c *client) { - c.option.maxRetryTimes = count - } -} - -// WithMetricsLabels configures the client with metrics labels. -func WithMetricsLabels(labels prometheus.Labels) ClientOption { - return func(c *client) { - c.option.metricsLabels = labels - } -} - -// WithInitMetricsOption configures the client with metrics labels. -func WithInitMetricsOption(initMetrics bool) ClientOption { - return func(c *client) { - c.option.initMetrics = initMetrics - } -} - var _ Client = (*client)(nil) // serviceModeKeeper is for service mode switching. diff --git a/client/option.go b/client/option.go index 3f2b7119b52..ca21dcfefbf 100644 --- a/client/option.go +++ b/client/option.go @@ -142,3 +142,120 @@ func (o *option) setTSOClientRPCConcurrency(value int) { func (o *option) getTSOClientRPCConcurrency() int { return o.dynamicOptions[TSOClientRPCConcurrency].Load().(int) } + +// GetStoreOp represents available options when getting stores. +type GetStoreOp struct { + excludeTombstone bool +} + +// GetStoreOption configures GetStoreOp. +type GetStoreOption func(*GetStoreOp) + +// WithExcludeTombstone excludes tombstone stores from the result. +func WithExcludeTombstone() GetStoreOption { + return func(op *GetStoreOp) { op.excludeTombstone = true } +} + +// RegionsOp represents available options when operate regions +type RegionsOp struct { + group string + retryLimit uint64 + skipStoreLimit bool +} + +// RegionsOption configures RegionsOp +type RegionsOption func(op *RegionsOp) + +// WithGroup specify the group during Scatter/Split Regions +func WithGroup(group string) RegionsOption { + return func(op *RegionsOp) { op.group = group } +} + +// WithRetry specify the retry limit during Scatter/Split Regions +func WithRetry(retry uint64) RegionsOption { + return func(op *RegionsOp) { op.retryLimit = retry } +} + +// WithSkipStoreLimit specify if skip the store limit check during Scatter/Split Regions +func WithSkipStoreLimit() RegionsOption { + return func(op *RegionsOp) { op.skipStoreLimit = true } +} + +// GetRegionOp represents available options when getting regions. +type GetRegionOp struct { + needBuckets bool + allowFollowerHandle bool + outputMustContainAllKeyRange bool +} + +// GetRegionOption configures GetRegionOp. +type GetRegionOption func(op *GetRegionOp) + +// WithBuckets means getting region and its buckets. +func WithBuckets() GetRegionOption { + return func(op *GetRegionOp) { op.needBuckets = true } +} + +// WithAllowFollowerHandle means that client can send request to follower and let it handle this request. +func WithAllowFollowerHandle() GetRegionOption { + return func(op *GetRegionOp) { op.allowFollowerHandle = true } +} + +// WithOutputMustContainAllKeyRange means the output must contain all key ranges. +func WithOutputMustContainAllKeyRange() GetRegionOption { + return func(op *GetRegionOp) { op.outputMustContainAllKeyRange = true } +} + +// ClientOption configures client. +type ClientOption func(c *client) + +// WithGRPCDialOptions configures the client with gRPC dial options. +func WithGRPCDialOptions(opts ...grpc.DialOption) ClientOption { + return func(c *client) { + c.option.gRPCDialOptions = append(c.option.gRPCDialOptions, opts...) + } +} + +// WithCustomTimeoutOption configures the client with timeout option. +func WithCustomTimeoutOption(timeout time.Duration) ClientOption { + return func(c *client) { + c.option.timeout = timeout + } +} + +// WithForwardingOption configures the client with forwarding option. +func WithForwardingOption(enableForwarding bool) ClientOption { + return func(c *client) { + c.option.enableForwarding = enableForwarding + } +} + +// WithTSOServerProxyOption configures the client to use TSO server proxy, +// i.e., the client will send TSO requests to the API leader (the TSO server +// proxy) which will forward the requests to the TSO servers. +func WithTSOServerProxyOption(useTSOServerProxy bool) ClientOption { + return func(c *client) { + c.option.useTSOServerProxy = useTSOServerProxy + } +} + +// WithMaxErrorRetry configures the client max retry times when connect meets error. +func WithMaxErrorRetry(count int) ClientOption { + return func(c *client) { + c.option.maxRetryTimes = count + } +} + +// WithMetricsLabels configures the client with metrics labels. +func WithMetricsLabels(labels prometheus.Labels) ClientOption { + return func(c *client) { + c.option.metricsLabels = labels + } +} + +// WithInitMetricsOption configures the client with metrics labels. +func WithInitMetricsOption(initMetrics bool) ClientOption { + return func(c *client) { + c.option.initMetrics = initMetrics + } +} From b70107ec31e65736db6fc71bc32177b901217ec7 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 16 Oct 2024 14:49:47 +0800 Subject: [PATCH 2/2] client/tso: fix the issue where the TSO follower proxy cannot be closed (#8719) close tikv/pd#8709 Remove outdated follower connections after disabling the TSO follower proxy. Signed-off-by: JmPotato --- client/tso_client.go | 6 +++- tests/integrations/client/client_test.go | 36 +++++++++++++++++++++++- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/client/tso_client.go b/client/tso_client.go index f1538a7f164..6801aee3a11 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -386,7 +386,9 @@ func (c *tsoClient) tryConnectToTSO( cc *grpc.ClientConn updateAndClear = func(newURL string, connectionCtx *tsoConnectionContext) { // Only store the `connectionCtx` if it does not exist before. - connectionCtxs.LoadOrStore(newURL, connectionCtx) + if connectionCtx != nil { + connectionCtxs.LoadOrStore(newURL, connectionCtx) + } // Remove all other `connectionCtx`s. connectionCtxs.Range(func(url, cc any) bool { if url.(string) != newURL { @@ -405,6 +407,8 @@ func (c *tsoClient) tryConnectToTSO( c.svcDiscovery.ScheduleCheckMemberChanged() cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc) if _, ok := connectionCtxs.Load(url); ok { + // Just trigger the clean up of the stale connection contexts. + updateAndClear(url, nil) return nil } if cc != nil { diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 9f0b5f8d523..9574918a74a 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -361,7 +361,8 @@ func TestTSOFollowerProxy(t *testing.T) { defer cli1.Close() cli2 := setupCli(ctx, re, endpoints) defer cli2.Close() - cli2.UpdateOption(pd.EnableTSOFollowerProxy, true) + err = cli2.UpdateOption(pd.EnableTSOFollowerProxy, true) + re.NoError(err) var wg sync.WaitGroup wg.Add(tsoRequestConcurrencyNumber) @@ -385,6 +386,39 @@ func TestTSOFollowerProxy(t *testing.T) { }() } wg.Wait() + + // Disable the follower proxy and check if the stream is updated. + err = cli2.UpdateOption(pd.EnableTSOFollowerProxy, false) + re.NoError(err) + + wg.Add(tsoRequestConcurrencyNumber) + for i := 0; i < tsoRequestConcurrencyNumber; i++ { + go func() { + defer wg.Done() + var lastTS uint64 + for i := 0; i < tsoRequestRound; i++ { + physical, logical, err := cli2.GetTS(context.Background()) + if err != nil { + // It can only be the context canceled error caused by the stale stream cleanup. + re.ErrorContains(err, "context canceled") + continue + } + re.NoError(err) + ts := tsoutil.ComposeTS(physical, logical) + re.Less(lastTS, ts) + lastTS = ts + // After requesting with the follower proxy, request with the leader directly. + physical, logical, err = cli1.GetTS(context.Background()) + re.NoError(err) + ts = tsoutil.ComposeTS(physical, logical) + re.Less(lastTS, ts) + lastTS = ts + } + // Ensure at least one request is successful. + re.NotEmpty(lastTS) + }() + } + wg.Wait() } func TestTSOFollowerProxyWithTSOService(t *testing.T) {