Skip to content

Commit

Permalink
Merge branch 'master' into grpc-1-69-x
Browse files Browse the repository at this point in the history
  • Loading branch information
iosmanthus authored Mar 7, 2025
2 parents 2b3a8a6 + 3a625aa commit 5db0e63
Show file tree
Hide file tree
Showing 10 changed files with 201 additions and 50 deletions.
12 changes: 10 additions & 2 deletions internal/locate/pd_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,13 @@ import (
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/clients/router"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/caller"
)

var _ pd.Client = &CodecPDClient{}

const componentName = "codec-pd-client"

// CodecPDClient wraps a PD Client to decode the encoded keys in region meta.
type CodecPDClient struct {
pd.Client
Expand All @@ -57,7 +60,7 @@ type CodecPDClient struct {
// NewCodecPDClient creates a CodecPDClient in API v1.
func NewCodecPDClient(mode apicodec.Mode, client pd.Client) *CodecPDClient {
codec := apicodec.NewCodecV1(mode)
return &CodecPDClient{client, codec}
return &CodecPDClient{client.WithCallerComponent(componentName), codec}
}

// NewCodecPDClientWithKeyspace creates a CodecPDClient in API v2 with keyspace name.
Expand All @@ -71,7 +74,7 @@ func NewCodecPDClientWithKeyspace(mode apicodec.Mode, client pd.Client, keyspace
return nil, err
}

return &CodecPDClient{client, codec}, nil
return &CodecPDClient{client.WithCallerComponent(componentName), codec}, nil
}

// GetKeyspaceID attempts to retrieve keyspace ID corresponding to the given keyspace name from PD.
Expand Down Expand Up @@ -202,3 +205,8 @@ func (c *CodecPDClient) decodeRegionKeyInPlace(r *router.Region) error {
}
return err
}

// WithCallerComponent returns a new PD client with the specified caller component.
func (c *CodecPDClient) WithCallerComponent(component caller.Component) pd.Client {
return &CodecPDClient{c.Client.WithCallerComponent(component), c.codec}
}
2 changes: 1 addition & 1 deletion internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
}

c := &RegionCache{
pdClient: pdClient,
pdClient: pdClient.WithCallerComponent("region-cache"),
requestHealthFeedbackCallback: options.requestHealthFeedbackCallback,
}

Expand Down
12 changes: 7 additions & 5 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,15 +873,14 @@ func (s *sendReqState) next(
defer s.releaseStoreToken(s.vars.rpcCtx.Store)
}

sendCtx := s.send(bo, req, timeout)
canceled := s.send(bo, req, timeout)
s.vars.sendTimes++

if s.vars.err != nil {
// Because in rpc logic, context.Cancel() will be transferred to rpcContext.Cancel error. For rpcContext cancel,
// we need to retry the request. But for context cancel active, for example, limitExec gets the required rows,
// we shouldn't retry the request, it will go to backoff and hang in retry logic.
if sendCtx.Err() != nil && errors.Cause(sendCtx.Err()) == context.Canceled {
metrics.TiKVRPCErrorCounter.WithLabelValues("context-canceled", storeIDLabel(s.vars.rpcCtx)).Inc()
if canceled {
return true
}
if val, e := util.EvalFailpoint("noRetryOnRpcError"); e == nil && val.(bool) {
Expand Down Expand Up @@ -912,9 +911,9 @@ func (s *sendReqState) next(
return true
}

func (s *sendReqState) send(bo *retry.Backoffer, req *tikvrpc.Request, timeout time.Duration) (ctx context.Context) {
func (s *sendReqState) send(bo *retry.Backoffer, req *tikvrpc.Request, timeout time.Duration) (canceled bool) {
rpcCtx := s.vars.rpcCtx
ctx = bo.GetCtx()
ctx := bo.GetCtx()
if rawHook := ctx.Value(RPCCancellerCtxKey{}); rawHook != nil {
var cancel context.CancelFunc
ctx, cancel = rawHook.(*RPCCanceller).WithCancel(ctx)
Expand Down Expand Up @@ -1057,6 +1056,9 @@ func (s *sendReqState) send(bo *retry.Backoffer, req *tikvrpc.Request, timeout t
s.Stats.RecordRPCErrorStats(errStr)
s.recordRPCAccessInfo(req, s.vars.rpcCtx, errStr)
}
if canceled = ctx.Err() != nil && errors.Cause(ctx.Err()) == context.Canceled; canceled {
metrics.TiKVRPCErrorCounter.WithLabelValues("context-canceled", storeIDLabel(s.vars.rpcCtx)).Inc()
}
}
return
}
Expand Down
2 changes: 1 addition & 1 deletion internal/locate/store_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type storeCache interface {
}

func newStoreCache(pdClient pd.Client) *storeCacheImpl {
c := &storeCacheImpl{pdClient: pdClient}
c := &storeCacheImpl{pdClient: pdClient.WithCallerComponent("store-cache")}
c.notifyCheckCh = make(chan struct{}, 1)
c.storeMu.stores = make(map[uint64]*Store)
c.tiflashComputeStoreMu.needReload = true
Expand Down
89 changes: 62 additions & 27 deletions oracle/oracles/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/tikv/client-go/v2/internal/logutil"
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/clients/tso"
"go.uber.org/zap"
Expand Down Expand Up @@ -179,7 +180,7 @@ func NewPdOracle(pdClient pd.Client, options *PDOracleOptions) (oracle.Oracle, e
}

o := &pdOracle{
c: pdClient,
c: pdClient.WithCallerComponent("oracle"),
quit: make(chan struct{}),
lastTSUpdateInterval: atomic.Int64{},
}
Expand Down Expand Up @@ -647,6 +648,7 @@ func (o *pdOracle) getCurrentTSForValidation(ctx context.Context, opt *oracle.Op
// waiting for reusing the same result should not be canceled. So pass context.Background() instead of the
// current ctx.
res, err := o.GetTimestamp(context.Background(), opt)
_, _ = util.EvalFailpoint("getCurrentTSForValidationBeforeReturn")
return res, err
})
select {
Expand All @@ -660,42 +662,75 @@ func (o *pdOracle) getCurrentTSForValidation(ctx context.Context, opt *oracle.Op
}
}

func (o *pdOracle) ValidateReadTS(ctx context.Context, readTS uint64, isStaleRead bool, opt *oracle.Option) (errRet error) {
func (o *pdOracle) ValidateReadTS(ctx context.Context, readTS uint64, isStaleRead bool, opt *oracle.Option) error {
if readTS == math.MaxUint64 {
if isStaleRead {
return oracle.ErrLatestStaleRead{}
}
return nil
}

latestTSInfo, exists := o.getLastTSWithArrivalTS(opt.TxnScope)
// If we fail to get latestTSInfo or the readTS exceeds it, get a timestamp from PD to double-check.
// But we don't need to strictly fetch the latest TS. So if there are already concurrent calls to this function
// loading the latest TS, we can just reuse the same result to avoid too many concurrent GetTS calls.
if !exists || readTS > latestTSInfo.tso {
currentTS, err := o.getCurrentTSForValidation(ctx, opt)
if err != nil {
return errors.Errorf("fail to validate read timestamp: %v", err)
}
if isStaleRead {
o.adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(readTS, currentTS, time.Now())
}
if readTS > currentTS {
return oracle.ErrFutureTSRead{
ReadTS: readTS,
CurrentTS: currentTS,
retrying := false
for {
latestTSInfo, exists := o.getLastTSWithArrivalTS(opt.TxnScope)
// If we fail to get latestTSInfo or the readTS exceeds it, get a timestamp from PD to double-check.
// But we don't need to strictly fetch the latest TS. So if there are already concurrent calls to this function
// loading the latest TS, we can just reuse the same result to avoid too many concurrent GetTS calls.
if !exists || readTS > latestTSInfo.tso {
currentTS, err := o.getCurrentTSForValidation(ctx, opt)
if err != nil {
return errors.Errorf("fail to validate read timestamp: %v", err)
}
if isStaleRead && !retrying {
// Trigger the adjustment at most once in a single invocation.
o.adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(readTS, currentTS, time.Now())
}
if readTS > currentTS {
// It's possible that the caller is checking a ts that's legal but not fetched from the current oracle
// object. In this case, it's possible that:
// * The ts is not be cached by the low resolution ts (so that readTS > latestTSInfo.TSO);
// * ... and then the getCurrentTSForValidation (which uses a singleflight internally) reuse a
// previously-started call and returns an older ts
// so that it may cause the check false-positive.
// To handle this case, we do not fail immediately when the check doesn't at once; instead, retry one
// more time. In the retry:
// * Considering that there can already be some other concurrent GetTimestamp operation that may have updated
// the low resolution ts, so check it again. If it passes, then no need to get the next ts from PD,
// which is slow.
// * Then, call getCurrentTSForValidation and check again. As the current GetTimestamp operation
// inside getCurrentTSForValidation must be started after finishing the previous one (while the
// latter is finished after starting this invocation to ValidateReadTS), then we can conclude that
// the next ts returned by getCurrentTSForValidation must be greater than any ts allocated by PD
// before the current invocation to ValidateReadTS.
skipRetry := false
if val, err1 := util.EvalFailpoint("validateReadTSRetryGetTS"); err1 == nil {
if str, ok := val.(string); ok {
if str == "skip" {
skipRetry = true
}
}
}
if !retrying && !skipRetry {
retrying = true
continue
}
return oracle.ErrFutureTSRead{
ReadTS: readTS,
CurrentTS: currentTS,
}
}
} else if !retrying && isStaleRead {
// Trigger the adjustment at most once in a single invocation.
estimatedCurrentTS, err := o.getStaleTimestampWithLastTS(latestTSInfo, 0)
if err != nil {
logutil.Logger(ctx).Warn("failed to estimate current ts by getSlateTimestamp for auto-adjusting update low resolution ts interval",
zap.Error(err), zap.Uint64("readTS", readTS), zap.String("txnScope", opt.TxnScope))
} else {
o.adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(readTS, estimatedCurrentTS, time.Now())
}
}
} else if isStaleRead {
estimatedCurrentTS, err := o.getStaleTimestampWithLastTS(latestTSInfo, 0)
if err != nil {
logutil.Logger(ctx).Warn("failed to estimate current ts by getSlateTimestamp for auto-adjusting update low resolution ts interval",
zap.Error(err), zap.Uint64("readTS", readTS), zap.String("txnScope", opt.TxnScope))
} else {
o.adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(readTS, estimatedCurrentTS, time.Now())
}
return nil
}
return nil
}

// adjustUpdateLowResolutionTSIntervalWithRequestedStaleness triggers adjustments the update interval of low resolution
Expand Down
Loading

0 comments on commit 5db0e63

Please sign in to comment.