84881: startupmigrations: check for migration completion optimistically on a follower r=ajwerner a=ajwerner

`INCONSISTENT` reads can be served by follower replicas. These startup
migrations are almost always already complete, so we can avoid a wide-area
round-trip by checking optimistically on a follower before falling back to a
consistent read.

Release note: None
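
For context, here is a minimal sketch of the two-phase check this commit introduces, mirroring the `dbAdapter.ReadCommittedScan` and `EnsureMigrations` changes in the diff below; `checkCompletion` and `allDone` are hypothetical stand-ins for the real migration bookkeeping:

```go
package sketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// inconsistentScan issues an INCONSISTENT scan, which the KV layer may serve
// from a nearby follower replica; the results can be stale.
func inconsistentScan(
	ctx context.Context, db *kv.DB, begin, end interface{},
) ([]kv.KeyValue, error) {
	var b kv.Batch
	b.Header.ReadConsistency = roachpb.INCONSISTENT
	b.Scan(begin, end)
	if err := db.Run(ctx, &b); err != nil {
		return nil, err
	}
	return b.Results[0].Rows, nil
}

// checkCompletion (hypothetical) first checks completion optimistically via a
// possibly stale follower read, and only falls back to a consistent scan if
// that optimistic check does not prove everything is done.
func checkCompletion(
	ctx context.Context, db *kv.DB, begin, end interface{},
	allDone func([]kv.KeyValue) bool,
) (bool, error) {
	if rows, err := inconsistentScan(ctx, db, begin, end); err == nil && allDone(rows) {
		return true, nil // the stale read already shows all migrations complete
	}
	rows, err := db.Scan(ctx, begin, end, 0 /* maxRows */)
	if err != nil {
		return false, err
	}
	return allDone(rows), nil
}
```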


86111: storage: disable range key masking for `MVCCGet` r=tbg a=erikgrinaker

**storage: fix range key detection in `pebbleMVCCScanner.get()`**

`RangeKeyChanged()` did not appear to fire properly when `MVCCGet` was
changed to disable range key masking and the prefix iterator was reused
from the batch (as seen by `BenchmarkMVCCGet` in the `batch=true` case).

This patch adds a workaround in `pebbleMVCCScanner.get()` that checks
`HasPointAndRange()` after seeking the iterator rather than before.

Release note: None
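
A hedged sketch of the workaround described above, using only the iterator calls visible in the `pebble_mvcc_scanner.go` hunk at the bottom of this diff; the wrapping helper and its return value are illustrative, not the actual scanner code:

```go
package storagesketch

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage"
)

// getWithRangeKeyCheck (illustrative) seeks the prefix iterator first and only
// then asks whether a range key overlaps the position, since RangeKeyChanged()
// may not fire on a reused iterator that was already positioned.
func getWithRangeKeyCheck(iter storage.MVCCIterator, key roachpb.Key) bool {
	iter.SeekGE(storage.MVCCKey{Key: key})
	if ok, _ := iter.Valid(); ok {
		if _, hasRange := iter.HasPointAndRange(); hasRange {
			return true // caller must enable range-key-aware point synthesis
		}
	}
	return false
}
```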
  
**storage: disable range key masking for `MVCCGet`**

This patch disables range key masking for `MVCCGet`, and disallows using
it with prefix iterators in general, due to the significant performance
overhead.

```
name                                                                   old time/op    new time/op    delta
MVCCGet_Pebble/batch=true/versions=1/valueSize=8/numRangeKeys=0-24       3.31µs ± 1%    2.48µs ± 1%   -24.97%  (p=0.000 n=10+9)
MVCCGet_Pebble/batch=true/versions=1/valueSize=8/numRangeKeys=1-24       6.72µs ± 1%    3.15µs ± 1%   -53.12%  (p=0.000 n=10+10)
MVCCGet_Pebble/batch=true/versions=1/valueSize=8/numRangeKeys=100-24     78.5µs ± 2%    75.1µs ± 2%    -4.41%  (p=0.000 n=10+10)
MVCCGet_Pebble/batch=true/versions=10/valueSize=8/numRangeKeys=0-24      4.19µs ± 2%    3.70µs ± 0%   -11.51%  (p=0.000 n=10+10)
MVCCGet_Pebble/batch=true/versions=10/valueSize=8/numRangeKeys=1-24      9.22µs ± 1%    7.25µs ± 3%   -21.39%  (p=0.000 n=9+10)
MVCCGet_Pebble/batch=true/versions=10/valueSize=8/numRangeKeys=100-24    81.6µs ± 1%    79.0µs ± 1%    -3.22%  (p=0.000 n=10+10)
```
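
As a caller-side illustration (an assumed helper; the actual change is the one-line edit to `MVCCGet` in `pkg/storage/mvcc.go` below), a prefix iterator for a get-style read now simply leaves `RangeKeyMaskingBelow` unset, since combining it with `Prefix` panics after this patch:

```go
package storagesketch

import "github.com/cockroachdb/cockroach/pkg/storage"

// newPrefixGetIter (illustrative) builds a prefix iterator suitable for a
// point lookup: it requests both point and range keys but does not set
// RangeKeyMaskingBelow, which is now disallowed together with Prefix.
func newPrefixGetIter(reader storage.Reader) storage.MVCCIterator {
	return reader.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
		KeyTypes: storage.IterKeyTypePointsAndRanges,
		Prefix:   true,
		// RangeKeyMaskingBelow deliberately left unset.
	})
}
```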

Resolves cockroachdb#86086.

Release note: None

Co-authored-by: Andrew Werner <[email protected]>
Co-authored-by: Erik Grinaker <[email protected]>
3 people committed Aug 15, 2022
3 parents 6b7f005 + 53e4c11 + b1af62e commit 5527486
Showing 7 changed files with 98 additions and 139 deletions.
3 changes: 1 addition & 2 deletions pkg/startupmigrations/BUILD.bazel
@@ -12,15 +12,13 @@ go_library(
deps = [
"//pkg/base",
"//pkg/clusterversion",
"//pkg/config/zonepb",
"//pkg/jobs",
"//pkg/keys",
"//pkg/kv",
"//pkg/roachpb",
"//pkg/security/username",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog/bootstrap",
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/descpb",
"//pkg/sql/sem/tree",
@@ -60,6 +58,7 @@ go_test(
"//pkg/testutils/sqlutils",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/stop",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
120 changes: 60 additions & 60 deletions pkg/startupmigrations/migrations.go
@@ -13,20 +13,17 @@ package startupmigrations
import (
"context"
"fmt"
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -422,6 +419,10 @@ type DB interface {
Get(ctx context.Context, key interface{}) (kv.KeyValue, error)
Put(ctx context.Context, key, value interface{}) error
Txn(ctx context.Context, retryable func(ctx context.Context, txn *kv.Txn) error) error

// ReadCommittedScan is like Scan but may return an inconsistent and stale
// snapshot.
ReadCommittedScan(ctx context.Context, begin, end interface{}, maxRows int64) ([]kv.KeyValue, error)
}

// Manager encapsulates the necessary functionality for handling migrations
@@ -456,7 +457,7 @@ func NewManager(
return &Manager{
stopper: stopper,
leaseManager: leasemanager.New(db, clock, opts),
db: db,
db: dbAdapter{DB: db},
codec: codec,
sqlExecutor: executor,
testingKnobs: testingKnobs,
@@ -465,42 +466,22 @@
}
}

// ExpectedDescriptorIDs returns the list of all expected system descriptor IDs,
// including those added by completed migrations. This is needed for certain
// tests, which check the number of ranges and system tables at node startup.
//
// NOTE: This value may be out-of-date if another node is actively running
// migrations, and so should only be used in test code where the migration
// lifecycle is tightly controlled.
func ExpectedDescriptorIDs(
ctx context.Context,
db DB,
codec keys.SQLCodec,
defaultZoneConfig *zonepb.ZoneConfig,
defaultSystemZoneConfig *zonepb.ZoneConfig,
) (descpb.IDs, error) {
completedMigrations, err := getCompletedMigrations(ctx, db, codec)
if err != nil {
// dbAdapter augments the kv.DB with a ReadCommittedScan method as required
// by the DB interface.
type dbAdapter struct {
*kv.DB
}

func (d dbAdapter) ReadCommittedScan(
ctx context.Context, begin, end interface{}, maxRows int64,
) ([]kv.KeyValue, error) {
var b kv.Batch
b.Header.ReadConsistency = roachpb.INCONSISTENT
b.Scan(begin, end)
if err := d.Run(ctx, &b); err != nil {
return nil, err
}
descriptorIDs := bootstrap.MakeMetadataSchema(codec, defaultZoneConfig, defaultSystemZoneConfig).DescriptorIDs()
for _, migration := range backwardCompatibleMigrations {
// Is the migration not creating descriptors?
if migration.newDescriptorIDs == nil ||
// Is the migration included in the metadata schema considered above?
(migration.includedInBootstrap != roachpb.Version{}) {
continue
}
if _, ok := completedMigrations[string(migrationKey(codec, migration))]; ok {
newIDs, err := migration.newDescriptorIDs(ctx, db, codec)
if err != nil {
return nil, err
}
descriptorIDs = append(descriptorIDs, newIDs...)
}
}
sort.Sort(descriptorIDs)
return descriptorIDs, nil
return b.Results[0].Rows, nil
}

// EnsureMigrations should be run during node startup to ensure that all
@@ -511,27 +492,18 @@ func (m *Manager) EnsureMigrations(ctx context.Context, bootstrapVersion roachpb
defer m.testingKnobs.AfterEnsureMigrations()
}
// First, check whether there are any migrations that need to be run.
completedMigrations, err := getCompletedMigrations(ctx, m.db, m.codec)
if err != nil {
// We do the check potentially twice, once with a readCommittedScan which
// might read stale values, but can be performed locally, and then, if
// there are migrations to run, again with a consistent scan.
if allComplete, err := m.checkIfAllMigrationsAreComplete(
ctx, bootstrapVersion, m.db.ReadCommittedScan,
); err != nil || allComplete {
return err
}
allMigrationsCompleted := true
for _, migration := range backwardCompatibleMigrations {
if !m.shouldRunMigration(migration, bootstrapVersion) {
continue
}
if m.testingKnobs.DisableBackfillMigrations && migration.doesBackfill {
log.Infof(ctx, "ignoring migrations after (and including) %s due to testing knob",
migration.name)
break
}
key := migrationKey(m.codec, migration)
if _, ok := completedMigrations[string(key)]; !ok {
allMigrationsCompleted = false
}
}
if allMigrationsCompleted {
return nil
if allComplete, err := m.checkIfAllMigrationsAreComplete(
ctx, bootstrapVersion, m.db.Scan,
); err != nil || allComplete {
return err
}

// If there are any, grab the migration lease to ensure that only one
@@ -543,6 +515,7 @@ func (m *Manager) EnsureMigrations(ctx context.Context, bootstrapVersion roachpb
if log.V(1) {
log.Info(ctx, "trying to acquire lease")
}
var err error
for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
lease, err = m.leaseManager.AcquireLease(ctx, m.codec.StartupMigrationLeaseKey())
if err == nil {
@@ -597,7 +570,7 @@ func (m *Manager) EnsureMigrations(ctx context.Context, bootstrapVersion roachpb

// Re-get the list of migrations in case any of them were completed between
// our initial check and our grabbing of the lease.
completedMigrations, err = getCompletedMigrations(ctx, m.db, m.codec)
completedMigrations, err := getCompletedMigrations(ctx, m.db.Scan, m.codec)
if err != nil {
return err
}
@@ -642,6 +615,31 @@ func (m *Manager) EnsureMigrations(ctx context.Context, bootstrapVersion roachpb
return nil
}

func (m *Manager) checkIfAllMigrationsAreComplete(
ctx context.Context, bootstrapVersion roachpb.Version, scan scanFunc,
) (completedAll bool, _ error) {
completedMigrations, err := getCompletedMigrations(ctx, scan, m.codec)
if err != nil {
return false, err
}
allMigrationsCompleted := true
for _, migration := range backwardCompatibleMigrations {
if !m.shouldRunMigration(migration, bootstrapVersion) {
continue
}
if m.testingKnobs.DisableBackfillMigrations && migration.doesBackfill {
log.Infof(ctx, "ignoring migrations after (and including) %s due to testing knob",
migration.name)
break
}
key := migrationKey(m.codec, migration)
if _, ok := completedMigrations[string(key)]; !ok {
allMigrationsCompleted = false
}
}
return allMigrationsCompleted, nil
}

func (m *Manager) shouldRunMigration(
migration migrationDescriptor, bootstrapVersion roachpb.Version,
) bool {
@@ -662,14 +660,16 @@ func (m *Manager) shouldRunMigration(
return true
}

type scanFunc = func(_ context.Context, from, to interface{}, maxRows int64) ([]kv.KeyValue, error)

func getCompletedMigrations(
ctx context.Context, db DB, codec keys.SQLCodec,
ctx context.Context, scan scanFunc, codec keys.SQLCodec,
) (map[string]struct{}, error) {
if log.V(1) {
log.Info(ctx, "trying to get the list of completed migrations")
}
prefix := codec.StartupMigrationKeyPrefix()
keyvals, err := db.Scan(ctx, prefix, prefix.PrefixEnd(), 0 /* maxRows */)
keyvals, err := scan(ctx, prefix, prefix.PrefixEnd(), 0 /* maxRows */)
if err != nil {
return nil, errors.Wrapf(err, "failed to get list of completed migrations")
}
37 changes: 30 additions & 7 deletions pkg/startupmigrations/migrations_test.go
@@ -14,6 +14,7 @@ import (
"bytes"
"context"
"fmt"
"math/rand"
"strings"
"testing"
"time"
@@ -31,6 +32,7 @@
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -85,11 +87,32 @@ func (f *fakeLeaseManager) TimeRemaining(l *leasemanager.Lease) time.Duration {

type fakeDB struct {
codec keys.SQLCodec
rand *rand.Rand
kvs map[string][]byte
scanErr error
putErr error
}

func newFakeDB(codec keys.SQLCodec) *fakeDB {
r, _ := randutil.NewTestRand()
return &fakeDB{
codec: codec,
rand: r,
kvs: make(map[string][]byte),
}
}

// ReadCommittedScan usually returns the data but occasionally returns
// nothing, simulating a stale follower read.
func (f *fakeDB) ReadCommittedScan(
ctx context.Context, begin, end interface{}, maxRows int64,
) ([]kv.KeyValue, error) {
// Sometimes return the data, sometimes return nothing.
if f.rand.Float64() < .9 {
return f.Scan(ctx, begin, end, maxRows)
}
return nil, nil
}

func (f *fakeDB) Scan(
ctx context.Context, begin, end interface{}, maxRows int64,
) ([]kv.KeyValue, error) {
@@ -135,7 +158,7 @@ func (f *fakeDB) Txn(context.Context, func(context.Context, *kv.Txn) error) erro
func TestEnsureMigrations(t *testing.T) {
defer leaktest.AfterTest(t)()
codec := keys.SystemSQLCodec
db := &fakeDB{codec: codec}
db := newFakeDB(codec)
mgr := Manager{
stopper: stop.NewStopper(),
leaseManager: &fakeLeaseManager{},
@@ -237,7 +260,7 @@ func TestSkipMigrationsIncludedInBootstrap(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
codec := keys.SystemSQLCodec
db := &fakeDB{codec: codec}
db := newFakeDB(codec)
mgr := Manager{
stopper: stop.NewStopper(),
leaseManager: &fakeLeaseManager{},
@@ -280,7 +303,7 @@ func TestClusterWideMigrationOnlyRunBySystemTenant(t *testing.T) {
}

ctx := context.Background()
db := &fakeDB{codec: codec}
db := newFakeDB(codec)
mgr := Manager{
stopper: stop.NewStopper(),
leaseManager: &fakeLeaseManager{},
@@ -310,7 +333,7 @@ func TestClusterWideMigrationOnlyRunBySystemTenant(t *testing.T) {
func TestDBErrors(t *testing.T) {
defer leaktest.AfterTest(t)()
codec := keys.SystemSQLCodec
db := &fakeDB{codec: codec}
db := newFakeDB(codec)
mgr := Manager{
stopper: stop.NewStopper(),
leaseManager: &fakeLeaseManager{},
@@ -373,7 +396,7 @@ func TestDBErrors(t *testing.T) {
func TestLeaseErrors(t *testing.T) {
defer leaktest.AfterTest(t)()
codec := keys.SystemSQLCodec
db := &fakeDB{codec: codec, kvs: make(map[string][]byte)}
db := newFakeDB(codec)
mgr := Manager{
stopper: stop.NewStopper(),
leaseManager: &fakeLeaseManager{
@@ -405,7 +428,7 @@ func TestLeaseErrors(t *testing.T) {
func TestLeaseExpiration(t *testing.T) {
defer leaktest.AfterTest(t)()
codec := keys.SystemSQLCodec
db := &fakeDB{codec: codec, kvs: make(map[string][]byte)}
db := newFakeDB(codec)
mgr := Manager{
stopper: stop.NewStopper(),
leaseManager: &fakeLeaseManager{leaseTimeRemaining: time.Nanosecond},
@@ -520,7 +543,7 @@ func (mt *migrationTest) runMigration(ctx context.Context, m migrationDescriptor
return m.workFn(ctx, runner{
settings: mt.server.ClusterSettings(),
codec: keys.SystemSQLCodec,
db: mt.kvDB,
db: dbAdapter{DB: mt.kvDB},
sqlExecutor: mt.server.InternalExecutor().(*sql.InternalExecutor),
})
}
2 changes: 1 addition & 1 deletion pkg/storage/mvcc.go
@@ -948,7 +948,7 @@ func newMVCCIterator(
func MVCCGet(
ctx context.Context, reader Reader, key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions,
) (*roachpb.Value, *roachpb.Intent, error) {
iter := newMVCCIterator(reader, timestamp, !opts.Tombstones, IterOptions{
iter := newMVCCIterator(reader, timestamp, false /* rangeKeyMasking */, IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
Prefix: true,
})
3 changes: 3 additions & 0 deletions pkg/storage/pebble_iterator.go
@@ -181,6 +181,9 @@ func (p *pebbleIterator) setOptions(opts IterOptions, durability DurabilityRequi
if opts.MinTimestampHint.IsSet() && opts.MaxTimestampHint.IsEmpty() {
panic("min timestamp hint set without max timestamp hint")
}
if opts.Prefix && opts.RangeKeyMaskingBelow.IsSet() {
panic("can't use range key masking with prefix iterators") // very high overhead
}

// If this Pebble database does not support range keys yet, fall back to
// only iterating over point keys to avoid panics. This is effectively the
5 changes: 3 additions & 2 deletions pkg/storage/pebble_mvcc_scanner.go
@@ -440,15 +440,16 @@ func (p *pebbleMVCCScanner) init(
func (p *pebbleMVCCScanner) get(ctx context.Context) {
p.isGet = true

// The iterator may already be positioned on a range key, in which
p.parent.SeekGE(MVCCKey{Key: p.start})

// The iterator may already have been positioned on a range key, in which
// case RangeKeyChanged() wouldn't fire.
if ok, _ := p.parent.Valid(); ok {
if _, hasRange := p.parent.HasPointAndRange(); hasRange {
p.enablePointSynthesis()
}
}

p.parent.SeekGE(MVCCKey{Key: p.start})
if !p.updateCurrent() {
return
}