Skip to content

Commit

Permalink
pdclient: Add caller info to pd client (#1516)
Browse files Browse the repository at this point in the history
ref tikv/pd#8593

Signed-off-by: okJiang <[email protected]>
  • Loading branch information
okJiang authored Mar 4, 2025
1 parent 10a84d0 commit aa7301d
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 10 deletions.
12 changes: 10 additions & 2 deletions internal/locate/pd_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,13 @@ import (
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/clients/router"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/caller"
)

var _ pd.Client = &CodecPDClient{}

const componentName = "codec-pd-client"

// CodecPDClient wraps a PD Client to decode the encoded keys in region meta.
type CodecPDClient struct {
pd.Client
Expand All @@ -57,7 +60,7 @@ type CodecPDClient struct {
// NewCodecPDClient creates a CodecPDClient in API v1.
func NewCodecPDClient(mode apicodec.Mode, client pd.Client) *CodecPDClient {
codec := apicodec.NewCodecV1(mode)
return &CodecPDClient{client, codec}
return &CodecPDClient{client.WithCallerComponent(componentName), codec}
}

// NewCodecPDClientWithKeyspace creates a CodecPDClient in API v2 with keyspace name.
Expand All @@ -71,7 +74,7 @@ func NewCodecPDClientWithKeyspace(mode apicodec.Mode, client pd.Client, keyspace
return nil, err
}

return &CodecPDClient{client, codec}, nil
return &CodecPDClient{client.WithCallerComponent(componentName), codec}, nil
}

// GetKeyspaceID attempts to retrieve keyspace ID corresponding to the given keyspace name from PD.
Expand Down Expand Up @@ -202,3 +205,8 @@ func (c *CodecPDClient) decodeRegionKeyInPlace(r *router.Region) error {
}
return err
}

// WithCallerComponent returns a new PD client with the specified caller component.
func (c *CodecPDClient) WithCallerComponent(component caller.Component) pd.Client {
return &CodecPDClient{c.Client.WithCallerComponent(component), c.codec}
}
2 changes: 1 addition & 1 deletion internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
}

c := &RegionCache{
pdClient: pdClient,
pdClient: pdClient.WithCallerComponent("region-cache"),
requestHealthFeedbackCallback: options.requestHealthFeedbackCallback,
}

Expand Down
2 changes: 1 addition & 1 deletion internal/locate/store_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type storeCache interface {
}

func newStoreCache(pdClient pd.Client) *storeCacheImpl {
c := &storeCacheImpl{pdClient: pdClient}
c := &storeCacheImpl{pdClient: pdClient.WithCallerComponent("store-cache")}
c.notifyCheckCh = make(chan struct{}, 1)
c.storeMu.stores = make(map[uint64]*Store)
c.tiflashComputeStoreMu.needReload = true
Expand Down
2 changes: 1 addition & 1 deletion oracle/oracles/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func NewPdOracle(pdClient pd.Client, options *PDOracleOptions) (oracle.Oracle, e
}

o := &pdOracle{
c: pdClient,
c: pdClient.WithCallerComponent("oracle"),
quit: make(chan struct{}),
lastTSUpdateInterval: atomic.Int64{},
}
Expand Down
9 changes: 9 additions & 0 deletions oracle/oracles/pd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/pkg/caller"
)

func TestPDOracle_UntilExpired(t *testing.T) {
Expand Down Expand Up @@ -91,6 +92,10 @@ func (c *MockPdClient) GetTS(ctx context.Context) (int64, int64, error) {
return 0, c.logicalTimestamp.Add(1), nil
}

func (c *MockPdClient) WithCallerComponent(component caller.Component) pd.Client {
return c
}

func TestPdOracle_SetLowResolutionTimestampUpdateInterval(t *testing.T) {
pdClient := MockPdClient{}
o := NewPdOracleWithClient(&pdClient)
Expand Down Expand Up @@ -420,6 +425,10 @@ func (c *MockPDClientWithPause) Resume() {
c.mu.Unlock()
}

func (c *MockPDClientWithPause) WithCallerComponent(component caller.Component) pd.Client {
return c
}

func TestValidateReadTSForStaleReadReusingGetTSResult(t *testing.T) {
util.EnableFailpoints()

Expand Down
5 changes: 3 additions & 2 deletions rawkv/rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ const (
rawBatchPutSize = 16 * 1024
// rawBatchPairCount is the maximum limit for rawkv each batch get/delete request.
rawBatchPairCount = 512
componentName = caller.Component("rawkv-client-go")
)

type rawOptions struct {
Expand Down Expand Up @@ -204,7 +205,7 @@ func NewClientWithOpts(ctx context.Context, pdAddrs []string, opts ...ClientOpt)
}

// Use an unwrapped PDClient to obtain keyspace meta.
pdCli, err := pd.NewClientWithContext(ctx, caller.Component("rawkv-client-go"), pdAddrs, pd.SecurityOption{
pdCli, err := pd.NewClientWithContext(ctx, componentName, pdAddrs, pd.SecurityOption{
CAPath: opt.security.ClusterSSLCA,
CertPath: opt.security.ClusterSSLCert,
KeyPath: opt.security.ClusterSSLKey,
Expand Down Expand Up @@ -240,7 +241,7 @@ func NewClientWithOpts(ctx context.Context, pdAddrs []string, opts ...ClientOpt)
apiVersion: opt.apiVersion,
clusterID: pdCli.GetClusterID(ctx),
regionCache: locate.NewRegionCache(pdCli),
pdClient: pdCli,
pdClient: pdCli.WithCallerComponent(componentName),
rpcClient: rpcCli,
}, nil
}
Expand Down
4 changes: 2 additions & 2 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl
clusterID: pdClient.GetClusterID(context.TODO()),
uuid: uuid,
oracle: o,
pdClient: pdClient,
pdClient: pdClient.WithCallerComponent("kv-store"),
regionCache: regionCache,
kv: spkv,
safePoint: 0,
Expand Down Expand Up @@ -916,7 +916,7 @@ func NewLockResolver(etcdAddrs []string, security config.Security, opts ...opt.C
if err != nil {
return nil, errors.WithStack(err)
}
pdCli = util.InterceptedPDClient{Client: pdCli}
pdCli = util.NewInterceptedPDClient(pdCli)
uuid := fmt.Sprintf("tikv-%v", pdCli.GetClusterID(context.TODO()))

tlsConfig, err := security.ToTLSConfig()
Expand Down
2 changes: 1 addition & 1 deletion txnkv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewClient(pdAddrs []string, opts ...ClientOpt) (*Client, error) {
return nil, errors.WithStack(err)
}

pdClient = util.InterceptedPDClient{Client: pdClient}
pdClient = util.NewInterceptedPDClient(pdClient)

// Construct codec from options.
var codecCli *tikv.CodecPDClient
Expand Down
10 changes: 10 additions & 0 deletions util/pd_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/tikv/pd/client/clients/router"
"github.com/tikv/pd/client/clients/tso"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/caller"
)

var (
Expand All @@ -64,6 +65,10 @@ type InterceptedPDClient struct {
pd.Client
}

func NewInterceptedPDClient(client pd.Client) *InterceptedPDClient {
return &InterceptedPDClient{client.WithCallerComponent("intercepted-pd-client")}
}

// interceptedTsFuture is a PD's wrapper future to record stmt detail.
type interceptedTsFuture struct {
tso.TSFuture
Expand Down Expand Up @@ -137,3 +142,8 @@ func (m InterceptedPDClient) GetStore(ctx context.Context, storeID uint64) (*met
recordPDWaitTime(ctx, start)
return s, err
}

// WithCallerComponent implements pd.Client#WithCallerComponent.
func (m InterceptedPDClient) WithCallerComponent(component caller.Component) pd.Client {
return NewInterceptedPDClient(m.Client.WithCallerComponent(component))
}

0 comments on commit aa7301d

Please sign in to comment.