diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 2fada15809e..43049476d29 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -20,6 +20,7 @@ import ( "math/rand" "runtime/trace" "sync" + "sync/atomic" "time" "github.com/opentracing/opentracing-go" @@ -81,7 +82,7 @@ type tsoDispatcher struct { connectionCtxs *sync.Map tsoRequestCh chan *tsoRequest tsDeadlineCh chan *deadline - lastTSOInfo *tsoInfo + latestTSOInfo atomic.Pointer[tsoInfo] // For reusing tsoBatchController objects batchBufferPool *sync.Pool @@ -549,6 +550,9 @@ func (td *tsoDispatcher) processRequests( reqKeyspaceGroupID = svcDiscovery.GetKeyspaceGroupID() ) + // Load latest allocated ts for monotonicity assertion. + tsoInfoBeforeReq := td.latestTSOInfo.Load() + cb := func(result tsoRequestResult, reqKeyspaceGroupID uint32, err error) { // As golang doesn't allow double-closing a channel, here is implicitly a check that the callback // is never called twice or called while it's also being cancelled elsewhere. @@ -571,7 +575,7 @@ func (td *tsoDispatcher) processRequests( // `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request. firstLogical := tsoutil.AddLogical(result.logical, -int64(result.count)+1, result.suffixBits) // Do the check before releasing the token. 
- td.compareAndSwapTS(curTSOInfo, firstLogical) + td.checkMonotonicity(tsoInfoBeforeReq, curTSOInfo, firstLogical) td.doneCollectedRequests(tbc, result.physical, firstLogical, result.suffixBits, stream.streamID) } @@ -597,32 +601,27 @@ func (td *tsoDispatcher) doneCollectedRequests(tbc *tsoBatchController, physical tbc.finishCollectedRequests(physical, firstLogical, suffixBits, streamID, nil) } -func (td *tsoDispatcher) compareAndSwapTS( - curTSOInfo *tsoInfo, firstLogical int64, +func (td *tsoDispatcher) checkMonotonicity( + lastTSOInfo *tsoInfo, curTSOInfo *tsoInfo, firstLogical int64, ) { - if td.lastTSOInfo != nil { - var ( - lastTSOInfo = td.lastTSOInfo - dc = td.dc - physical = curTSOInfo.physical - keyspaceID = td.provider.getServiceDiscovery().GetKeyspaceID() - ) - if td.lastTSOInfo.respKeyspaceGroupID != curTSOInfo.respKeyspaceGroupID { + keyspaceID := td.provider.getServiceDiscovery().GetKeyspaceID() + if lastTSOInfo != nil { + if lastTSOInfo.respKeyspaceGroupID != curTSOInfo.respKeyspaceGroupID { log.Info("[tso] keyspace group changed", - zap.String("dc-location", dc), + zap.String("dc-location", td.dc), zap.Uint32("old-group-id", lastTSOInfo.respKeyspaceGroupID), zap.Uint32("new-group-id", curTSOInfo.respKeyspaceGroupID)) } // The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical // to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then - // all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned + // all TSOs we get will be [6, 7, 8, 9, 10]. latestTSOInfo.logical stores the logical part of the largest ts returned // last time. 
- if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) { + if tsoutil.TSLessEqual(curTSOInfo.physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) { log.Panic("[tso] timestamp fallback", - zap.String("dc-location", dc), + zap.String("dc-location", td.dc), zap.Uint32("keyspace", keyspaceID), zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)), - zap.String("cur-ts", fmt.Sprintf("(%d, %d)", physical, firstLogical)), + zap.String("cur-ts", fmt.Sprintf("(%d, %d)", curTSOInfo.physical, firstLogical)), zap.String("last-tso-server", lastTSOInfo.tsoServer), zap.String("cur-tso-server", curTSOInfo.tsoServer), zap.Uint32("last-keyspace-group-in-request", lastTSOInfo.reqKeyspaceGroupID), @@ -633,7 +632,24 @@ func (td *tsoDispatcher) compareAndSwapTS( zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt)) } } - td.lastTSOInfo = curTSOInfo + + if td.latestTSOInfo.CompareAndSwap(nil, curTSOInfo) { + // If latestTSOInfo is missing, simply store it and exit. + return + } + + // Replace the recorded ts only if the one we are holding is larger. + for { + old := td.latestTSOInfo.Load() + if tsoutil.TSLessEqual(curTSOInfo.physical, curTSOInfo.logical, old.physical, old.logical) { + // The recorded ts is already at least as large as the current one. Skip. + break + } + if td.latestTSOInfo.CompareAndSwap(old, curTSOInfo) { + // Successfully replaced. + break + } + } } // checkTSORPCConcurrency checks configurations about TSO RPC concurrency, and adjust the token count if needed.