Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
135228: sql: fix edge case causing suboptimal generic query plans r=mgartner a=mgartner

This commit fixes a bug that caused suboptimal generic query plans to be
built when all of the following conditions were met:

1. `plan_cache_mode` was set to `force_custom_plan` (the default).
2. A query was prepared and an ideal generic query plan was selected
   (i.e., it used the placeholder fast path).
3. New stats were collected, making the original plan stale and
   increasing the estimated row count of the root expression beyond
   `maxRowCountForPlaceholderFastPath` (10).
4. The prepared query was re-executed.

Fixes #135151

There is no release note because this bug does not exist in any
releases.

Release note: None


135237: rac2,tracker: use CircularBuffer for tracker.Inflights r=kvoli,pav-kv a=sumeerbhola

The custom circular/ring buffer implementation in Raft's tracker.Inflights does not shrink and unnecessarily grows to the maximum configured size (unless the buffer becomes empty). With RAC2 in lazy replication mode, the configured maximum size is no longer a true maximum. A bug caused by incorrectly trying to impose this maximum motivated this slightly bigger cleanup. In general, it is beneficial for the inflight buffer to also shrink, and by using the existing CircularBuffer, we can simplify the code in tracker.Inflights.

As part of this change, rac2.CircularBuffer is moved to util/container/ring and becomes ring.Buffer. There is another ring.Buffer in util/ring, which doesn't shrink and is not optimized to use bit arithmetic -- a TODO has been added there to replace it with the implementation in util/container/ring.

Fixes #135223

Epic: none

Release note: None

Co-authored-by: Marcus Gartner <[email protected]>
Co-authored-by: sumeerbhola <[email protected]>
  • Loading branch information
3 people committed Nov 15, 2024
3 parents abbec7d + 050792b + e5dca03 commit 5771a79
Show file tree
Hide file tree
Showing 15 changed files with 239 additions and 171 deletions.
97 changes: 97 additions & 0 deletions pkg/ccl/logictestccl/testdata/logic_test/generic
Original file line number Diff line number Diff line change
Expand Up @@ -1220,3 +1220,100 @@ ALTER TABLE a RENAME TO b

statement error pgcode 42P01 pq: relation \"a\" does not exist
EXECUTE p

statement ok
DEALLOCATE p

# Regression test for #135151. Do not build suboptimal generic plans when an
# ideal generic plan (using the placeholder fast path) was previously planned.
statement ok
CREATE TABLE t135151 (
k INT PRIMARY KEY,
a INT,
b INT,
INDEX (a, b)
);

statement ok
SET plan_cache_mode = force_custom_plan;

statement ok
PREPARE p AS SELECT k FROM t135151 WHERE a = $1 AND b = $2;

query T
EXPLAIN ANALYZE EXECUTE p(1, 2);
----
planning time: 10µs
execution time: 100µs
distribution: <hidden>
vectorized: <hidden>
plan type: generic, reused
maximum memory usage: <hidden>
network usage: <hidden>
regions: <hidden>
isolation level: serializable
priority: normal
quality of service: regular
·
• scan
sql nodes: <hidden>
kv nodes: <hidden>
regions: <hidden>
actual row count: 0
KV time: 0µs
KV contention time: 0µs
KV rows decoded: 0
KV bytes read: 0 B
KV gRPC calls: 0
estimated max memory allocated: 0 B
missing stats
table: t135151@t135151_a_b_idx
spans: [/1/2 - /1/2]

statement ok
ALTER TABLE t135151 INJECT STATISTICS '[
{
"columns": ["a"],
"created_at": "2018-01-01 1:00:00.00000+00:00",
"row_count": 10000,
"distinct_count": 10,
"avg_size": 1
},
{
"columns": ["b"],
"created_at": "2018-01-01 1:00:00.00000+00:00",
"row_count": 10000,
"distinct_count": 10,
"avg_size": 1
}
]';

query T
EXPLAIN ANALYZE EXECUTE p(1, 2);
----
planning time: 10µs
execution time: 100µs
distribution: <hidden>
vectorized: <hidden>
plan type: custom
maximum memory usage: <hidden>
network usage: <hidden>
regions: <hidden>
isolation level: serializable
priority: normal
quality of service: regular
·
• scan
sql nodes: <hidden>
kv nodes: <hidden>
regions: <hidden>
actual row count: 0
KV time: 0µs
KV contention time: 0µs
KV rows decoded: 0
KV bytes read: 0 B
KV gRPC calls: 0
estimated max memory allocated: 0 B
estimated row count: 100 (1.0% of the table; stats collected <hidden> ago)
table: t135151@t135151_a_b_idx
spans: [/1/2 - /1/2]
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "rac2",
srcs = [
"circular_buffer.go",
"log_tracker.go",
"metrics.go",
"priority.go",
Expand All @@ -28,6 +27,7 @@ go_library(
"//pkg/settings/cluster",
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/container/ring",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/log",
Expand All @@ -45,7 +45,6 @@ go_library(
go_test(
name = "rac2_test",
srcs = [
"circular_buffer_test.go",
"log_tracker_test.go",
"priority_test.go",
"range_controller_test.go",
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/raft"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/container/ring"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
Expand Down Expand Up @@ -109,7 +110,7 @@ type LogTracker struct {
// - waiting[pri][i].Term <= last.Term
// - waiting[pri][i].Index < waiting[pri][i+1].Index
// - waiting[pri][i].Term <= waiting[pri][i+1].Term
waiting [raftpb.NumPriorities]CircularBuffer[LogMark]
waiting [raftpb.NumPriorities]ring.Buffer[LogMark]
}

// NewLogTracker returns a LogTracker initialized to the given log mark. The
Expand Down Expand Up @@ -354,7 +355,7 @@ func (l *LogTracker) errorf(ctx context.Context, format string, args ...any) {

// truncate updates the slice to be a prefix of the ordered log marks slice,
// with all marks at index > after removed from it.
func truncate(marks *CircularBuffer[LogMark], after uint64) {
func truncate(marks *ring.Buffer[LogMark], after uint64) {
n := marks.Length()
for i := n; i > 0; i-- {
if marks.At(i-1).Index <= after {
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/kvflowcontrol/rac2/token_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/util/container/ring"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

Expand All @@ -31,7 +32,7 @@ type Tracker struct {
term uint64
// tracked contains the per-priority tracked log entries ordered by log index.
// All the tracked entries are in the term's leader log.
tracked [raftpb.NumPriorities]CircularBuffer[tracked]
tracked [raftpb.NumPriorities]ring.Buffer[tracked]

stream kvflowcontrol.Stream // used for logging only
}
Expand All @@ -45,7 +46,7 @@ type tracked struct {
func (t *Tracker) Init(term uint64, stream kvflowcontrol.Stream) {
*t = Tracker{
term: term,
tracked: [raftpb.NumPriorities]CircularBuffer[tracked]{},
tracked: [raftpb.NumPriorities]ring.Buffer[tracked]{},
stream: stream,
}
}
Expand Down Expand Up @@ -136,7 +137,7 @@ func (t *Tracker) UntrackAll() (returned [raftpb.NumPriorities]kvflowcontrol.Tok
returned[pri] += t.tracked[pri].At(i).tokens
}
}
t.tracked = [raftpb.NumPriorities]CircularBuffer[tracked]{}
t.tracked = [raftpb.NumPriorities]ring.Buffer[tracked]{}
return returned
}

Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"//pkg/roachpb",
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/container/ring",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/stop",
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
package replica_rac2

import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/util/container/ring"
)

// lowPriOverrideState records which raft log entries have their priority
Expand Down Expand Up @@ -69,7 +69,7 @@ type lowPriOverrideState struct {
//
// A call to getEffectivePriority for index i causes a prefix of indices <=
// i to be discarded.
intervals rac2.CircularBuffer[interval]
intervals ring.Buffer[interval]
// Highest term observed so far.
leaderTerm uint64
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/raft/tracker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"//pkg/raft/raftlogger",
"//pkg/raft/raftpb",
"//pkg/raft/raftstoreliveness",
"//pkg/util/container/ring",
"//pkg/util/hlc",
"//pkg/util/syncutil",
"@com_github_cockroachdb_redact//:redact",
Expand All @@ -37,6 +38,7 @@ go_test(
"//pkg/raft/raftlogger",
"//pkg/raft/raftpb",
"//pkg/raft/raftstoreliveness",
"//pkg/util/container/ring",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
68 changes: 15 additions & 53 deletions pkg/raft/tracker/inflights.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package tracker

import "github.com/cockroachdb/cockroach/pkg/util/container/ring"

// inflight describes an in-flight MsgApp message.
type inflight struct {
index uint64 // the index of the last entry inside the message
Expand All @@ -29,10 +31,6 @@ type inflight struct {
// they are sending a new append, and release "quota" via FreeLE() whenever an
// ack is received.
type Inflights struct {
// the starting index in the buffer
start int

count int // number of inflight messages in the buffer
bytes uint64 // number of inflight bytes

// TODO(pav-kv): do not store the limits here, pass them to methods. For flow
Expand All @@ -41,7 +39,7 @@ type Inflights struct {
maxBytes uint64 // the max total byte size of inflight messages

// buffer is a ring buffer containing info about all in-flight messages.
buffer []inflight
buffer ring.Buffer[inflight]
}

// NewInflights sets up an Inflights that allows up to size inflight messages,
Expand All @@ -59,7 +57,7 @@ func NewInflights(size int, maxBytes uint64) *Inflights {
// the receiver.
func (in *Inflights) Clone() *Inflights {
ins := *in
ins.buffer = append([]inflight(nil), in.buffer...)
ins.buffer = in.buffer.Clone()
return &ins
}

Expand All @@ -73,78 +71,42 @@ func (in *Inflights) Clone() *Inflights {
// is implemented at the higher app level. The tracker correctly tracks all the
// in-flight entries.
func (in *Inflights) Add(index, bytes uint64) {
next := in.start + in.count
size := in.size
if next >= size {
next -= size
}
if next >= len(in.buffer) {
in.grow()
}
in.buffer[next] = inflight{index: index, bytes: bytes}
in.count++
in.buffer.Push(inflight{index: index, bytes: bytes})
in.bytes += bytes
}

// grow the inflight buffer by doubling up to inflights.size. We grow on demand
// instead of preallocating to inflights.size to handle systems which have
// thousands of Raft groups per process.
func (in *Inflights) grow() {
newSize := len(in.buffer) * 2
if newSize == 0 {
newSize = 1
} else if newSize > in.size {
newSize = in.size
}
newBuffer := make([]inflight, newSize)
copy(newBuffer, in.buffer)
in.buffer = newBuffer
}

// FreeLE frees the inflights smaller or equal to the given `to` flight.
func (in *Inflights) FreeLE(to uint64) {
if in.count == 0 || to < in.buffer[in.start].index {
n := in.buffer.Length()
if n == 0 || to < in.buffer.At(0).index {
// out of the left side of the window
return
}

idx := in.start
var i int
var bytes uint64
for i = 0; i < in.count; i++ {
if to < in.buffer[idx].index { // found the first large inflight
for i = 0; i < n; i++ {
e := in.buffer.At(i)
if to < e.index { // found the first large inflight
break
}
bytes += in.buffer[idx].bytes

// increase index and maybe rotate
size := in.size
if idx++; idx >= size {
idx -= size
}
bytes += e.bytes
}
// free i inflights and set new start index
in.count -= i
in.buffer.Pop(i)
in.bytes -= bytes
in.start = idx
if in.count == 0 {
// inflights is empty, reset the start index so that we don't grow the
// buffer unnecessarily.
in.start = 0
}
}

// Full returns true if no more messages can be sent at the moment.
func (in *Inflights) Full() bool {
return in.count >= in.size || (in.maxBytes != 0 && in.bytes >= in.maxBytes)
return in.buffer.Length() >= in.size || (in.maxBytes != 0 && in.bytes >= in.maxBytes)
}

// Count returns the number of inflight messages.
func (in *Inflights) Count() int { return in.count }
func (in *Inflights) Count() int { return in.buffer.Length() }

// reset frees all inflights.
func (in *Inflights) reset() {
in.start = 0
in.count = 0
in.buffer.ShrinkToPrefix(0)
in.bytes = 0
}
Loading

0 comments on commit 5771a79

Please sign in to comment.