Merge #131020

131020: requestbatcher: reduce timing-sensitivity in test r=tbg a=tbg

A number of the requestbatcher tests are sadly timing-sensitive. They
set the max wait times for the batcher to carefully crafted values, but
are also exposed to Go scheduling latencies, which influence how much
time elapses between the batcher being handed the various requests.
Given how flaky they _should_ be, they are remarkably stable, but a
simple `time.Sleep` in the right place deterministically causes issues.

This commit deflakes a specific test - `TestBatcherSend` - and does so
by making time and timers completely mockable in the batcher through
injection of a `ManualTime`. It also introduces a simple mechanism that
allows tests to peek inside the batcher by momentarily pausing its
worker goroutine.

I stopped short of updating the other tests entirely - that would likely
take more time than I'm willing to spend here - but I left comments that
should be actionable should any of these tests ever cause issues.
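
For illustration, here is a condensed sketch of the resulting test
pattern (mine, not part of the diff below; it has to live in the
requestbatcher package because `manualTime` and `testingPeekCh` are
unexported, it borrows the `chanSender` helper from batcher_test.go, and
imports are elided):

func TestPeekSketch(t *testing.T) {
	stopper := stop.NewStopper()
	defer stopper.Stop(context.Background())
	sc := make(chanSender)
	mt := timeutil.NewManualTime(time.Time{})
	peekCh := make(chan *RequestBatcher) // must be unbuffered
	b := New(Config{
		MaxWait:       50 * time.Millisecond,
		Sender:        sc,
		Stopper:       stopper,
		manualTime:    mt,
		testingPeekCh: peekCh,
	})
	respChan := make(chan Response, 1)
	require.NoError(t, b.SendWithChan(context.Background(), respChan,
		roachpb.RangeID(1), &kvpb.GetRequest{}, kvpb.AdmissionHeader{}))
	// Pause the worker goroutine and peek inside; it blocks until the
	// batcher is handed back on the same channel.
	testutils.SucceedsSoon(t, func() error {
		rb := <-peekCh
		defer func() { peekCh <- rb }()
		if _, ok := rb.batches.get(1); !ok {
			return errors.New("no batch queued for r1 yet")
		}
		return nil
	})
	// Fire the pending MaxWait timer deterministically instead of sleeping.
	mt.AdvanceTo(mt.Timers()[0])
	// Service the resulting batch as the test Sender.
	s := <-sc
	s.respChan <- batchResp{}
	<-respChan
}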

Fixes #130969.

Epic: none
Release note: None


Co-authored-by: Tobias Grieger <[email protected]>
craig[bot] and tbg committed Sep 20, 2024
2 parents e08158e + 4861d66 commit 6a5017e
Showing 4 changed files with 130 additions and 26 deletions.
1 change: 1 addition & 0 deletions pkg/internal/client/requestbatcher/BUILD.bazel
@@ -28,6 +28,7 @@ go_test(
"//pkg/util/leaktest",
"//pkg/util/stop",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_x_sync//errgroup",
53 changes: 38 additions & 15 deletions pkg/internal/client/requestbatcher/batcher.go
@@ -181,8 +181,13 @@ type Config struct {
// DefaultInFlightBackpressureLimit.
InFlightBackpressureLimit func() int

// NowFunc is used to determine the current time. It defaults to timeutil.Now.
NowFunc func() time.Time
manualTime *timeutil.ManualTime // optional for testing
// This channel can be populated in tests with an unbuffered channel, in
// which case the batcher will attempt to send itself over it, allowing
// tests to pause the batcher's goroutine and inspect its state. Once the
// test is done, it _must_ return the batcher on the channel to unblock
// the batcher's main loop again.
testingPeekCh chan *RequestBatcher
}

const (
@@ -258,9 +263,6 @@ func validateConfig(cfg *Config) {
return DefaultInFlightBackpressureLimit
}
}
if cfg.NowFunc == nil {
cfg.NowFunc = timeutil.Now
}
}

func normalizedInFlightBackPressureLimit(cfg *Config) int {
@@ -271,6 +273,25 @@ func normalizedInFlightBackPressureLimit(cfg *Config) int {
return limit
}

func (b *RequestBatcher) now() time.Time {
if b.cfg.manualTime != nil {
return b.cfg.manualTime.Now()
}
return timeutil.Now()
}

func (b *RequestBatcher) newTimer() timeutil.TimerI {
if b.cfg.manualTime != nil {
return b.cfg.manualTime.NewTimer()
}
return (&timeutil.Timer{}).AsTimerI()
}

func (b *RequestBatcher) until(t time.Time) time.Duration {
return t.Sub(b.now())
}

// SendWithChan sends a request with a client provided response channel. The
// client is responsible for ensuring that the passed respChan has a buffer at
// least as large as the number of responses it expects to receive. Using an
@@ -345,7 +366,7 @@ func (b *RequestBatcher) sendBatch(ctx context.Context, ba *batch) {
timeout = b.cfg.MaxTimeout
}
if !ba.latestRequestDeadline.IsZero() {
reqTimeout := timeutil.Until(ba.latestRequestDeadline)
reqTimeout := b.until(ba.latestRequestDeadline)
if timeout == 0 || reqTimeout < timeout {
timeout = reqTimeout
}
@@ -504,7 +525,7 @@ func (b *RequestBatcher) run(ctx context.Context) {
}
}
handleRequest = func(req *request) {
now := b.cfg.NowFunc()
now := b.now()
ba, existsInQueue := b.batches.get(req.rangeID)
if !existsInQueue {
ba = b.pool.newBatch(now)
@@ -519,19 +540,19 @@ func (b *RequestBatcher) run(ctx context.Context) {
}
}
deadline time.Time
timer timeutil.Timer
timer = b.newTimer()
)
defer timer.Stop()

maybeSetTimer := func() {
maybeSetTimer := func(read bool) {
var nextDeadline time.Time
if next := b.batches.peekFront(); next != nil {
nextDeadline = next.deadline
}
if !deadline.Equal(nextDeadline) || timer.Read {
if !deadline.Equal(nextDeadline) || read {
deadline = nextDeadline
if !deadline.IsZero() {
timer.Reset(timeutil.Until(deadline))
timer.Reset(b.until(deadline))
} else {
// Clear the current timer due to a sole batch already sent before
// the timer fired.
@@ -542,13 +563,15 @@ func (b *RequestBatcher) run(ctx context.Context) {

for {
select {
case b.cfg.testingPeekCh <- b:
<-b.cfg.testingPeekCh
case req := <-reqChan():
handleRequest(req)
maybeSetTimer()
case <-timer.C:
timer.Read = true
maybeSetTimer(false)
case <-timer.Ch():
timer.MarkRead()
sendBatch(b.batches.popFront())
maybeSetTimer()
maybeSetTimer(true)
case <-b.sendDoneChan:
handleSendDone()
case <-b.cfg.Stopper.ShouldQuiesce():
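
The `now`/`newTimer` indirection above is the crux of the change: with a
`ManualTime` injected, the batcher's timers only fire when a test
advances the clock. A minimal sketch of that behavior (my example,
assuming - as the run loop above relies on - that a manual timer
delivers on `Ch()` once the clock passes its deadline):

mt := timeutil.NewManualTime(time.Time{})
timer := mt.NewTimer()
timer.Reset(50 * time.Millisecond)
select {
case <-timer.Ch():
	panic("unreachable: manual time never advances on its own")
default:
}
mt.Advance(50 * time.Millisecond) // deadline reached, the timer fires
<-timer.Ch()
timer.MarkRead() // mirrors what the run loop does after receiving from Ch()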
95 changes: 84 additions & 11 deletions pkg/internal/client/requestbatcher/batcher_test.go
@@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
@@ -79,6 +80,9 @@ func (g *senderGroup) Wait() error {
}

func TestBatcherSendOnSizeWithReset(t *testing.T) {
// Note: the timing-dependency and possible flakiness can be addressed by
// using a manual time(r) source, see TestBatcherSend for an example.

// This test ensures that when a single batch ends up being sent due to
// size constraints, its timer is successfully canceled and does not lead
// to a nil panic from an attempt to send the batch when the old timer
// fires.
@@ -138,18 +142,35 @@ func TestBatchesAtTheSameTime(t *testing.T) {
sc := make(chanSender)
start := timeutil.Now()
then := start.Add(10 * time.Millisecond)
mt := timeutil.NewManualTime(then)
b := New(Config{
MaxIdle: 20 * time.Millisecond,
Sender: sc,
Stopper: stopper,
NowFunc: func() time.Time { return then },
MaxIdle: 20 * time.Millisecond,
Sender: sc,
Stopper: stopper,
manualTime: mt,
})
const N = 20
sendChan := make(chan Response, N)
for i := 0; i < N; i++ {
assert.Nil(t, b.SendWithChan(
context.Background(), sendChan, roachpb.RangeID(i), &kvpb.GetRequest{}, kvpb.AdmissionHeader{}))
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func(ctx context.Context) {
// At this point, all the requests should've made it into the
// batcher and been timestamped. We now stand in for a real clock,
// advancing manual time in small steps so that the timers fire of
// their own accord.
for {
select {
case <-time.After(5 * time.Millisecond):
mt.Advance(5 * time.Millisecond)
case <-ctx.Done():
return
}
}
}(ctx)
for i := 0; i < N; i++ {
bs := <-sc
bs.respChan <- batchResp{}
@@ -160,6 +181,10 @@ func TestBackpressure(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

// Note: the timing-dependency and possible flakiness can be addressed by
// using a manual time(r) source, see TestBatcherSend for an example.

sc := make(chanSender)
backpressureLimit := 3
b := New(Config{
@@ -260,30 +285,74 @@ func TestBatcherSend(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
sc := make(chanSender)
mt := timeutil.NewManualTime(time.Time{})
peekCh := make(chan *RequestBatcher) // must be unbuffered
b := New(Config{
// We're using a manual timer here, so these fire when we advance `mt`
// accordingly.
MaxIdle: 50 * time.Millisecond,
MaxWait: 50 * time.Millisecond,
MaxMsgsPerBatch: 3,
Sender: sc,
Stopper: stopper,
manualTime: mt,
testingPeekCh: peekCh,
})

// Send 3 requests to range 2 and 2 to range 1.
// The 3rd range 2 request will trigger immediate sending due to the
// MaxMsgsPerBatch configuration. The range 1 batch will be sent after the
// MaxWait timeout expires.
// MaxWait timeout expires, which we trigger manually via `mt`.
g := senderGroup{b: b}
g.Send(1, &kvpb.GetRequest{})
g.Send(2, &kvpb.GetRequest{})
g.Send(1, &kvpb.GetRequest{})
g.Send(2, &kvpb.GetRequest{})
g.Send(2, &kvpb.GetRequest{})
// Wait for the range 2 request and ensure it contains 3 requests.
s := <-sc
assert.Len(t, s.ba.Requests, 3)
s.respChan <- batchResp{}

// We should see the requests to r2 show up ~immediately in a single
// batch: no timers have fired, but three requests reach the
// MaxMsgsPerBatch limit, which triggers an immediate send. We should not
// see anything for r1 yet because that batch is waiting for us to fire a
// timer.
// NB: we don't actually verify that the requests are for r2. This could
// be added (noting that ba.RangeID is zero at this level of the stack, so
// that alone won't do it). But we do check later in the test that the
// requests to r1 are still in the batcher.
select {
case s := <-sc:
require.Len(t, s.ba.Requests, 3)
s.respChan <- batchResp{}
case <-time.After(5 * time.Second):
t.Fatalf("requests to r2 did not show up")
}

// Check that the r1 batch is queued up in its entirety. This is a nice
// check in itself and also ensures that once we fire the timer, we can
// expect to see both requests at once.
testutils.SucceedsSoon(t, func() error {
b := <-peekCh
defer func() {
peekCh <- b
}()
var r1waiting int
if r1b, ok := b.batches.get(1); ok {
r1waiting = len(r1b.reqs)
}
if r1waiting != 2 {
return errors.Errorf("expect two requests waiting on r1, not %d", r1waiting)
}
return nil
})

// There should be a timer at this point since we know we have the requests
// to r1 waiting.
require.Len(t, mt.Timers(), 1)
// Time passes and the timer is triggered.
mt.AdvanceTo(mt.Timers()[0])

// Wait for the range 1 request and ensure it contains 2 requests.
s = <-sc
assert.Len(t, s.ba.Requests, 2)
s := <-sc
require.Len(t, s.ba.Requests, 2)
s.respChan <- batchResp{}
// Make sure everything gets a response.
if err := g.Wait(); err != nil {
Expand Down Expand Up @@ -361,6 +430,10 @@ func TestBatchTimeout(t *testing.T) {
const timeout = 5 * time.Millisecond
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

// Note: the timing-dependency and possible flakiness can be addressed by
// using a manual time(r) source, see TestBatcherSend for an example.

testCases := []struct {
requestTimeout time.Duration
maxTimeout time.Duration
7 changes: 7 additions & 0 deletions pkg/util/timeutil/timer.go
@@ -57,6 +57,13 @@ type Timer struct {
Read bool
}

// AsTimerI returns the Timer as a TimerI. This is helpful for writing
// code that uses a Timer in production and a manual timer in tests.
func (t *Timer) AsTimerI() TimerI {
return (*timer)(t)
}

// Reset changes the timer to expire after duration d and returns
// the new value of the timer. This method includes the fix proposed
// in https://github.com/golang/go/issues/11513#issuecomment-157062583,
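
To close the loop, a hedged usage sketch for the new adapter (my
example, mirroring `RequestBatcher.newTimer` in batcher.go above):

// newTimer returns a manual timer in tests and a real one in production.
// Callers only ever see the TimerI interface (Reset, Ch, MarkRead, Stop),
// so the same event loop works against both time sources.
func newTimer(mt *timeutil.ManualTime) timeutil.TimerI {
	if mt != nil {
		return mt.NewTimer() // fires only when the test advances mt
	}
	return (&timeutil.Timer{}).AsTimerI()
}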
