Skip to content

Commit

Permalink
rac2,tracker: use CircularBuffer for tracker.Inflights
Browse files Browse the repository at this point in the history
The custom circular/ring buffer implementation in Raft's
tracker.Inflights does not shrink and unnecessarily grows to the maximum
configured size (unless the buffer becomes empty). With RAC2 in lazy
replication mode, the configured maximum size is no longer a hard limit.
A bug caused by incorrectly trying to enforce this maximum motivated this
slightly bigger cleanup. In general, it is beneficial for the inflight
buffer to also shrink, and by using the existing CircularBuffer, we can
simplify the code in tracker.Inflights.

As part of this change rac2.CircularBuffer is moved to util/container/ring
and becomes ring.Buffer. There is another ring.Buffer in util/ring, which
doesn't shrink and is not optimized to use bit arithmetic -- a TODO has
been added there to replace it with the implementation in
util/container/ring.

Fixes #135223

Epic: none

Release note: None
  • Loading branch information
sumeerbhola committed Nov 15, 2024
1 parent 89df825 commit e5dca03
Show file tree
Hide file tree
Showing 13 changed files with 125 additions and 161 deletions.
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "rac2",
srcs = [
"circular_buffer.go",
"log_tracker.go",
"metrics.go",
"priority.go",
Expand All @@ -28,6 +27,7 @@ go_library(
"//pkg/settings/cluster",
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/container/ring",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/log",
Expand All @@ -45,7 +45,6 @@ go_library(
go_test(
name = "rac2_test",
srcs = [
"circular_buffer_test.go",
"log_tracker_test.go",
"priority_test.go",
"range_controller_test.go",
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/raft"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/container/ring"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
Expand Down Expand Up @@ -109,7 +110,7 @@ type LogTracker struct {
// - waiting[pri][i].Term <= last.Term
// - waiting[pri][i].Index < waiting[pri][i+1].Index
// - waiting[pri][i].Term <= waiting[pri][i+1].Term
waiting [raftpb.NumPriorities]CircularBuffer[LogMark]
waiting [raftpb.NumPriorities]ring.Buffer[LogMark]
}

// NewLogTracker returns a LogTracker initialized to the given log mark. The
Expand Down Expand Up @@ -354,7 +355,7 @@ func (l *LogTracker) errorf(ctx context.Context, format string, args ...any) {

// truncate updates the slice to be a prefix of the ordered log marks slice,
// with all marks at index > after removed from it.
func truncate(marks *CircularBuffer[LogMark], after uint64) {
func truncate(marks *ring.Buffer[LogMark], after uint64) {
n := marks.Length()
for i := n; i > 0; i-- {
if marks.At(i-1).Index <= after {
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/kvflowcontrol/rac2/token_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/util/container/ring"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

Expand All @@ -31,7 +32,7 @@ type Tracker struct {
term uint64
// tracked contains the per-priority tracked log entries ordered by log index.
// All the tracked entries are in the term's leader log.
tracked [raftpb.NumPriorities]CircularBuffer[tracked]
tracked [raftpb.NumPriorities]ring.Buffer[tracked]

stream kvflowcontrol.Stream // used for logging only
}
Expand All @@ -45,7 +46,7 @@ type tracked struct {
func (t *Tracker) Init(term uint64, stream kvflowcontrol.Stream) {
*t = Tracker{
term: term,
tracked: [raftpb.NumPriorities]CircularBuffer[tracked]{},
tracked: [raftpb.NumPriorities]ring.Buffer[tracked]{},
stream: stream,
}
}
Expand Down Expand Up @@ -136,7 +137,7 @@ func (t *Tracker) UntrackAll() (returned [raftpb.NumPriorities]kvflowcontrol.Tok
returned[pri] += t.tracked[pri].At(i).tokens
}
}
t.tracked = [raftpb.NumPriorities]CircularBuffer[tracked]{}
t.tracked = [raftpb.NumPriorities]ring.Buffer[tracked]{}
return returned
}

Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"//pkg/roachpb",
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/container/ring",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/stop",
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
package replica_rac2

import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/util/container/ring"
)

// lowPriOverrideState records which raft log entries have their priority
Expand Down Expand Up @@ -69,7 +69,7 @@ type lowPriOverrideState struct {
//
// A call to getEffectivePriority for index i causes a prefix of indices <=
// i to be discarded.
intervals rac2.CircularBuffer[interval]
intervals ring.Buffer[interval]
// Highest term observed so far.
leaderTerm uint64
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/raft/tracker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"//pkg/raft/raftlogger",
"//pkg/raft/raftpb",
"//pkg/raft/raftstoreliveness",
"//pkg/util/container/ring",
"//pkg/util/hlc",
"//pkg/util/syncutil",
"@com_github_cockroachdb_redact//:redact",
Expand All @@ -37,6 +38,7 @@ go_test(
"//pkg/raft/raftlogger",
"//pkg/raft/raftpb",
"//pkg/raft/raftstoreliveness",
"//pkg/util/container/ring",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
68 changes: 15 additions & 53 deletions pkg/raft/tracker/inflights.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package tracker

import "github.com/cockroachdb/cockroach/pkg/util/container/ring"

// inflight describes an in-flight MsgApp message.
type inflight struct {
index uint64 // the index of the last entry inside the message
Expand All @@ -29,10 +31,6 @@ type inflight struct {
// they are sending a new append, and release "quota" via FreeLE() whenever an
// ack is received.
type Inflights struct {
// the starting index in the buffer
start int

count int // number of inflight messages in the buffer
bytes uint64 // number of inflight bytes

// TODO(pav-kv): do not store the limits here, pass them to methods. For flow
Expand All @@ -41,7 +39,7 @@ type Inflights struct {
maxBytes uint64 // the max total byte size of inflight messages

// buffer is a ring buffer containing info about all in-flight messages.
buffer []inflight
buffer ring.Buffer[inflight]
}

// NewInflights sets up an Inflights that allows up to size inflight messages,
Expand All @@ -59,7 +57,7 @@ func NewInflights(size int, maxBytes uint64) *Inflights {
// the receiver.
func (in *Inflights) Clone() *Inflights {
ins := *in
ins.buffer = append([]inflight(nil), in.buffer...)
ins.buffer = in.buffer.Clone()
return &ins
}

Expand All @@ -73,78 +71,42 @@ func (in *Inflights) Clone() *Inflights {
// is implemented at the higher app level. The tracker correctly tracks all the
// in-flight entries.
func (in *Inflights) Add(index, bytes uint64) {
next := in.start + in.count
size := in.size
if next >= size {
next -= size
}
if next >= len(in.buffer) {
in.grow()
}
in.buffer[next] = inflight{index: index, bytes: bytes}
in.count++
in.buffer.Push(inflight{index: index, bytes: bytes})
in.bytes += bytes
}

// grow the inflight buffer by doubling up to inflights.size. We grow on demand
// instead of preallocating to inflights.size to handle systems which have
// thousands of Raft groups per process.
func (in *Inflights) grow() {
newSize := len(in.buffer) * 2
if newSize == 0 {
newSize = 1
} else if newSize > in.size {
newSize = in.size
}
newBuffer := make([]inflight, newSize)
copy(newBuffer, in.buffer)
in.buffer = newBuffer
}

// FreeLE frees the inflights smaller than or equal to the given `to` flight.
func (in *Inflights) FreeLE(to uint64) {
if in.count == 0 || to < in.buffer[in.start].index {
n := in.buffer.Length()
if n == 0 || to < in.buffer.At(0).index {
// out of the left side of the window
return
}

idx := in.start
var i int
var bytes uint64
for i = 0; i < in.count; i++ {
if to < in.buffer[idx].index { // found the first large inflight
for i = 0; i < n; i++ {
e := in.buffer.At(i)
if to < e.index { // found the first large inflight
break
}
bytes += in.buffer[idx].bytes

// increase index and maybe rotate
size := in.size
if idx++; idx >= size {
idx -= size
}
bytes += e.bytes
}
// free i inflights and set new start index
in.count -= i
in.buffer.Pop(i)
in.bytes -= bytes
in.start = idx
if in.count == 0 {
// inflights is empty, reset the start index so that we don't grow the
// buffer unnecessarily.
in.start = 0
}
}

// Full returns true if no more messages can be sent at the moment.
func (in *Inflights) Full() bool {
return in.count >= in.size || (in.maxBytes != 0 && in.bytes >= in.maxBytes)
return in.buffer.Length() >= in.size || (in.maxBytes != 0 && in.bytes >= in.maxBytes)
}

// Count returns the number of inflight messages.
func (in *Inflights) Count() int { return in.count }
func (in *Inflights) Count() int { return in.buffer.Length() }

// reset frees all inflights.
func (in *Inflights) reset() {
in.start = 0
in.count = 0
in.buffer.ShrinkToPrefix(0)
in.bytes = 0
}
Loading

0 comments on commit e5dca03

Please sign in to comment.