From 7e0583c7dd63ab766285d186669f3a8018faa512 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 15 Aug 2022 17:59:42 +0200 Subject: [PATCH] rangefeed: avoid emitting out-of-timebounds range tombstones Discovered this bug while adding sanity checks to BenchmarkCatchUpScan. This is similar in spirit to another bug that was just fixed, https://github.com/cockroachdb/cockroach/pull/85889. The changes to BenchmarkCatchUpScan serve as a regression test. Release note: None --- pkg/kv/kvserver/rangefeed/catchup_scan.go | 14 +++++++++++-- .../rangefeed/catchup_scan_bench_test.go | 20 +++++++++++++++++-- .../kvserver/rangefeed/catchup_scan_test.go | 7 +++++++ 3 files changed, 37 insertions(+), 4 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan.go b/pkg/kv/kvserver/rangefeed/catchup_scan.go index 3c0ec0c91e22..51003b88dd34 100644 --- a/pkg/kv/kvserver/rangefeed/catchup_scan.go +++ b/pkg/kv/kvserver/rangefeed/catchup_scan.go @@ -145,24 +145,34 @@ func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) err // Emit any new MVCC range tombstones when their start key is encountered. // Range keys can currently only be MVCC range tombstones. + // We need to verify that the range tombstone is visible at the catch-up + // timestamp, since we might have come here after a call to NextIgnoringTime. // // TODO(erikgrinaker): Find a faster/better way to detect range key changes // that doesn't involve constant comparisons. Pebble probably already knows, // we just need a way to ask it. + // Note that byte slice comparison in Go is smart enough to immediately bail + // if lengths are different. However, it isn't smart enough to compare from + // the end, which would really help since our keys share prefixes. if hasRange { if rangeBounds := i.RangeBounds(); !rangeBounds.Key.Equal(rangeKeysStart) { rangeKeysStart = append(rangeKeysStart[:0], rangeBounds.Key...) // Emit events for these MVCC range tombstones, in chronological order. versions := i.RangeKeys().Versions - for i := len(versions) - 1; i >= 0; i-- { + for j := len(versions) - 1; j >= 0; j-- { + if !i.startTime.LessEq(versions[j].Timestamp) { + // This range tombstone isn't visible by this catch-up scan. + continue + } + var span roachpb.Span a, span.Key = a.Copy(rangeBounds.Key, 0) a, span.EndKey = a.Copy(rangeBounds.EndKey, 0) err := outputFn(&roachpb.RangeFeedEvent{ DeleteRange: &roachpb.RangeFeedDeleteRange{ Span: span, - Timestamp: versions[i].Timestamp, + Timestamp: versions[j].Timestamp, }, }) if err != nil { diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go b/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go index b765c248e9c6..2f28fb2630b6 100644 --- a/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go +++ b/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go @@ -34,7 +34,7 @@ import ( "github.com/stretchr/testify/require" ) -func runCatchUpBenchmark(b *testing.B, emk engineMaker, opts benchOptions) { +func runCatchUpBenchmark(b *testing.B, emk engineMaker, opts benchOptions) (numEvents int) { eng, _ := setupData(context.Background(), b, emk, opts.dataOpts) defer eng.Close() startKey := roachpb.Key(encoding.EncodeUvarintAscending([]byte("key-"), uint64(0))) @@ -60,8 +60,17 @@ func runCatchUpBenchmark(b *testing.B, emk engineMaker, opts benchOptions) { if counter < 1 { b.Fatalf("didn't emit any events!") } + if numEvents == 0 { + // Preserve number of events so that caller can compare it between + // different invocations that it knows should not affect number of + // events. + numEvents = counter + } + // Number of events can't change between iterations. + require.Equal(b, numEvents, counter) }() } + return numEvents } func BenchmarkCatchUpScan(b *testing.B) { @@ -141,11 +150,18 @@ func BenchmarkCatchUpScan(b *testing.B) { b.Run(fmt.Sprintf("numRangeKeys=%d", numRangeKeys), func(b *testing.B) { do := do do.numRangeKeys = numRangeKeys - runCatchUpBenchmark(b, setupMVCCPebble, benchOptions{ + n := runCatchUpBenchmark(b, setupMVCCPebble, benchOptions{ dataOpts: do, ts: ts, withDiff: withDiff, }) + // We shouldn't be seeing the range deletions returned in this + // benchmark since they are at timestamp 1 and we catch up at + // a timestamp >= 5 (which corresponds to tsExcludePercent == + // 0). Note that the oldest key is always excluded, since the + // floor for wallTime is 5 and that's the oldest key's + // timestamp but the start timestamp is exclusive. + require.EqualValues(b, int64(numKeys)-wallTime/5, n) }) } }) diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan_test.go b/pkg/kv/kvserver/rangefeed/catchup_scan_test.go index 2ef24d5362e0..9a6b3b596877 100644 --- a/pkg/kv/kvserver/rangefeed/catchup_scan_test.go +++ b/pkg/kv/kvserver/rangefeed/catchup_scan_test.go @@ -31,6 +31,13 @@ import ( // https://github.com/cockroachdb/cockroach/issues/82715 // // For now, see rangefeed_external_test.go for rudimentary range key tests. +// +// To invoke and compare on the numRangeKeys dimension: +// +// go test ./pkg/kv/kvserver/rangefeed/ -run - -count 10 -bench BenchmarkCatchUpScan 2>&1 | tee bench.txt +// for flavor in numRangeKeys=0 numRangeKeys=1 numRangeKeys=100; do grep -E "${flavor}[^0-9]+" bench.txt | sed -E "s/${flavor}+/X/" > $flavor.txt; done +// benchstat numRangeKeys\={0,1}.txt +// benchstat numRangeKeys\={0,100}.txt func TestCatchupScan(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t)