From aa7301d4bca296e2b457bcef60f92a7bcb37aa3b Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Tue, 4 Mar 2025 09:42:09 +0800 Subject: [PATCH] pdclient: Add caller info to pd client (#1516) ref tikv/pd#8593 Signed-off-by: okJiang <819421878@qq.com> --- internal/locate/pd_codec.go | 12 ++++++++++-- internal/locate/region_cache.go | 2 +- internal/locate/store_cache.go | 2 +- oracle/oracles/pd.go | 2 +- oracle/oracles/pd_test.go | 9 +++++++++ rawkv/rawkv.go | 5 +++-- tikv/kv.go | 4 ++-- txnkv/client.go | 2 +- util/pd_interceptor.go | 10 ++++++++++ 9 files changed, 38 insertions(+), 10 deletions(-) diff --git a/internal/locate/pd_codec.go b/internal/locate/pd_codec.go index 6041ad527d..15008beeaa 100644 --- a/internal/locate/pd_codec.go +++ b/internal/locate/pd_codec.go @@ -44,10 +44,13 @@ import ( pd "github.com/tikv/pd/client" "github.com/tikv/pd/client/clients/router" "github.com/tikv/pd/client/opt" + "github.com/tikv/pd/client/pkg/caller" ) var _ pd.Client = &CodecPDClient{} +const componentName = "codec-pd-client" + // CodecPDClient wraps a PD Client to decode the encoded keys in region meta. type CodecPDClient struct { pd.Client @@ -57,7 +60,7 @@ type CodecPDClient struct { // NewCodecPDClient creates a CodecPDClient in API v1. func NewCodecPDClient(mode apicodec.Mode, client pd.Client) *CodecPDClient { codec := apicodec.NewCodecV1(mode) - return &CodecPDClient{client, codec} + return &CodecPDClient{client.WithCallerComponent(componentName), codec} } // NewCodecPDClientWithKeyspace creates a CodecPDClient in API v2 with keyspace name. @@ -71,7 +74,7 @@ func NewCodecPDClientWithKeyspace(mode apicodec.Mode, client pd.Client, keyspace return nil, err } - return &CodecPDClient{client, codec}, nil + return &CodecPDClient{client.WithCallerComponent(componentName), codec}, nil } // GetKeyspaceID attempts to retrieve keyspace ID corresponding to the given keyspace name from PD. @@ -202,3 +205,8 @@ func (c *CodecPDClient) decodeRegionKeyInPlace(r *router.Region) error { } return err } + +// WithCallerComponent returns a new PD client with the specified caller component. +func (c *CodecPDClient) WithCallerComponent(component caller.Component) pd.Client { + return &CodecPDClient{c.Client.WithCallerComponent(component), c.codec} +} diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 8164d36764..d6887f00cc 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -687,7 +687,7 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache { } c := &RegionCache{ - pdClient: pdClient, + pdClient: pdClient.WithCallerComponent("region-cache"), requestHealthFeedbackCallback: options.requestHealthFeedbackCallback, } diff --git a/internal/locate/store_cache.go b/internal/locate/store_cache.go index 7d4583ff22..b60481421a 100644 --- a/internal/locate/store_cache.go +++ b/internal/locate/store_cache.go @@ -71,7 +71,7 @@ type storeCache interface { } func newStoreCache(pdClient pd.Client) *storeCacheImpl { - c := &storeCacheImpl{pdClient: pdClient} + c := &storeCacheImpl{pdClient: pdClient.WithCallerComponent("store-cache")} c.notifyCheckCh = make(chan struct{}, 1) c.storeMu.stores = make(map[uint64]*Store) c.tiflashComputeStoreMu.needReload = true diff --git a/oracle/oracles/pd.go b/oracle/oracles/pd.go index 96fa2fca46..c2171c0f8e 100644 --- a/oracle/oracles/pd.go +++ b/oracle/oracles/pd.go @@ -180,7 +180,7 @@ func NewPdOracle(pdClient pd.Client, options *PDOracleOptions) (oracle.Oracle, e } o := &pdOracle{ - c: pdClient, + c: pdClient.WithCallerComponent("oracle"), quit: make(chan struct{}), lastTSUpdateInterval: atomic.Int64{}, } diff --git a/oracle/oracles/pd_test.go b/oracle/oracles/pd_test.go index b79bfdeb82..2114710117 100644 --- a/oracle/oracles/pd_test.go +++ b/oracle/oracles/pd_test.go @@ -49,6 +49,7 @@ import ( "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/util" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/pkg/caller" ) func TestPDOracle_UntilExpired(t *testing.T) { @@ -91,6 +92,10 @@ func (c *MockPdClient) GetTS(ctx context.Context) (int64, int64, error) { return 0, c.logicalTimestamp.Add(1), nil } +func (c *MockPdClient) WithCallerComponent(component caller.Component) pd.Client { + return c +} + func TestPdOracle_SetLowResolutionTimestampUpdateInterval(t *testing.T) { pdClient := MockPdClient{} o := NewPdOracleWithClient(&pdClient) @@ -420,6 +425,10 @@ func (c *MockPDClientWithPause) Resume() { c.mu.Unlock() } +func (c *MockPDClientWithPause) WithCallerComponent(component caller.Component) pd.Client { + return c +} + func TestValidateReadTSForStaleReadReusingGetTSResult(t *testing.T) { util.EnableFailpoints() diff --git a/rawkv/rawkv.go b/rawkv/rawkv.go index da59325b95..91f40780e9 100644 --- a/rawkv/rawkv.go +++ b/rawkv/rawkv.go @@ -69,6 +69,7 @@ const ( rawBatchPutSize = 16 * 1024 // rawBatchPairCount is the maximum limit for rawkv each batch get/delete request. rawBatchPairCount = 512 + componentName = caller.Component("rawkv-client-go") ) type rawOptions struct { @@ -204,7 +205,7 @@ func NewClientWithOpts(ctx context.Context, pdAddrs []string, opts ...ClientOpt) } // Use an unwrapped PDClient to obtain keyspace meta. - pdCli, err := pd.NewClientWithContext(ctx, caller.Component("rawkv-client-go"), pdAddrs, pd.SecurityOption{ + pdCli, err := pd.NewClientWithContext(ctx, componentName, pdAddrs, pd.SecurityOption{ CAPath: opt.security.ClusterSSLCA, CertPath: opt.security.ClusterSSLCert, KeyPath: opt.security.ClusterSSLKey, @@ -240,7 +241,7 @@ func NewClientWithOpts(ctx context.Context, pdAddrs []string, opts ...ClientOpt) apiVersion: opt.apiVersion, clusterID: pdCli.GetClusterID(ctx), regionCache: locate.NewRegionCache(pdCli), - pdClient: pdCli, + pdClient: pdCli.WithCallerComponent(componentName), rpcClient: rpcCli, }, nil } diff --git a/tikv/kv.go b/tikv/kv.go index cd5f17d7ba..275d07a421 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -285,7 +285,7 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl clusterID: pdClient.GetClusterID(context.TODO()), uuid: uuid, oracle: o, - pdClient: pdClient, + pdClient: pdClient.WithCallerComponent("kv-store"), regionCache: regionCache, kv: spkv, safePoint: 0, @@ -916,7 +916,7 @@ func NewLockResolver(etcdAddrs []string, security config.Security, opts ...opt.C if err != nil { return nil, errors.WithStack(err) } - pdCli = util.InterceptedPDClient{Client: pdCli} + pdCli = util.NewInterceptedPDClient(pdCli) uuid := fmt.Sprintf("tikv-%v", pdCli.GetClusterID(context.TODO())) tlsConfig, err := security.ToTLSConfig() diff --git a/txnkv/client.go b/txnkv/client.go index aa37333598..bfcaad47a7 100644 --- a/txnkv/client.go +++ b/txnkv/client.go @@ -76,7 +76,7 @@ func NewClient(pdAddrs []string, opts ...ClientOpt) (*Client, error) { return nil, errors.WithStack(err) } - pdClient = util.InterceptedPDClient{Client: pdClient} + pdClient = util.NewInterceptedPDClient(pdClient) // Construct codec from options. var codecCli *tikv.CodecPDClient diff --git a/util/pd_interceptor.go b/util/pd_interceptor.go index 41eaa661fc..614fb27520 100644 --- a/util/pd_interceptor.go +++ b/util/pd_interceptor.go @@ -44,6 +44,7 @@ import ( "github.com/tikv/pd/client/clients/router" "github.com/tikv/pd/client/clients/tso" "github.com/tikv/pd/client/opt" + "github.com/tikv/pd/client/pkg/caller" ) var ( @@ -64,6 +65,10 @@ type InterceptedPDClient struct { pd.Client } +func NewInterceptedPDClient(client pd.Client) *InterceptedPDClient { + return &InterceptedPDClient{client.WithCallerComponent("intercepted-pd-client")} +} + // interceptedTsFuture is a PD's wrapper future to record stmt detail. type interceptedTsFuture struct { tso.TSFuture @@ -137,3 +142,8 @@ func (m InterceptedPDClient) GetStore(ctx context.Context, storeID uint64) (*met recordPDWaitTime(ctx, start) return s, err } + +// WithCallerComponent implements pd.Client#WithCallerComponent. +func (m InterceptedPDClient) WithCallerComponent(component caller.Component) pd.Client { + return NewInterceptedPDClient(m.Client.WithCallerComponent(component)) +}