Skip to content

Commit

Permalink
Fix pg_control check for WAL-E backups (wal-g#169)
Browse files Browse the repository at this point in the history
* Fix pg_control check for WAL-E backups

Currently, WAL-E backup restoration was broken since preprelease 0.2.1+
We were checking for pg_control before understanding that it is not necessary. This commit fixes it.
  • Loading branch information
x4m authored and Tinsane committed Jan 22, 2019
1 parent 56ac301 commit 14a4d5f
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 14 deletions.
26 changes: 18 additions & 8 deletions internal/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (backup *Backup) GetTarNames() ([]string, error) {
}

// TODO : unit tests
func (backup *Backup) fetchSentinel() (BackupSentinelDto, error) {
func (backup *Backup) FetchSentinel() (BackupSentinelDto, error) {
sentinelDto := BackupSentinelDto{}
backupReaderMaker := NewStorageReaderMaker(backup.BaseBackupFolder, backup.getStopSentinelPath())
backupReader, err := backupReaderMaker.Reader()
Expand Down Expand Up @@ -120,14 +120,20 @@ func (backup *Backup) unwrap(dbDataDirectory string, sentinelDto BackupSentinelD
if err != nil {
return err
}

// Check name for backwards compatibility. Will check for `pg_control` if WALG version of backup.
needPgControl := IsPgControlRequired(backup, sentinelDto)

if pgControlKey == "" && needPgControl {
return NewPgControlNotFoundError()
}

err = ExtractAll(tarInterpreter, tarsToExtract)
if err != nil {
return err
}
// Check name for backwards compatibility. Will check for `pg_control` if WALG version of backup.
re := regexp.MustCompile(`^([^_]+._{1}[^_]+._{1})`)
match := re.FindString(backup.Name)
if match == "" || sentinelDto.isIncremental() {

if needPgControl {
err = ExtractAll(tarInterpreter, []ReaderMaker{NewStorageReaderMaker(backup.getTarPartitionFolder(), pgControlKey)})
if err != nil {
return errors.Wrap(err, "failed to extract pg_control")
Expand All @@ -138,6 +144,13 @@ func (backup *Backup) unwrap(dbDataDirectory string, sentinelDto BackupSentinelD
return nil
}

func IsPgControlRequired(backup *Backup, sentinelDto BackupSentinelDto) bool {
re := regexp.MustCompile(`^([^_]+._{1}[^_]+._{1})`)
walgBasebackupName := re.FindString(backup.Name) == ""
needPgControl := walgBasebackupName || sentinelDto.isIncremental()
return needPgControl
}

// TODO : unit tests
func IsDirectoryEmpty(directoryPath string) (bool, error) {
var isEmpty = true
Expand Down Expand Up @@ -178,8 +191,5 @@ func (backup *Backup) getTarsToExtract() (tarsToExtract []ReaderMaker, pgControl
tarToExtract := NewStorageReaderMaker(backup.getTarPartitionFolder(), tarName)
tarsToExtract = append(tarsToExtract, tarToExtract)
}
if pgControlKey == "" {
return nil, "", NewPgControlNotFoundError()
}
return
}
2 changes: 1 addition & 1 deletion internal/backup_fetch_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func deltaFetchRecursion(backupName string, folder StorageFolder, dbDataDirector
if err != nil {
return err
}
sentinelDto, err := backup.fetchSentinel()
sentinelDto, err := backup.FetchSentinel()
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/backup_push_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func HandleBackupPush(archiveDirectory string, uploader *Uploader) {
}
} else {
previousBackup := NewBackup(basebackupFolder, previousBackupName)
previousBackupSentinelDto, err = previousBackup.fetchSentinel()
previousBackupSentinelDto, err = previousBackup.FetchSentinel()
if err != nil {
tracelog.ErrorLogger.FatalError(err)
}
Expand All @@ -72,7 +72,7 @@ func HandleBackupPush(archiveDirectory string, uploader *Uploader) {
tracelog.InfoLogger.Println("Delta will be made from full backup.")
previousBackupName = *previousBackupSentinelDto.IncrementFullName
previousBackup := NewBackup(basebackupFolder, previousBackupName)
previousBackupSentinelDto, err = previousBackup.fetchSentinel()
previousBackupSentinelDto, err = previousBackup.FetchSentinel()
if err != nil {
tracelog.ErrorLogger.FatalError(err)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/delete_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type DeleteCommandArguments struct {

// TODO : unit tests
func adjustDeleteTarget(target *Backup, findFull bool) *Backup {
sentinelDto, err := target.fetchSentinel()
sentinelDto, err := target.FetchSentinel()
if err != nil {
tracelog.ErrorLogger.FatalError(err)
}
Expand Down Expand Up @@ -87,7 +87,7 @@ func HandleDelete(folder StorageFolder, args []string) {
return
}
backup := NewBackup(baseBackupFolder, b.BackupName)
dto, err := backup.fetchSentinel()
dto, err := backup.FetchSentinel()
if err != nil {
tracelog.ErrorLogger.FatalError(err)
}
Expand Down
3 changes: 2 additions & 1 deletion test/backup_fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import (
"github.com/stretchr/testify/assert"
"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/testtools"
"strings"
"testing"
)

func createMockStorageFolder() internal.StorageFolder {
var folder = testtools.MakeDefaultInMemoryStorageFolder()
subFolder := folder.GetSubFolder(internal.BaseBackupPath)
subFolder.PutObject("base_123_backup_stop_sentinel.json", &bytes.Buffer{})
subFolder.PutObject("base_456_backup_stop_sentinel.json", &bytes.Buffer{})
subFolder.PutObject("base_456_backup_stop_sentinel.json", strings.NewReader("{}"))
subFolder.PutObject("base_000_backup_stop_sentinel.json", &bytes.Buffer{}) // last put
subFolder.PutObject("base_123312", &bytes.Buffer{}) // not a sentinel
subFolder.PutObject("base_321/nop", &bytes.Buffer{})
Expand Down
14 changes: 14 additions & 0 deletions test/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,17 @@ func TestGetTarNames(t *testing.T) {
assert.NoError(t, err)
assert.ElementsMatch(t, []string{"1", "2", "3"}, tarNames)
}

func TestIsPgControlRequired(t *testing.T) {
folder := createMockStorageFolder()
backup := internal.NewBackup(folder.GetSubFolder(internal.BaseBackupPath), "base_456")
dto, err := backup.FetchSentinel()
assert.NoError(t, err)
assert.True(t, internal.IsPgControlRequired(backup, dto))
}

func TestIsPgControlNotRequiredForWALEBackups(t *testing.T) {
folder := createMockStorageFolder()
backup := internal.NewBackup(folder.GetSubFolder(internal.BaseBackupPath), "base_000000010000DD170000000C_00743784")
assert.False(t, internal.IsPgControlRequired(backup, internal.BackupSentinelDto{}))
}

0 comments on commit 14a4d5f

Please sign in to comment.