From 5a37a4fd13e215c3e4a406d8c1e0e25c1b06f37d Mon Sep 17 00:00:00 2001 From: Andreas Palm Date: Sun, 17 Nov 2024 18:03:38 +0100 Subject: [PATCH 1/4] Add DriverMigrationLock interface --- pkg/dbmate/driver.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/dbmate/driver.go b/pkg/dbmate/driver.go index b468bcef..d140e93a 100644 --- a/pkg/dbmate/driver.go +++ b/pkg/dbmate/driver.go @@ -25,6 +25,12 @@ type Driver interface { QueryError(string, error) error } +type DriverMigrationLock interface { + Lock() error + Unlock() error + IsLocked() bool +} + // DriverConfig holds configuration passed to driver constructors type DriverConfig struct { DatabaseURL *url.URL From 6ef5cc93e269e2b3a0f158a9ddaa2b3b8b0190a6 Mon Sep 17 00:00:00 2001 From: Andreas Palm Date: Sun, 17 Nov 2024 18:04:01 +0100 Subject: [PATCH 2/4] Add UseMigrationLock parameter to DB struct --- pkg/dbmate/db.go | 56 +++++++++++++++++++++++++++++++++++++++++-- pkg/dbmate/db_test.go | 1 + 2 files changed, 55 insertions(+), 2 deletions(-) diff --git a/pkg/dbmate/db.go b/pkg/dbmate/db.go index 4f5c39e2..db7a0194 100644 --- a/pkg/dbmate/db.go +++ b/pkg/dbmate/db.go @@ -60,6 +60,8 @@ type DB struct { WaitInterval time.Duration // WaitTimeout specifies maximum time for connection attempts WaitTimeout time.Duration + // UseMigrationLock uses an exclusive lock while performing migrations + UseMigrationLock bool } // StatusResult represents an available migration status @@ -83,6 +85,7 @@ func New(databaseURL *url.URL) *DB { WaitBefore: false, WaitInterval: time.Second, WaitTimeout: 60 * time.Second, + UseMigrationLock: false, } } @@ -153,12 +156,36 @@ func (db *DB) Wait() error { } // CreateAndMigrate creates the database (if necessary) and runs migrations -func (db *DB) CreateAndMigrate() error { +func (db *DB) CreateAndMigrate() (returnErr error) { drv, err := db.Driver() if err != nil { return err } + // try and acquire a lock for the duration of the migration. + // this is to prevent multiple instances performing the same migration in parallel. + if db.UseMigrationLock { + drvLock, ok := drv.(DriverMigrationLock) + if !ok { + return fmt.Errorf("driver does not support the use of a migration lock") + } + + if err := drvLock.Lock(); err != nil { + return err + } + + defer func() { + err := drvLock.Unlock() + if err != nil { + if returnErr != nil { + returnErr = fmt.Errorf("failed to unlock: %v: %w", err, returnErr) + return + } + returnErr = fmt.Errorf("failed to unlock: %w", err) + } + }() + } + // create database if it does not already exist // skip this step if we cannot determine status // (e.g. user does not have list database permission) @@ -333,12 +360,37 @@ func (db *DB) openDatabaseForMigration(drv Driver) (*sql.DB, error) { } // Migrate migrates database to the latest version -func (db *DB) Migrate() error { +func (db *DB) Migrate() (returnErr error) { drv, err := db.Driver() if err != nil { return err } + if db.UseMigrationLock { + drvLock, ok := drv.(DriverMigrationLock) + if !ok { + return fmt.Errorf("driver does not support the use of a migration lock") + } + + // only try and lock if we haven't already done so, e.g called CreateAndMigrate. + if !drvLock.IsLocked() { + if err := drvLock.Lock(); err != nil { + return err + } + + defer func() { + err := drvLock.Unlock() + if err != nil { + if returnErr != nil { + returnErr = fmt.Errorf("failed to unlock: %v: %w", err, returnErr) + return + } + returnErr = fmt.Errorf("failed to unlock: %w", err) + } + }() + } + } + migrations, err := db.FindMigrations() if err != nil { return err diff --git a/pkg/dbmate/db_test.go b/pkg/dbmate/db_test.go index 25dd0780..0ef9b47f 100644 --- a/pkg/dbmate/db_test.go +++ b/pkg/dbmate/db_test.go @@ -59,6 +59,7 @@ func TestNew(t *testing.T) { require.False(t, db.WaitBefore) require.Equal(t, time.Second, db.WaitInterval) require.Equal(t, 60*time.Second, db.WaitTimeout) + require.Equal(t, false, db.UseMigrationLock) } func TestGetDriver(t *testing.T) { From 36de748c3be2450ed02093e8067e30df81ad00b0 Mon Sep 17 00:00:00 2001 From: Andreas Palm Date: Sun, 17 Nov 2024 18:04:33 +0100 Subject: [PATCH 3/4] Postgres: Implement DriverMigrationLock interface --- pkg/driver/postgres/postgres.go | 48 ++++++++++++++++++++++++++++ pkg/driver/postgres/postgres_test.go | 36 +++++++++++++++++++++ 2 files changed, 84 insertions(+) diff --git a/pkg/driver/postgres/postgres.go b/pkg/driver/postgres/postgres.go index 99f3fac7..901dbd99 100644 --- a/pkg/driver/postgres/postgres.go +++ b/pkg/driver/postgres/postgres.go @@ -2,6 +2,7 @@ package postgres import ( "bytes" + "context" "database/sql" "fmt" "io" @@ -28,6 +29,8 @@ type Driver struct { migrationsTableName string databaseURL *url.URL log io.Writer + + migrationLockTx *sql.Tx } // NewDriver initializes the driver @@ -456,3 +459,48 @@ func (drv *Driver) quotedMigrationsTableNameParts(db dbutil.Transaction) (string // if more than one part, we already have a schema return quotedNameParts[0], strings.Join(quotedNameParts[1:], "."), nil } + +const lockKey = 48372615 + +func (drv *Driver) Lock() error { + if drv.migrationLockTx != nil { + return fmt.Errorf("already locked") + } + + db, err := drv.Open() + if err != nil { + return err + } + + tx, err := db.BeginTx(context.Background(), nil) + if err != nil { + return err + } + + drv.migrationLockTx = tx + + _, err = tx.Exec("SELECT pg_advisory_xact_lock($1)", lockKey) + if err != nil { + return fmt.Errorf("failed to acquire lock: %w", err) + } + + return nil +} + +func (drv *Driver) Unlock() error { + if drv.migrationLockTx == nil { + return fmt.Errorf("not locked") + } + + if err := drv.migrationLockTx.Rollback(); err != nil { + return err + } + + drv.migrationLockTx = nil + + return nil +} + +func (drv *Driver) IsLocked() bool { + return drv.migrationLockTx != nil +} diff --git a/pkg/driver/postgres/postgres_test.go b/pkg/driver/postgres/postgres_test.go index d2dc4709..861862c1 100644 --- a/pkg/driver/postgres/postgres_test.go +++ b/pkg/driver/postgres/postgres_test.go @@ -6,6 +6,7 @@ import ( "net/url" "runtime" "testing" + "time" "github.com/amacneil/dbmate/v2/pkg/dbmate" "github.com/amacneil/dbmate/v2/pkg/dbtest" @@ -790,3 +791,38 @@ func TestPostgresMigrationsTableExists(t *testing.T) { require.Equal(t, true, exists) }) } + +func TestPostgresMigrationLock(t *testing.T) { + t.Run("lock and unlock", func(t *testing.T) { + drv := testPostgresDriver(t) + err := drv.Lock() + require.NoError(t, err) + + err = drv.Lock() + require.Error(t, err, "Should not be able to lock again without unlock") + + err = drv.Unlock() + require.NoError(t, err, "Should be able to unlock") + }) + + t.Run("lock on one instance should block lock attempt on another", func(t *testing.T) { + drv1 := testPostgresDriver(t) + err1 := drv1.Lock() + require.NoError(t, err1) + + var isUnlocked bool + go func() { + time.Sleep(10 * time.Millisecond) + err := drv1.Unlock() + require.NoError(t, err, "Should be able to unlock") + isUnlocked = true + }() + + drv2 := testPostgresDriver(t) + err2 := drv2.Lock() + require.NoError(t, err2) + require.Equal(t, true, isUnlocked) + err2 = drv2.Unlock() + require.NoError(t, err2, "Should be able to unlock") + }) +} From ae780bbdd3fadd571909e4089775d0b837d8dde7 Mon Sep 17 00:00:00 2001 From: Andreas Palm Date: Mon, 18 Nov 2024 09:16:51 +0100 Subject: [PATCH 4/4] Add 'migration-lock' flag to cli --- main.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/main.go b/main.go index 4b4c9308..081e5689 100644 --- a/main.go +++ b/main.go @@ -96,6 +96,10 @@ func NewApp() *cli.App { Usage: "timeout for --wait flag", Value: defaultDB.WaitTimeout, }, + &cli.BoolFlag{ + Name: "migration-lock", + Usage: "use a lock during the migration so other dbmate instances can not run migrations at the same time", + }, } app.Commands = []*cli.Command{ @@ -127,6 +131,7 @@ func NewApp() *cli.App { Action: action(func(db *dbmate.DB, c *cli.Context) error { db.Strict = c.Bool("strict") db.Verbose = c.Bool("verbose") + db.UseMigrationLock = c.Bool("migration-lock") return db.CreateAndMigrate() }), }, @@ -163,6 +168,7 @@ func NewApp() *cli.App { Action: action(func(db *dbmate.DB, c *cli.Context) error { db.Strict = c.Bool("strict") db.Verbose = c.Bool("verbose") + db.UseMigrationLock = c.Bool("migration-lock") return db.Migrate() }), },