Skip to content

Commit

Permalink
startupmigrations: check for migration completion more cheaply
Browse files Browse the repository at this point in the history
INCONSISTENT reads can be performed on followers. Most always these
startup migrations are complete. We can avoid a wide-area round-trip by
checking optimistically with a follower before doing a consistent read.

Release note: None
  • Loading branch information
ajwerner committed Aug 15, 2022
1 parent 7511099 commit 53e4c11
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 30 deletions.
1 change: 1 addition & 0 deletions pkg/startupmigrations/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ go_test(
"//pkg/testutils/sqlutils",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/stop",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
Expand Down
87 changes: 64 additions & 23 deletions pkg/startupmigrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,10 @@ type DB interface {
Get(ctx context.Context, key interface{}) (kv.KeyValue, error)
Put(ctx context.Context, key, value interface{}) error
Txn(ctx context.Context, retryable func(ctx context.Context, txn *kv.Txn) error) error

// ReadCommittedScan is like Scan but may return an inconsistent and stale
// snapshot.
ReadCommittedScan(ctx context.Context, begin, end interface{}, maxRows int64) ([]kv.KeyValue, error)
}

// Manager encapsulates the necessary functionality for handling migrations
Expand Down Expand Up @@ -453,7 +457,7 @@ func NewManager(
return &Manager{
stopper: stopper,
leaseManager: leasemanager.New(db, clock, opts),
db: db,
db: dbAdapter{DB: db},
codec: codec,
sqlExecutor: executor,
testingKnobs: testingKnobs,
Expand All @@ -462,6 +466,24 @@ func NewManager(
}
}

// dbAdapter augments the kv.DB with a ReadCommittedScan method as required
// by the DB interface.
type dbAdapter struct {
*kv.DB
}

func (d dbAdapter) ReadCommittedScan(
ctx context.Context, begin, end interface{}, maxRows int64,
) ([]kv.KeyValue, error) {
var b kv.Batch
b.Header.ReadConsistency = roachpb.INCONSISTENT
b.Scan(begin, end)
if err := d.Run(ctx, &b); err != nil {
return nil, err
}
return b.Results[0].Rows, nil
}

// EnsureMigrations should be run during node startup to ensure that all
// required migrations have been run (and running all those that are definitely
// safe to run).
Expand All @@ -470,27 +492,18 @@ func (m *Manager) EnsureMigrations(ctx context.Context, bootstrapVersion roachpb
defer m.testingKnobs.AfterEnsureMigrations()
}
// First, check whether there are any migrations that need to be run.
completedMigrations, err := getCompletedMigrations(ctx, m.db, m.codec)
if err != nil {
// We do the check potentially twice, once with a readCommittedScan which
// might read stale values, but can be performed locally, and then, if
// there are migrations to run, again with a consistent scan.
if allComplete, err := m.checkIfAllMigrationsAreComplete(
ctx, bootstrapVersion, m.db.ReadCommittedScan,
); err != nil || allComplete {
return err
}
allMigrationsCompleted := true
for _, migration := range backwardCompatibleMigrations {
if !m.shouldRunMigration(migration, bootstrapVersion) {
continue
}
if m.testingKnobs.DisableBackfillMigrations && migration.doesBackfill {
log.Infof(ctx, "ignoring migrations after (and including) %s due to testing knob",
migration.name)
break
}
key := migrationKey(m.codec, migration)
if _, ok := completedMigrations[string(key)]; !ok {
allMigrationsCompleted = false
}
}
if allMigrationsCompleted {
return nil
if allComplete, err := m.checkIfAllMigrationsAreComplete(
ctx, bootstrapVersion, m.db.Scan,
); err != nil || allComplete {
return err
}

// If there are any, grab the migration lease to ensure that only one
Expand All @@ -502,6 +515,7 @@ func (m *Manager) EnsureMigrations(ctx context.Context, bootstrapVersion roachpb
if log.V(1) {
log.Info(ctx, "trying to acquire lease")
}
var err error
for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
lease, err = m.leaseManager.AcquireLease(ctx, m.codec.StartupMigrationLeaseKey())
if err == nil {
Expand Down Expand Up @@ -556,7 +570,7 @@ func (m *Manager) EnsureMigrations(ctx context.Context, bootstrapVersion roachpb

// Re-get the list of migrations in case any of them were completed between
// our initial check and our grabbing of the lease.
completedMigrations, err = getCompletedMigrations(ctx, m.db, m.codec)
completedMigrations, err := getCompletedMigrations(ctx, m.db.Scan, m.codec)
if err != nil {
return err
}
Expand Down Expand Up @@ -601,6 +615,31 @@ func (m *Manager) EnsureMigrations(ctx context.Context, bootstrapVersion roachpb
return nil
}

func (m *Manager) checkIfAllMigrationsAreComplete(
ctx context.Context, bootstrapVersion roachpb.Version, scan scanFunc,
) (completedAll bool, _ error) {
completedMigrations, err := getCompletedMigrations(ctx, scan, m.codec)
if err != nil {
return false, err
}
allMigrationsCompleted := true
for _, migration := range backwardCompatibleMigrations {
if !m.shouldRunMigration(migration, bootstrapVersion) {
continue
}
if m.testingKnobs.DisableBackfillMigrations && migration.doesBackfill {
log.Infof(ctx, "ignoring migrations after (and including) %s due to testing knob",
migration.name)
break
}
key := migrationKey(m.codec, migration)
if _, ok := completedMigrations[string(key)]; !ok {
allMigrationsCompleted = false
}
}
return allMigrationsCompleted, nil
}

func (m *Manager) shouldRunMigration(
migration migrationDescriptor, bootstrapVersion roachpb.Version,
) bool {
Expand All @@ -621,14 +660,16 @@ func (m *Manager) shouldRunMigration(
return true
}

type scanFunc = func(_ context.Context, from, to interface{}, maxRows int64) ([]kv.KeyValue, error)

func getCompletedMigrations(
ctx context.Context, db DB, codec keys.SQLCodec,
ctx context.Context, scan scanFunc, codec keys.SQLCodec,
) (map[string]struct{}, error) {
if log.V(1) {
log.Info(ctx, "trying to get the list of completed migrations")
}
prefix := codec.StartupMigrationKeyPrefix()
keyvals, err := db.Scan(ctx, prefix, prefix.PrefixEnd(), 0 /* maxRows */)
keyvals, err := scan(ctx, prefix, prefix.PrefixEnd(), 0 /* maxRows */)
if err != nil {
return nil, errors.Wrapf(err, "failed to get list of completed migrations")
}
Expand Down
37 changes: 30 additions & 7 deletions pkg/startupmigrations/migrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"bytes"
"context"
"fmt"
"math/rand"
"strings"
"testing"
"time"
Expand All @@ -31,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -85,11 +87,32 @@ func (f *fakeLeaseManager) TimeRemaining(l *leasemanager.Lease) time.Duration {

type fakeDB struct {
codec keys.SQLCodec
rand *rand.Rand
kvs map[string][]byte
scanErr error
putErr error
}

func newFakeDB(codec keys.SQLCodec) *fakeDB {
r, _ := randutil.NewTestRand()
return &fakeDB{
codec: codec,
rand: r,
kvs: make(map[string][]byte),
}
}

// ReadCommittedScan never returns any data.
func (f *fakeDB) ReadCommittedScan(
ctx context.Context, begin, end interface{}, maxRows int64,
) ([]kv.KeyValue, error) {
// Sometimes return the data, sometimes return nothing.
if f.rand.Float64() < .9 {
return f.Scan(ctx, begin, end, maxRows)
}
return nil, nil
}

func (f *fakeDB) Scan(
ctx context.Context, begin, end interface{}, maxRows int64,
) ([]kv.KeyValue, error) {
Expand Down Expand Up @@ -135,7 +158,7 @@ func (f *fakeDB) Txn(context.Context, func(context.Context, *kv.Txn) error) erro
func TestEnsureMigrations(t *testing.T) {
defer leaktest.AfterTest(t)()
codec := keys.SystemSQLCodec
db := &fakeDB{codec: codec}
db := newFakeDB(codec)
mgr := Manager{
stopper: stop.NewStopper(),
leaseManager: &fakeLeaseManager{},
Expand Down Expand Up @@ -237,7 +260,7 @@ func TestSkipMigrationsIncludedInBootstrap(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
codec := keys.SystemSQLCodec
db := &fakeDB{codec: codec}
db := newFakeDB(codec)
mgr := Manager{
stopper: stop.NewStopper(),
leaseManager: &fakeLeaseManager{},
Expand Down Expand Up @@ -280,7 +303,7 @@ func TestClusterWideMigrationOnlyRunBySystemTenant(t *testing.T) {
}

ctx := context.Background()
db := &fakeDB{codec: codec}
db := newFakeDB(codec)
mgr := Manager{
stopper: stop.NewStopper(),
leaseManager: &fakeLeaseManager{},
Expand Down Expand Up @@ -310,7 +333,7 @@ func TestClusterWideMigrationOnlyRunBySystemTenant(t *testing.T) {
func TestDBErrors(t *testing.T) {
defer leaktest.AfterTest(t)()
codec := keys.SystemSQLCodec
db := &fakeDB{codec: codec}
db := newFakeDB(codec)
mgr := Manager{
stopper: stop.NewStopper(),
leaseManager: &fakeLeaseManager{},
Expand Down Expand Up @@ -373,7 +396,7 @@ func TestDBErrors(t *testing.T) {
func TestLeaseErrors(t *testing.T) {
defer leaktest.AfterTest(t)()
codec := keys.SystemSQLCodec
db := &fakeDB{codec: codec, kvs: make(map[string][]byte)}
db := newFakeDB(codec)
mgr := Manager{
stopper: stop.NewStopper(),
leaseManager: &fakeLeaseManager{
Expand Down Expand Up @@ -405,7 +428,7 @@ func TestLeaseErrors(t *testing.T) {
func TestLeaseExpiration(t *testing.T) {
defer leaktest.AfterTest(t)()
codec := keys.SystemSQLCodec
db := &fakeDB{codec: codec, kvs: make(map[string][]byte)}
db := newFakeDB(codec)
mgr := Manager{
stopper: stop.NewStopper(),
leaseManager: &fakeLeaseManager{leaseTimeRemaining: time.Nanosecond},
Expand Down Expand Up @@ -520,7 +543,7 @@ func (mt *migrationTest) runMigration(ctx context.Context, m migrationDescriptor
return m.workFn(ctx, runner{
settings: mt.server.ClusterSettings(),
codec: keys.SystemSQLCodec,
db: mt.kvDB,
db: dbAdapter{DB: mt.kvDB},
sqlExecutor: mt.server.InternalExecutor().(*sql.InternalExecutor),
})
}
Expand Down

0 comments on commit 53e4c11

Please sign in to comment.