Skip to content

Commit

Permalink
Async tracking of commit offset in QuorumAckTracker (#510)
Browse files Browse the repository at this point in the history
Track the quorum offset using a callback. This will free the goroutine
handling the write stream for processing the next write batch.
  • Loading branch information
merlimat authored Aug 30, 2024
1 parent 608316a commit 407ea2e
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 31 deletions.
25 changes: 13 additions & 12 deletions server/leader_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,21 +892,22 @@ func (lc *leaderController) handleWalSynced(stream proto.OxiaClient_WriteStreamS
return
}

resp, err2 := lc.quorumAckTracker.WaitForCommitOffset(stream.Context(), offset, func() (*proto.WriteResponse, error) {
lc.quorumAckTracker.WaitForCommitOffsetAsync(offset, func() (*proto.WriteResponse, error) {
return lc.db.ProcessWrite(req, offset, timestamp, SessionUpdateOperationCallback)
})
if err2 != nil {
timer.Done()
closeCh <- err2
return
}
}, func(response *proto.WriteResponse, err error) {
if err != nil {
timer.Done()
closeCh <- err
return
}

if err3 := stream.Send(resp); err3 != nil {
if err = stream.Send(response); err != nil {
timer.Done()
closeCh <- err
return
}
timer.Done()
closeCh <- err3
return
}
timer.Done()
})
}

func (lc *leaderController) appendToWalStreamRequest(request *proto.WriteRequest,
Expand Down
84 changes: 66 additions & 18 deletions server/quorum_ack_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type QuorumAckTracker interface {
// After that, invokes the function f
WaitForCommitOffset(ctx context.Context, offset int64, f func() (*proto.WriteResponse, error)) (*proto.WriteResponse, error)

WaitForCommitOffsetAsync(offset int64, f func() (*proto.WriteResponse, error), callback func(*proto.WriteResponse, error))

// NextOffset returns the offset for the next entry to write
// Note this can go ahead of the head-offset as there can be multiple operations in flight.
NextOffset() int64
Expand All @@ -67,8 +69,8 @@ type QuorumAckTracker interface {

type quorumAckTracker struct {
sync.Mutex
waitForHeadOffset common.ConditionContext
waitForCommitOffset common.ConditionContext
waitingRequests []waitingRequest
waitForHeadOffset common.ConditionContext

replicationFactor uint32
requiredAcks uint32
Expand All @@ -93,13 +95,19 @@ type cursorAcker struct {
cursorIdx int
}

// waitingRequest is a deferred notification queued by WaitForCommitOffsetAsync
// when the commit offset has not yet reached the requested offset.
type waitingRequest struct {
// minOffset is the commit offset that must be reached before callback is run.
minOffset int64
// callback is invoked by notifyCommitOffsetAdvanced once the commit offset
// is at or beyond minOffset.
callback func()
}

func NewQuorumAckTracker(replicationFactor uint32, headOffset int64, commitOffset int64) QuorumAckTracker {
q := &quorumAckTracker{
// Ack quorum is number of follower acks that are required to consider the entry fully committed
// We are using RF/2 (and not RF/2 + 1) because the leader is already storing 1 copy locally
requiredAcks: replicationFactor / 2,
replicationFactor: replicationFactor,
tracker: make(map[int64]*util.BitSet),
waitingRequests: make([]waitingRequest, 0),
}

q.nextOffset.Store(headOffset)
Expand All @@ -112,7 +120,6 @@ func NewQuorumAckTracker(replicationFactor uint32, headOffset int64, commitOffse
}

q.waitForHeadOffset = common.NewConditionContext(q)
q.waitForCommitOffset = common.NewConditionContext(q)
return q
}

Expand All @@ -128,8 +135,7 @@ func (q *quorumAckTracker) AdvanceHeadOffset(headOffset int64) {
q.waitForHeadOffset.Broadcast()

if q.requiredAcks == 0 {
q.commitOffset.Store(headOffset)
q.waitForCommitOffset.Broadcast()
q.notifyCommitOffsetAdvanced(headOffset)
} else {
q.tracker[headOffset] = &util.BitSet{}
}
Expand Down Expand Up @@ -161,32 +167,75 @@ func (q *quorumAckTracker) WaitForHeadOffset(ctx context.Context, offset int64)
}

// WaitForCommitOffset is the blocking counterpart of WaitForCommitOffsetAsync.
//
// It registers f through WaitForCommitOffsetAsync and then waits for either
// the resulting response/error pair or for ctx to be done, whichever happens
// first. When ctx is done first it returns (nil, ctx.Err()).
//
// NOTE(review): nothing here unregisters the request when ctx is done, so f
// may still be executed later and its result silently dropped — confirm this
// is acceptable for callers.
func (q *quorumAckTracker) WaitForCommitOffset(ctx context.Context, offset int64, f func() (*proto.WriteResponse, error)) (*proto.WriteResponse, error) {
	// outcome carries the pair handed to the async callback.
	type outcome struct {
		response *proto.WriteResponse
		err      error
	}

	// Capacity 1 so the callback's send never blocks, even when the caller
	// has already returned because ctx was done.
	done := make(chan outcome, 1)
	q.WaitForCommitOffsetAsync(offset, f, func(response *proto.WriteResponse, err error) {
		done <- outcome{response: response, err: err}
	})

	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case out := <-done:
		return out.response, out.err
	}
}

func (q *quorumAckTracker) WaitForCommitOffsetAsync(offset int64, f func() (*proto.WriteResponse, error),
callback func(*proto.WriteResponse, error)) {
q.Lock()
defer q.Unlock()

for !q.closed && q.requiredAcks > 0 && q.commitOffset.Load() < offset {
if err := q.waitForCommitOffset.Wait(ctx); err != nil {
return nil, err
}
}

if q.closed {
return nil, common.ErrorAlreadyClosed
callback(nil, common.ErrorAlreadyClosed)
return
}

if f != nil {
return f()
if q.requiredAcks == 0 || q.commitOffset.Load() >= offset {
var res *proto.WriteResponse
var err error
if f != nil {
res, err = f()
}

callback(res, err)
return
}

return nil, nil //nolint:nilnil
q.waitingRequests = append(q.waitingRequests, waitingRequest{offset, func() {
var res *proto.WriteResponse
var err error
if f != nil {
res, err = f()
}

callback(res, err)
}})
}

// notifyCommitOffsetAdvanced records commitOffset as the new commit offset and
// runs, in queue order, the callback of every waiting request at the head of
// the queue whose minOffset is at or below commitOffset.
//
// Processing stops at the first request whose minOffset is still ahead of
// commitOffset; later entries are not examined. This assumes requests are
// queued in non-decreasing offset order — TODO confirm against callers.
//
// Each request is removed from the queue before its callback runs, and its
// slot in the backing array is cleared so the callback closure (and whatever
// it captures) is not kept reachable by the re-sliced queue.
//
// NOTE(review): waitingRequests is appended to under q's lock, so this is
// presumably meant to be called with that lock held — confirm.
func (q *quorumAckTracker) notifyCommitOffsetAdvanced(commitOffset int64) {
	q.commitOffset.Store(commitOffset)

	for len(q.waitingRequests) > 0 {
		head := q.waitingRequests[0]
		if head.minOffset > commitOffset {
			return
		}

		// Drop the reference held by the backing array before advancing the
		// slice; re-slicing alone would keep the closure alive.
		q.waitingRequests[0] = waitingRequest{}
		q.waitingRequests = q.waitingRequests[1:]
		head.callback()
	}
}

// Close marks the tracker as closed while holding its lock and broadcasts on
// the tracker's condition variables so that blocked waiters wake up and
// re-check the closed flag. It always returns nil.
//
// NOTE(review): nothing here touches waitingRequests, so callbacks queued via
// WaitForCommitOffsetAsync do not appear to be notified on close — confirm.
func (q *quorumAckTracker) Close() error {
q.Lock()
defer q.Unlock()

q.closed = true
q.waitForCommitOffset.Broadcast()
q.waitForHeadOffset.Broadcast()
return nil
}
Expand Down Expand Up @@ -241,7 +290,6 @@ func (c *cursorAcker) ack(offset int64) {
delete(q.tracker, offset)

// Advance the commit offset
q.commitOffset.Store(offset)
q.waitForCommitOffset.Broadcast()
q.notifyCommitOffsetAdvanced(offset)
}
}
6 changes: 5 additions & 1 deletion server/wal/wal_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,12 @@ func (t *wal) drainSyncRequestsChannel(callbacks []func(error)) []func(error) {
}

func (t *wal) runSync() {
var callbacks []func(error)

for {
var callbacks []func(error)
// Clear the slice
callbacks = callbacks[:0]

select {
case <-t.ctx.Done():
// Wal is closing, exit the go routine
Expand Down

0 comments on commit 407ea2e

Please sign in to comment.