diff --git a/pkg/startupmigrations/BUILD.bazel b/pkg/startupmigrations/BUILD.bazel index e1388913b916..bf09ceb47ea7 100644 --- a/pkg/startupmigrations/BUILD.bazel +++ b/pkg/startupmigrations/BUILD.bazel @@ -58,6 +58,7 @@ go_test( "//pkg/testutils/sqlutils", "//pkg/util/leaktest", "//pkg/util/log", + "//pkg/util/randutil", "//pkg/util/stop", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", diff --git a/pkg/startupmigrations/migrations.go b/pkg/startupmigrations/migrations.go index 6c1307f37460..52e634e958fe 100644 --- a/pkg/startupmigrations/migrations.go +++ b/pkg/startupmigrations/migrations.go @@ -419,6 +419,10 @@ type DB interface { Get(ctx context.Context, key interface{}) (kv.KeyValue, error) Put(ctx context.Context, key, value interface{}) error Txn(ctx context.Context, retryable func(ctx context.Context, txn *kv.Txn) error) error + + // ReadCommittedScan is like Scan but may return an inconsistent and stale + // snapshot. + ReadCommittedScan(ctx context.Context, begin, end interface{}, maxRows int64) ([]kv.KeyValue, error) } // Manager encapsulates the necessary functionality for handling migrations @@ -453,7 +457,7 @@ func NewManager( return &Manager{ stopper: stopper, leaseManager: leasemanager.New(db, clock, opts), - db: db, + db: dbAdapter{DB: db}, codec: codec, sqlExecutor: executor, testingKnobs: testingKnobs, @@ -462,6 +466,24 @@ func NewManager( } } +// dbAdapter augments the kv.DB with a ReadCommittedScan method as required +// by the DB interface. +type dbAdapter struct { + *kv.DB +} + +func (d dbAdapter) ReadCommittedScan( + ctx context.Context, begin, end interface{}, maxRows int64, +) ([]kv.KeyValue, error) { + var b kv.Batch + b.Header.ReadConsistency = roachpb.INCONSISTENT + b.Scan(begin, end) + if err := d.Run(ctx, &b); err != nil { + return nil, err + } + return b.Results[0].Rows, nil +} + // EnsureMigrations should be run during node startup to ensure that all // required migrations have been run (and running all those that are definitely // safe to run). @@ -470,27 +492,18 @@ func (m *Manager) EnsureMigrations(ctx context.Context, bootstrapVersion roachpb defer m.testingKnobs.AfterEnsureMigrations() } // First, check whether there are any migrations that need to be run. - completedMigrations, err := getCompletedMigrations(ctx, m.db, m.codec) - if err != nil { + // We do the check potentially twice, once with a readCommittedScan which + // might read stale values, but can be performed locally, and then, if + // there are migrations to run, again with a consistent scan. + if allComplete, err := m.checkIfAllMigrationsAreComplete( + ctx, bootstrapVersion, m.db.ReadCommittedScan, + ); err != nil || allComplete { return err } - allMigrationsCompleted := true - for _, migration := range backwardCompatibleMigrations { - if !m.shouldRunMigration(migration, bootstrapVersion) { - continue - } - if m.testingKnobs.DisableBackfillMigrations && migration.doesBackfill { - log.Infof(ctx, "ignoring migrations after (and including) %s due to testing knob", - migration.name) - break - } - key := migrationKey(m.codec, migration) - if _, ok := completedMigrations[string(key)]; !ok { - allMigrationsCompleted = false - } - } - if allMigrationsCompleted { - return nil + if allComplete, err := m.checkIfAllMigrationsAreComplete( + ctx, bootstrapVersion, m.db.Scan, + ); err != nil || allComplete { + return err } // If there are any, grab the migration lease to ensure that only one @@ -502,6 +515,7 @@ func (m *Manager) EnsureMigrations(ctx context.Context, bootstrapVersion roachpb if log.V(1) { log.Info(ctx, "trying to acquire lease") } + var err error for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); { lease, err = m.leaseManager.AcquireLease(ctx, m.codec.StartupMigrationLeaseKey()) if err == nil { @@ -556,7 +570,7 @@ func (m *Manager) EnsureMigrations(ctx context.Context, bootstrapVersion roachpb // Re-get the list of migrations in case any of them were completed between // our initial check and our grabbing of the lease. - completedMigrations, err = getCompletedMigrations(ctx, m.db, m.codec) + completedMigrations, err := getCompletedMigrations(ctx, m.db.Scan, m.codec) if err != nil { return err } @@ -601,6 +615,31 @@ func (m *Manager) EnsureMigrations(ctx context.Context, bootstrapVersion roachpb return nil } +func (m *Manager) checkIfAllMigrationsAreComplete( + ctx context.Context, bootstrapVersion roachpb.Version, scan scanFunc, +) (completedAll bool, _ error) { + completedMigrations, err := getCompletedMigrations(ctx, scan, m.codec) + if err != nil { + return false, err + } + allMigrationsCompleted := true + for _, migration := range backwardCompatibleMigrations { + if !m.shouldRunMigration(migration, bootstrapVersion) { + continue + } + if m.testingKnobs.DisableBackfillMigrations && migration.doesBackfill { + log.Infof(ctx, "ignoring migrations after (and including) %s due to testing knob", + migration.name) + break + } + key := migrationKey(m.codec, migration) + if _, ok := completedMigrations[string(key)]; !ok { + allMigrationsCompleted = false + } + } + return allMigrationsCompleted, nil +} + func (m *Manager) shouldRunMigration( migration migrationDescriptor, bootstrapVersion roachpb.Version, ) bool { @@ -621,14 +660,16 @@ func (m *Manager) shouldRunMigration( return true } +type scanFunc = func(_ context.Context, from, to interface{}, maxRows int64) ([]kv.KeyValue, error) + func getCompletedMigrations( - ctx context.Context, db DB, codec keys.SQLCodec, + ctx context.Context, scan scanFunc, codec keys.SQLCodec, ) (map[string]struct{}, error) { if log.V(1) { log.Info(ctx, "trying to get the list of completed migrations") } prefix := codec.StartupMigrationKeyPrefix() - keyvals, err := db.Scan(ctx, prefix, prefix.PrefixEnd(), 0 /* maxRows */) + keyvals, err := scan(ctx, prefix, prefix.PrefixEnd(), 0 /* maxRows */) if err != nil { return nil, errors.Wrapf(err, "failed to get list of completed migrations") } diff --git a/pkg/startupmigrations/migrations_test.go b/pkg/startupmigrations/migrations_test.go index 26dd6dc10d57..9771aaaab919 100644 --- a/pkg/startupmigrations/migrations_test.go +++ b/pkg/startupmigrations/migrations_test.go @@ -14,6 +14,7 @@ import ( "bytes" "context" "fmt" + "math/rand" "strings" "testing" "time" @@ -31,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" @@ -85,11 +87,32 @@ func (f *fakeLeaseManager) TimeRemaining(l *leasemanager.Lease) time.Duration { type fakeDB struct { codec keys.SQLCodec + rand *rand.Rand kvs map[string][]byte scanErr error putErr error } +func newFakeDB(codec keys.SQLCodec) *fakeDB { + r, _ := randutil.NewTestRand() + return &fakeDB{ + codec: codec, + rand: r, + kvs: make(map[string][]byte), + } +} + +// ReadCommittedScan never returns any data. +func (f *fakeDB) ReadCommittedScan( + ctx context.Context, begin, end interface{}, maxRows int64, +) ([]kv.KeyValue, error) { + // Sometimes return the data, sometimes return nothing. + if f.rand.Float64() < .9 { + return f.Scan(ctx, begin, end, maxRows) + } + return nil, nil +} + func (f *fakeDB) Scan( ctx context.Context, begin, end interface{}, maxRows int64, ) ([]kv.KeyValue, error) { @@ -135,7 +158,7 @@ func (f *fakeDB) Txn(context.Context, func(context.Context, *kv.Txn) error) erro func TestEnsureMigrations(t *testing.T) { defer leaktest.AfterTest(t)() codec := keys.SystemSQLCodec - db := &fakeDB{codec: codec} + db := newFakeDB(codec) mgr := Manager{ stopper: stop.NewStopper(), leaseManager: &fakeLeaseManager{}, @@ -237,7 +260,7 @@ func TestSkipMigrationsIncludedInBootstrap(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() codec := keys.SystemSQLCodec - db := &fakeDB{codec: codec} + db := newFakeDB(codec) mgr := Manager{ stopper: stop.NewStopper(), leaseManager: &fakeLeaseManager{}, @@ -280,7 +303,7 @@ func TestClusterWideMigrationOnlyRunBySystemTenant(t *testing.T) { } ctx := context.Background() - db := &fakeDB{codec: codec} + db := newFakeDB(codec) mgr := Manager{ stopper: stop.NewStopper(), leaseManager: &fakeLeaseManager{}, @@ -310,7 +333,7 @@ func TestClusterWideMigrationOnlyRunBySystemTenant(t *testing.T) { func TestDBErrors(t *testing.T) { defer leaktest.AfterTest(t)() codec := keys.SystemSQLCodec - db := &fakeDB{codec: codec} + db := newFakeDB(codec) mgr := Manager{ stopper: stop.NewStopper(), leaseManager: &fakeLeaseManager{}, @@ -373,7 +396,7 @@ func TestDBErrors(t *testing.T) { func TestLeaseErrors(t *testing.T) { defer leaktest.AfterTest(t)() codec := keys.SystemSQLCodec - db := &fakeDB{codec: codec, kvs: make(map[string][]byte)} + db := newFakeDB(codec) mgr := Manager{ stopper: stop.NewStopper(), leaseManager: &fakeLeaseManager{ @@ -405,7 +428,7 @@ func TestLeaseErrors(t *testing.T) { func TestLeaseExpiration(t *testing.T) { defer leaktest.AfterTest(t)() codec := keys.SystemSQLCodec - db := &fakeDB{codec: codec, kvs: make(map[string][]byte)} + db := newFakeDB(codec) mgr := Manager{ stopper: stop.NewStopper(), leaseManager: &fakeLeaseManager{leaseTimeRemaining: time.Nanosecond}, @@ -520,7 +543,7 @@ func (mt *migrationTest) runMigration(ctx context.Context, m migrationDescriptor return m.workFn(ctx, runner{ settings: mt.server.ClusterSettings(), codec: keys.SystemSQLCodec, - db: mt.kvDB, + db: dbAdapter{DB: mt.kvDB}, sqlExecutor: mt.server.InternalExecutor().(*sql.InternalExecutor), }) }