Skip to content

Commit

Permalink
Address comments; add more comments to explain checkMonotonicity
Browse files Browse the repository at this point in the history
Signed-off-by: MyonKeminta <[email protected]>
  • Loading branch information
MyonKeminta committed Sep 25, 2024
1 parent 4a795fe commit 0bfe55f
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 4 deletions.
14 changes: 13 additions & 1 deletion client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type tsoInfo struct {
respReceivedAt time.Time
physical int64
logical int64
sourceStreamID string
}

type tsoServiceProvider interface {
Expand Down Expand Up @@ -589,6 +590,7 @@ func (td *tsoDispatcher) processRequests(
respReceivedAt: time.Now(),
physical: result.physical,
logical: result.logical,
sourceStreamID: stream.streamID,
}
// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
firstLogical := tsoutil.AddLogical(result.logical, -int64(result.count)+1, result.suffixBits)
Expand Down Expand Up @@ -619,6 +621,14 @@ func (td *tsoDispatcher) doneCollectedRequests(tbc *tsoBatchController, physical
tbc.finishCollectedRequests(physical, firstLogical, suffixBits, streamID, nil)
}

// checkMonotonicity checks whether the monotonicity of the TSO allocation is violated.
// It asserts (curTSOInfo, firstLogical) must be larger than lastTSOInfo, and updates td.latestTSOInfo if it grows.
//
// Note that when concurrent RPC is enabled, the lastTSOInfo may not be the latest value stored in td.latestTSOInfo
// field. Instead, it's the value that was loaded just before the current RPC request's beginning. The reason is,
// if two requests processing time has overlap, they don't have a strong order, and the later-finished one may be
// allocated later (with larger value) than another. We only need to guarantee request A returns larger ts than B
// if request A *starts* after request B *finishes*.
func (td *tsoDispatcher) checkMonotonicity(
lastTSOInfo *tsoInfo, curTSOInfo *tsoInfo, firstLogical int64,
) {
Expand Down Expand Up @@ -647,7 +657,9 @@ func (td *tsoDispatcher) checkMonotonicity(
zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID),
zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID),
zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt),
zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt))
zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt),
zap.String("last-stream-id", lastTSOInfo.sourceStreamID),
zap.String("cur-stream-id", curTSOInfo.sourceStreamID))
}
}

Expand Down
7 changes: 4 additions & 3 deletions client/tso_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,10 @@ const (

var streamIDAlloc atomic.Int32

const invalidStreamID = "<invalid>"

const maxPendingRequestsInTSOStream = 64
const (
invalidStreamID = "<invalid>"
maxPendingRequestsInTSOStream = 64
)

func newTSOStream(ctx context.Context, serverURL string, stream grpcTSOStreamAdapter) *tsoStream {
streamID := fmt.Sprintf("%s-%d", serverURL, streamIDAlloc.Add(1))
Expand Down

0 comments on commit 0bfe55f

Please sign in to comment.