Skip to content

Commit

Permalink
Adapt the monotonicity check for parallel RPC
Browse files Browse the repository at this point in the history
Signed-off-by: MyonKeminta <[email protected]>
  • Loading branch information
MyonKeminta committed Sep 23, 2024
1 parent e5f9355 commit 1b022ce
Showing 1 changed file with 34 additions and 18 deletions.
52 changes: 34 additions & 18 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"math/rand"
"runtime/trace"
"sync"
"sync/atomic"
"time"

"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -81,7 +82,7 @@ type tsoDispatcher struct {
connectionCtxs *sync.Map
tsoRequestCh chan *tsoRequest
tsDeadlineCh chan *deadline
lastTSOInfo *tsoInfo
latestTSOInfo atomic.Pointer[tsoInfo]
// For reusing tsoBatchController objects
batchBufferPool *sync.Pool

Expand Down Expand Up @@ -549,6 +550,9 @@ func (td *tsoDispatcher) processRequests(
reqKeyspaceGroupID = svcDiscovery.GetKeyspaceGroupID()
)

// Load latest allocated ts for monotonicity assertion.
tsoInfoBeforeReq := td.latestTSOInfo.Load()

cb := func(result tsoRequestResult, reqKeyspaceGroupID uint32, err error) {
// As golang doesn't allow double-closing a channel, here is implicitly a check that the callback
// is never called twice or called while it's also being cancelled elsewhere.
Expand All @@ -571,7 +575,7 @@ func (td *tsoDispatcher) processRequests(
// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
firstLogical := tsoutil.AddLogical(result.logical, -int64(result.count)+1, result.suffixBits)
// Do the check before releasing the token.
td.compareAndSwapTS(curTSOInfo, firstLogical)
td.checkMonotonicity(tsoInfoBeforeReq, curTSOInfo, firstLogical)
td.doneCollectedRequests(tbc, result.physical, firstLogical, result.suffixBits, stream.streamID)
}

Expand All @@ -597,32 +601,27 @@ func (td *tsoDispatcher) doneCollectedRequests(tbc *tsoBatchController, physical
tbc.finishCollectedRequests(physical, firstLogical, suffixBits, streamID, nil)
}

func (td *tsoDispatcher) compareAndSwapTS(
curTSOInfo *tsoInfo, firstLogical int64,
func (td *tsoDispatcher) checkMonotonicity(
lastTSOInfo *tsoInfo, curTSOInfo *tsoInfo, firstLogical int64,
) {
if td.lastTSOInfo != nil {
var (
lastTSOInfo = td.lastTSOInfo
dc = td.dc
physical = curTSOInfo.physical
keyspaceID = td.provider.getServiceDiscovery().GetKeyspaceID()
)
if td.lastTSOInfo.respKeyspaceGroupID != curTSOInfo.respKeyspaceGroupID {
keyspaceID := td.provider.getServiceDiscovery().GetKeyspaceID()
if lastTSOInfo != nil {
if lastTSOInfo.respKeyspaceGroupID != curTSOInfo.respKeyspaceGroupID {
log.Info("[tso] keyspace group changed",
zap.String("dc-location", dc),
zap.String("dc-location", td.dc),
zap.Uint32("old-group-id", lastTSOInfo.respKeyspaceGroupID),
zap.Uint32("new-group-id", curTSOInfo.respKeyspaceGroupID))
}
// The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical
// to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then
// all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned
// all TSOs we get will be [6, 7, 8, 9, 10]. latestTSOInfo.logical stores the logical part of the largest ts returned
// last time.
if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) {
if tsoutil.TSLessEqual(curTSOInfo.physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) {
log.Panic("[tso] timestamp fallback",
zap.String("dc-location", dc),
zap.String("dc-location", td.dc),
zap.Uint32("keyspace", keyspaceID),
zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)),
zap.String("cur-ts", fmt.Sprintf("(%d, %d)", physical, firstLogical)),
zap.String("cur-ts", fmt.Sprintf("(%d, %d)", curTSOInfo.physical, firstLogical)),
zap.String("last-tso-server", lastTSOInfo.tsoServer),
zap.String("cur-tso-server", curTSOInfo.tsoServer),
zap.Uint32("last-keyspace-group-in-request", lastTSOInfo.reqKeyspaceGroupID),
Expand All @@ -633,7 +632,24 @@ func (td *tsoDispatcher) compareAndSwapTS(
zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt))
}
}
td.lastTSOInfo = curTSOInfo

if td.latestTSOInfo.CompareAndSwap(nil, curTSOInfo) {
// If latestTSOInfo is missing, simply store it and exit.
return
}

// Replace if we are holding a larger ts than that has been recorded.
for {
old := td.latestTSOInfo.Load()
if tsoutil.TSLessEqual(curTSOInfo.physical, curTSOInfo.logical, old.physical, old.logical) {
// The current one is large enough. Skip.
break
}
if td.latestTSOInfo.CompareAndSwap(old, curTSOInfo) {
// Successfully replaced.
break
}
}
}

// checkTSORPCConcurrency checks configurations about TSO RPC concurrency, and adjust the token count if needed.
Expand Down

0 comments on commit 1b022ce

Please sign in to comment.