diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel index 3cc9def99165..2c78422bb6ec 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel @@ -3,7 +3,6 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "rac2", srcs = [ - "circular_buffer.go", "log_tracker.go", "metrics.go", "priority.go", @@ -28,6 +27,7 @@ go_library( "//pkg/settings/cluster", "//pkg/util/admission/admissionpb", "//pkg/util/buildutil", + "//pkg/util/container/ring", "//pkg/util/hlc", "//pkg/util/humanizeutil", "//pkg/util/log", @@ -45,7 +45,6 @@ go_library( go_test( name = "rac2_test", srcs = [ - "circular_buffer_test.go", "log_tracker_test.go", "priority_test.go", "range_controller_test.go", diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go b/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go index f61bed4de202..40c11181b408 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go @@ -13,6 +13,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/raft/raftpb" "github.com/cockroachdb/cockroach/pkg/util/buildutil" + "github.com/cockroachdb/cockroach/pkg/util/container/ring" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" @@ -109,7 +110,7 @@ type LogTracker struct { // - waiting[pri][i].Term <= last.Term // - waiting[pri][i].Index < waiting[pri][i+1].Index // - waiting[pri][i].Term <= waiting[pri][i+1].Term - waiting [raftpb.NumPriorities]CircularBuffer[LogMark] + waiting [raftpb.NumPriorities]ring.Buffer[LogMark] } // NewLogTracker returns a LogTracker initialized to the given log mark. The @@ -354,7 +355,7 @@ func (l *LogTracker) errorf(ctx context.Context, format string, args ...any) { // truncate updates the slice to be a prefix of the ordered log marks slice, // with all marks at index > after removed from it. -func truncate(marks *CircularBuffer[LogMark], after uint64) { +func truncate(marks *ring.Buffer[LogMark], after uint64) { n := marks.Length() for i := n; i > 0; i-- { if marks.At(i-1).Index <= after { diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/token_tracker.go b/pkg/kv/kvserver/kvflowcontrol/rac2/token_tracker.go index 6697ca75f61e..96b5cae71316 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/token_tracker.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/token_tracker.go @@ -12,6 +12,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb" "github.com/cockroachdb/cockroach/pkg/raft/raftpb" + "github.com/cockroachdb/cockroach/pkg/util/container/ring" "github.com/cockroachdb/cockroach/pkg/util/log" ) @@ -31,7 +32,7 @@ type Tracker struct { term uint64 // tracked contains the per-priority tracked log entries ordered by log index. // All the tracked entries are in the term's leader log. - tracked [raftpb.NumPriorities]CircularBuffer[tracked] + tracked [raftpb.NumPriorities]ring.Buffer[tracked] stream kvflowcontrol.Stream // used for logging only } @@ -45,7 +46,7 @@ type tracked struct { func (t *Tracker) Init(term uint64, stream kvflowcontrol.Stream) { *t = Tracker{ term: term, - tracked: [raftpb.NumPriorities]CircularBuffer[tracked]{}, + tracked: [raftpb.NumPriorities]ring.Buffer[tracked]{}, stream: stream, } } @@ -136,7 +137,7 @@ func (t *Tracker) UntrackAll() (returned [raftpb.NumPriorities]kvflowcontrol.Tok returned[pri] += t.tracked[pri].At(i).tokens } } - t.tracked = [raftpb.NumPriorities]CircularBuffer[tracked]{} + t.tracked = [raftpb.NumPriorities]ring.Buffer[tracked]{} return returned } diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel index c10e7bde5082..9892e7cbaec3 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel @@ -24,6 +24,7 @@ go_library( "//pkg/roachpb", "//pkg/util/admission/admissionpb", "//pkg/util/buildutil", + "//pkg/util/container/ring", "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/stop", diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/admission.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/admission.go index a14d248ceb9f..73b96c3e77a2 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/admission.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/admission.go @@ -6,8 +6,8 @@ package replica_rac2 import ( - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2" "github.com/cockroachdb/cockroach/pkg/raft/raftpb" + "github.com/cockroachdb/cockroach/pkg/util/container/ring" ) // lowPriOverrideState records which raft log entries have their priority @@ -69,7 +69,7 @@ type lowPriOverrideState struct { // // A call to getEffectivePriority for index i causes a prefix of indices <= // i to be discarded. - intervals rac2.CircularBuffer[interval] + intervals ring.Buffer[interval] // Highest term observed so far. leaderTerm uint64 } diff --git a/pkg/raft/tracker/BUILD.bazel b/pkg/raft/tracker/BUILD.bazel index d7c8e6bbd70d..4608fc6f2633 100644 --- a/pkg/raft/tracker/BUILD.bazel +++ b/pkg/raft/tracker/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "//pkg/raft/raftlogger", "//pkg/raft/raftpb", "//pkg/raft/raftstoreliveness", + "//pkg/util/container/ring", "//pkg/util/hlc", "//pkg/util/syncutil", "@com_github_cockroachdb_redact//:redact", @@ -37,6 +38,7 @@ go_test( "//pkg/raft/raftlogger", "//pkg/raft/raftpb", "//pkg/raft/raftstoreliveness", + "//pkg/util/container/ring", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", diff --git a/pkg/raft/tracker/inflights.go b/pkg/raft/tracker/inflights.go index b9bb090ed9ed..f2f38bedf247 100644 --- a/pkg/raft/tracker/inflights.go +++ b/pkg/raft/tracker/inflights.go @@ -17,6 +17,8 @@ package tracker +import "github.com/cockroachdb/cockroach/pkg/util/container/ring" + // inflight describes an in-flight MsgApp message. type inflight struct { index uint64 // the index of the last entry inside the message @@ -29,10 +31,6 @@ type inflight struct { // they are sending a new append, and release "quota" via FreeLE() whenever an // ack is received. type Inflights struct { - // the starting index in the buffer - start int - - count int // number of inflight messages in the buffer bytes uint64 // number of inflight bytes // TODO(pav-kv): do not store the limits here, pass them to methods. For flow @@ -41,7 +39,7 @@ type Inflights struct { maxBytes uint64 // the max total byte size of inflight messages // buffer is a ring buffer containing info about all in-flight messages. - buffer []inflight + buffer ring.Buffer[inflight] } // NewInflights sets up an Inflights that allows up to size inflight messages, @@ -59,7 +57,7 @@ func NewInflights(size int, maxBytes uint64) *Inflights { // the receiver. func (in *Inflights) Clone() *Inflights { ins := *in - ins.buffer = append([]inflight(nil), in.buffer...) + ins.buffer = in.buffer.Clone() return &ins } @@ -73,78 +71,42 @@ func (in *Inflights) Clone() *Inflights { // is implemented at the higher app level. The tracker correctly tracks all the // in-flight entries. func (in *Inflights) Add(index, bytes uint64) { - next := in.start + in.count - size := in.size - if next >= size { - next -= size - } - if next >= len(in.buffer) { - in.grow() - } - in.buffer[next] = inflight{index: index, bytes: bytes} - in.count++ + in.buffer.Push(inflight{index: index, bytes: bytes}) in.bytes += bytes } -// grow the inflight buffer by doubling up to inflights.size. We grow on demand -// instead of preallocating to inflights.size to handle systems which have -// thousands of Raft groups per process. -func (in *Inflights) grow() { - newSize := len(in.buffer) * 2 - if newSize == 0 { - newSize = 1 - } else if newSize > in.size { - newSize = in.size - } - newBuffer := make([]inflight, newSize) - copy(newBuffer, in.buffer) - in.buffer = newBuffer -} - // FreeLE frees the inflights smaller or equal to the given `to` flight. func (in *Inflights) FreeLE(to uint64) { - if in.count == 0 || to < in.buffer[in.start].index { + n := in.buffer.Length() + if n == 0 || to < in.buffer.At(0).index { // out of the left side of the window return } - idx := in.start var i int var bytes uint64 - for i = 0; i < in.count; i++ { - if to < in.buffer[idx].index { // found the first large inflight + for i = 0; i < n; i++ { + e := in.buffer.At(i) + if to < e.index { // found the first large inflight break } - bytes += in.buffer[idx].bytes - - // increase index and maybe rotate - size := in.size - if idx++; idx >= size { - idx -= size - } + bytes += e.bytes } // free i inflights and set new start index - in.count -= i + in.buffer.Pop(i) in.bytes -= bytes - in.start = idx - if in.count == 0 { - // inflights is empty, reset the start index so that we don't grow the - // buffer unnecessarily. - in.start = 0 - } } // Full returns true if no more messages can be sent at the moment. func (in *Inflights) Full() bool { - return in.count >= in.size || (in.maxBytes != 0 && in.bytes >= in.maxBytes) + return in.buffer.Length() >= in.size || (in.maxBytes != 0 && in.bytes >= in.maxBytes) } // Count returns the number of inflight messages. -func (in *Inflights) Count() int { return in.count } +func (in *Inflights) Count() int { return in.buffer.Length() } // reset frees all inflights. func (in *Inflights) reset() { - in.start = 0 - in.count = 0 + in.buffer.ShrinkToPrefix(0) in.bytes = 0 } diff --git a/pkg/raft/tracker/inflights_test.go b/pkg/raft/tracker/inflights_test.go index dafcd15d2a08..036b9971b2ab 100644 --- a/pkg/raft/tracker/inflights_test.go +++ b/pkg/raft/tracker/inflights_test.go @@ -20,14 +20,25 @@ package tracker import ( "testing" + "github.com/cockroachdb/cockroach/pkg/util/container/ring" "github.com/stretchr/testify/require" ) +func checkEquality(t *testing.T, expected Inflights, actual Inflights) { + expBuf := expected.buffer + actualBuf := actual.buffer + expected.buffer = ring.Buffer[inflight]{} + actual.buffer = ring.Buffer[inflight]{} + require.Equal(t, expected, actual) + require.Equal(t, expBuf.Length(), actualBuf.Length()) + for i := 0; i < expBuf.Length(); i++ { + require.Equal(t, expBuf.At(i), actualBuf.At(i)) + } +} + func TestInflightsAdd(t *testing.T) { - // no rotating case in := &Inflights{ - size: 10, - buffer: make([]inflight, 10), + size: 10, } for i := 0; i < 5; i++ { @@ -35,24 +46,20 @@ func TestInflightsAdd(t *testing.T) { } wantIn := &Inflights{ - start: 0, - count: 5, bytes: 510, size: 10, buffer: inflightsBuffer( // ↓------------ - []uint64{0, 1, 2, 3, 4, 0, 0, 0, 0, 0}, - []uint64{100, 101, 102, 103, 104, 0, 0, 0, 0, 0}), + []uint64{0, 1, 2, 3, 4}, + []uint64{100, 101, 102, 103, 104}), } - require.Equal(t, wantIn, in) + checkEquality(t, *wantIn, *in) for i := 5; i < 10; i++ { in.Add(uint64(i), uint64(100+i)) } wantIn2 := &Inflights{ - start: 0, - count: 10, bytes: 1045, size: 10, buffer: inflightsBuffer( @@ -60,50 +67,25 @@ func TestInflightsAdd(t *testing.T) { []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, []uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}), } - require.Equal(t, wantIn2, in) - - // rotating case - in2 := &Inflights{ - start: 5, - size: 10, - buffer: make([]inflight, 10), - } - - for i := 0; i < 5; i++ { - in2.Add(uint64(i), uint64(100+i)) - } + checkEquality(t, *wantIn2, *in) - wantIn21 := &Inflights{ - start: 5, - count: 5, - bytes: 510, - size: 10, - buffer: inflightsBuffer( - // ↓------------ - []uint64{0, 0, 0, 0, 0, 0, 1, 2, 3, 4}, - []uint64{0, 0, 0, 0, 0, 100, 101, 102, 103, 104}), - } - require.Equal(t, wantIn21, in2) - - for i := 5; i < 10; i++ { - in2.Add(uint64(i), uint64(100+i)) + // Can grow beyond size. + for i := 10; i < 15; i++ { + in.Add(uint64(i), uint64(100+i)) } - wantIn22 := &Inflights{ - start: 5, - count: 10, - bytes: 1045, + wantIn3 := &Inflights{ + bytes: 1605, size: 10, buffer: inflightsBuffer( - // -------------- ↓------------ - []uint64{5, 6, 7, 8, 9, 0, 1, 2, 3, 4}, - []uint64{105, 106, 107, 108, 109, 100, 101, 102, 103, 104}), + // ↓--------------------------- + []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14}, + []uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114}), } - require.Equal(t, wantIn22, in2) + checkEquality(t, *wantIn3, *in) } func TestInflightFreeTo(t *testing.T) { - // no rotating case in := NewInflights(10, 0) for i := 0; i < 10; i++ { in.Add(uint64(i), uint64(100+i)) @@ -112,46 +94,39 @@ func TestInflightFreeTo(t *testing.T) { in.FreeLE(0) wantIn0 := &Inflights{ - start: 1, - count: 9, bytes: 945, size: 10, buffer: inflightsBuffer( - // ↓------------------------ - []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, - []uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}), + // ↓------------------------ + []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9}, + []uint64{101, 102, 103, 104, 105, 106, 107, 108, 109}), } - require.Equal(t, wantIn0, in) + checkEquality(t, *wantIn0, *in) in.FreeLE(4) wantIn := &Inflights{ - start: 5, - count: 5, bytes: 535, size: 10, buffer: inflightsBuffer( - // ↓------------ - []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, - []uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}), + // ↓------------ + []uint64{5, 6, 7, 8, 9}, + []uint64{105, 106, 107, 108, 109}), } - require.Equal(t, wantIn, in) + checkEquality(t, *wantIn, *in) in.FreeLE(8) wantIn2 := &Inflights{ - start: 9, - count: 1, bytes: 109, size: 10, buffer: inflightsBuffer( // ↓ - []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, - []uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}), + []uint64{9}, + []uint64{109}), } - require.Equal(t, wantIn2, in) + checkEquality(t, *wantIn2, *in) - // rotating case for i := 10; i < 15; i++ { in.Add(uint64(i), uint64(100+i)) } @@ -159,29 +134,21 @@ func TestInflightFreeTo(t *testing.T) { in.FreeLE(12) wantIn3 := &Inflights{ - start: 3, - count: 2, bytes: 227, size: 10, buffer: inflightsBuffer( - // ↓----- - []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9}, - []uint64{110, 111, 112, 113, 114, 105, 106, 107, 108, 109}), + // ↓----- + []uint64{13, 14}, + []uint64{113, 114}), } - require.Equal(t, wantIn3, in) + checkEquality(t, *wantIn3, *in) in.FreeLE(14) wantIn4 := &Inflights{ - start: 0, - count: 0, - size: 10, - buffer: inflightsBuffer( - // ↓ - []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9}, - []uint64{110, 111, 112, 113, 114, 105, 106, 107, 108, 109}), + size: 10, } - require.Equal(t, wantIn4, in) + checkEquality(t, *wantIn4, *in) } func TestInflightsFull(t *testing.T) { @@ -244,13 +211,13 @@ func TestInflightsReset(t *testing.T) { require.Equal(t, 0, in.Count()) } -func inflightsBuffer(indices []uint64, sizes []uint64) []inflight { +func inflightsBuffer(indices []uint64, sizes []uint64) ring.Buffer[inflight] { if len(indices) != len(sizes) { panic("len(indices) != len(sizes)") } - buffer := make([]inflight, 0, len(indices)) + var buffer ring.Buffer[inflight] for i, idx := range indices { - buffer = append(buffer, inflight{index: idx, bytes: sizes[i]}) + buffer.Push(inflight{index: idx, bytes: sizes[i]}) } return buffer } diff --git a/pkg/util/container/ring/BUILD.bazel b/pkg/util/container/ring/BUILD.bazel index c60e80d197e9..6fbdfff10df9 100644 --- a/pkg/util/container/ring/BUILD.bazel +++ b/pkg/util/container/ring/BUILD.bazel @@ -2,16 +2,31 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "ring", - srcs = ["ring.go"], + srcs = [ + "buffer.go", + "ring.go", + ], importpath = "github.com/cockroachdb/cockroach/pkg/util/container/ring", visibility = ["//visibility:public"], + deps = [ + "//pkg/util/buildutil", + "@com_github_cockroachdb_errors//:errors", + ], ) go_test( name = "ring_test", srcs = [ + "buffer_test.go", "example_test.go", "ring_test.go", ], + data = glob(["testdata/**"]), embed = [":ring"], + deps = [ + "//pkg/util/leaktest", + "//pkg/util/log", + "@com_github_cockroachdb_datadriven//:datadriven", + "@com_github_stretchr_testify//require", + ], ) diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/circular_buffer.go b/pkg/util/container/ring/buffer.go similarity index 89% rename from pkg/kv/kvserver/kvflowcontrol/rac2/circular_buffer.go rename to pkg/util/container/ring/buffer.go index b9dc5daa0f47..a79816148193 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/circular_buffer.go +++ b/pkg/util/container/ring/buffer.go @@ -3,20 +3,20 @@ // Use of this software is governed by the CockroachDB Software License // included in the /LICENSE file. -package rac2 +package ring import ( "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/errors" ) -// CircularBuffer provides functionality akin to a []T, for cases that usually +// Buffer provides functionality akin to a []T, for cases that usually // push to the back and pop from the front, and would like to reduce // allocations. The assumption made here is that the capacity needed for // holding the live entries is somewhat stable. The circular buffer grows as // needed, and over time will shrink. Liveness of shrinking depends on new // entries being pushed. -type CircularBuffer[T any] struct { +type Buffer[T any] struct { // len(buf) == cap(buf). These are powers of 2 and >= minCap. buf []T // first is in [0, len(buf)). @@ -41,7 +41,7 @@ const minCap = 32 const shrinkCheckInterval = 3 // Push adds an entry to the end of the buffer. -func (cb *CircularBuffer[T]) Push(a T) { +func (cb *Buffer[T]) Push(a T) { needed := cb.len + 1 cap := len(cb.buf) if needed > cap { @@ -78,7 +78,7 @@ func (cb *CircularBuffer[T]) Push(a T) { // Pop removes the first num entries. // // REQUIRES: num <= cb.len. -func (cb *CircularBuffer[T]) Pop(num int) { +func (cb *Buffer[T]) Pop(num int) { if buildutil.CrdbTestBuild && num > cb.len { panic(errors.AssertionFailedf("num %d > cb.len %d", num, cb.len)) } @@ -96,7 +96,7 @@ func (cb *CircularBuffer[T]) Pop(num int) { // ShrinkToPrefix shrinks the buffer to retain the first num entries. // // REQUIRES: num <= cb.len. -func (cb *CircularBuffer[T]) ShrinkToPrefix(num int) { +func (cb *Buffer[T]) ShrinkToPrefix(num int) { if buildutil.CrdbTestBuild && num > cb.len { panic(errors.AssertionFailedf("num %d > cb.len %d", num, cb.len)) } @@ -106,7 +106,7 @@ func (cb *CircularBuffer[T]) ShrinkToPrefix(num int) { // At returns the entry at index. // // REQUIRES: index < cb.len. -func (cb *CircularBuffer[T]) At(index int) T { +func (cb *Buffer[T]) At(index int) T { if buildutil.CrdbTestBuild && index >= cb.len { panic(errors.AssertionFailedf("index %d >= cb.len %d", index, cb.len)) } @@ -117,7 +117,7 @@ func (cb *CircularBuffer[T]) At(index int) T { // SetLast overwrites the last entry. // // REQUIRES: Length() > 0. -func (cb *CircularBuffer[T]) SetLast(a T) { +func (cb *Buffer[T]) SetLast(a T) { if buildutil.CrdbTestBuild && cb.len == 0 { panic(errors.AssertionFailedf("buffer is empty")) } @@ -128,7 +128,7 @@ func (cb *CircularBuffer[T]) SetLast(a T) { // SetFirst overwrites the first entry. // // REQUIRES: Length() > 0. -func (cb *CircularBuffer[T]) SetFirst(a T) { +func (cb *Buffer[T]) SetFirst(a T) { if buildutil.CrdbTestBuild && cb.len == 0 { panic(errors.AssertionFailedf("buffer is empty")) } @@ -136,11 +136,17 @@ func (cb *CircularBuffer[T]) SetFirst(a T) { } // Length returns the current length. -func (cb *CircularBuffer[T]) Length() int { +func (cb *Buffer[T]) Length() int { return cb.len } -func (cb *CircularBuffer[T]) reallocate(size int) { +func (cb *Buffer[T]) Clone() Buffer[T] { + b := *cb + b.buf = append([]T(nil), cb.buf...) + return b +} + +func (cb *Buffer[T]) reallocate(size int) { buf := make([]T, size) capacity := len(cb.buf) // cb.buf is split into a prefix and suffix, where the prefix is diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/circular_buffer_test.go b/pkg/util/container/ring/buffer_test.go similarity index 87% rename from pkg/kv/kvserver/kvflowcontrol/rac2/circular_buffer_test.go rename to pkg/util/container/ring/buffer_test.go index a31f09b8f89d..1dd6f34751e0 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/circular_buffer_test.go +++ b/pkg/util/container/ring/buffer_test.go @@ -3,7 +3,7 @@ // Use of this software is governed by the CockroachDB Software License // included in the /LICENSE file. -package rac2 +package ring import ( "fmt" @@ -13,15 +13,23 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/datadriven" + "github.com/stretchr/testify/require" ) -func TestCircularBuffer(t *testing.T) { +func TestBuffer(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - cb := CircularBuffer[int]{} + cb := Buffer[int]{} cbString := func() string { var b strings.Builder + // Clone cb and trash the clone's state. It should not affect cb. + cbClone := cb.Clone() + require.Equal(t, cbClone, cb) + for i := 0; i < len(cbClone.buf); i++ { + cbClone.buf[i] = -1 + } + cbClone = Buffer[int]{} printStats := func() { fmt.Fprintf(&b, "first: %d len: %d cap: %d pushes: %d, max-len: %d\n", cb.first, cb.len, len(cb.buf), cb.pushesSinceCheck, cb.maxObservedLen) @@ -68,11 +76,11 @@ func TestCircularBuffer(t *testing.T) { printStats() return b.String() } - datadriven.RunTest(t, "testdata/circular_buffer", + datadriven.RunTest(t, "testdata/buffer", func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { case "init": - cb = CircularBuffer[int]{} + cb = Buffer[int]{} return "" case "push": diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/circular_buffer b/pkg/util/container/ring/testdata/buffer similarity index 100% rename from pkg/kv/kvserver/kvflowcontrol/rac2/testdata/circular_buffer rename to pkg/util/container/ring/testdata/buffer diff --git a/pkg/util/ring/ring_buffer.go b/pkg/util/ring/ring_buffer.go index 50e3a35e31be..60b12b27606c 100644 --- a/pkg/util/ring/ring_buffer.go +++ b/pkg/util/ring/ring_buffer.go @@ -10,8 +10,10 @@ package ring // The zero value is ready to use. See MakeBuffer() for initializing a Buffer // with pre-allocated space. // -// Note: it is backed by a slice (unlike container/ring which is backed by a -// linked list). +// Note: it is backed by a slice (unlike container/ring/ring_buffer.go which +// is backed by a linked list). There is also a container/ring/buffer.go, that +// is backed by a slice and can both grow and shrink and uses bit arithmetic. +// We should replace this implementation with that one. type Buffer[T any] struct { buffer []T head int // the index of the front of the buffer