From ccebe882c7b9bdd1674e11d04b69071a67b026ed Mon Sep 17 00:00:00 2001 From: Abu Sayed <82162518+sayedppqq@users.noreply.github.com> Date: Mon, 6 May 2024 14:30:57 +0600 Subject: [PATCH] Add additional file path for Local provider for MongoDB Archiver (#26) Signed-off-by: sayedppqq --- Makefile | 6 +++ cmd/mongo/oplog_push.go | 16 +++++++- cmd/mongo/oplog_replay.go | 41 +++++++++---------- internal/config.go | 2 + internal/configure.go | 5 +++ .../databases/mysql/binlog_fetch_handler.go | 4 +- .../databases/mysql/binlog_replay_handler.go | 2 +- 7 files changed, 50 insertions(+), 26 deletions(-) diff --git a/Makefile b/Makefile index eb2a4ffb6..a3cf3f922 100644 --- a/Makefile +++ b/Makefile @@ -285,3 +285,9 @@ unlink_libsodium: build_client: cd cmd/daemonclient && \ go build -o ../../bin/walg-daemon-client -ldflags "-s -w -X main.buildDate=`date -u +%Y.%m.%d_%H:%M:%S` -X main.gitRevision=`git rev-parse --short HEAD` -X main.version=`git tag -l --points-at HEAD`" + + +update: mongo_build + docker build --tag walg:2.0 /home/sayed/go/src/kubedb.dev/wal-g/main/mongo + docker tag walg:2.0 sayedppqq/walg:2.0 + docker push sayedppqq/walg:2.0 \ No newline at end of file diff --git a/cmd/mongo/oplog_push.go b/cmd/mongo/oplog_push.go index add448ddf..fe8a7205d 100644 --- a/cmd/mongo/oplog_push.go +++ b/cmd/mongo/oplog_push.go @@ -2,7 +2,9 @@ package mongo import ( "context" + storageapi "kubestash.dev/apimachinery/apis/storage/v1alpha1" "os" + "path" "syscall" "time" @@ -73,7 +75,11 @@ func runOplogPush(ctx context.Context, pushArgs oplogPushRunArgs, statsArgs oplo if err != nil { return err } - uplProvider.ChangeDirectory(models.OplogArchBasePath) + subDir := models.OplogArchBasePath + if pushArgs.dbProvider == string(storageapi.ProviderLocal) { + subDir = path.Join(pushArgs.dbPath, subDir) + } + uplProvider.ChangeDirectory(subDir) uploader := archive.NewStorageUploader(uplProvider) uploader.SetKubeClient(pushArgs.kubeClient) uploader.SetSnapshot(snapshotName, snapshotNamespace) @@ -146,6 +152,8 @@ type oplogPushRunArgs struct { primaryWaitTimeout time.Duration lwUpdate time.Duration kubeClient controllerclient.Client + dbProvider string + dbPath string } func buildOplogPushRunArgs() (args oplogPushRunArgs, err error) { @@ -164,6 +172,10 @@ func buildOplogPushRunArgs() (args oplogPushRunArgs, err error) { return } + args.dbProvider = internal.GetNonRequiredSetting(internal.MongoDBProvider) + + args.dbPath = internal.GetNonRequiredSetting(internal.MongoDBPath) + args.primaryWait, err = internal.GetBoolSettingDefault(internal.OplogPushWaitForBecomePrimary, false) if err != nil { return @@ -191,7 +203,7 @@ func buildOplogPushRunArgs() (args oplogPushRunArgs, err error) { return } - return + return args, err } type oplogPushStatsArgs struct { diff --git a/cmd/mongo/oplog_replay.go b/cmd/mongo/oplog_replay.go index 5fca41ea8..8784fddc5 100644 --- a/cmd/mongo/oplog_replay.go +++ b/cmd/mongo/oplog_replay.go @@ -3,9 +3,6 @@ package mongo import ( "context" "encoding/json" - "os" - "syscall" - "github.com/spf13/cobra" "github.com/wal-g/tracelog" "github.com/wal-g/wal-g/internal" @@ -16,6 +13,8 @@ import ( "github.com/wal-g/wal-g/internal/databases/mongo/oplog" "github.com/wal-g/wal-g/internal/databases/mongo/stages" "github.com/wal-g/wal-g/utility" + "os" + "syscall" ) const LatestBackupString = "LATEST_BACKUP" @@ -91,24 +90,24 @@ func buildOplogReplayRunArgs(cmdargs []string) (args oplogReplayRunArgs, err err return args, nil } -func processArg(arg string, downloader *archive.StorageDownloader) (models.Timestamp, error) { - switch arg { - case internal.LatestString: - return downloader.LastKnownArchiveTS() - case LatestBackupString: - lastBackupName, err := downloader.LastBackupName() - if err != nil { - return models.Timestamp{}, err - } - backupMeta, err := downloader.BackupMeta(lastBackupName) - if err != nil { - return models.Timestamp{}, err - } - return models.TimestampFromBson(backupMeta.MongoMeta.BackupLastTS), nil - default: - return models.TimestampFromStr(arg) - } -} +//func processArg(arg string, downloader *archive.StorageDownloader) (models.Timestamp, error) { +// switch arg { +// case internal.LatestString: +// return downloader.LastKnownArchiveTS() +// case LatestBackupString: +// lastBackupName, err := downloader.LastBackupName() +// if err != nil { +// return models.Timestamp{}, err +// } +// backupMeta, err := downloader.BackupMeta(lastBackupName) +// if err != nil { +// return models.Timestamp{}, err +// } +// return models.TimestampFromBson(backupMeta.MongoMeta.BackupLastTS), nil +// default: +// return models.TimestampFromStr(arg) +// } +//} func runOplogReplay(ctx context.Context, replayArgs oplogReplayRunArgs) error { tracelog.DebugLogger.Printf("starting replay with arguments: %+v", replayArgs) diff --git a/internal/config.go b/internal/config.go index fca5f1848..f2dc7ac48 100644 --- a/internal/config.go +++ b/internal/config.go @@ -109,6 +109,8 @@ const ( ProfileMode = "PROFILE_MODE" ProfilePath = "PROFILE_PATH" + MongoDBProvider = "MONGODB_PROVIDER" + MongoDBPath = "MONGODB_PATH" MongoDBUriSetting = "MONGODB_URI" MongoDBLastWriteUpdateInterval = "MONGODB_LAST_WRITE_UPDATE_INTERVAL" MongoDBRestoreDisableHostResetup = "MONGODB_RESTORE_DISABLE_HOST_RESETUP" diff --git a/internal/configure.go b/internal/configure.go index 926641c7a..a30127eab 100644 --- a/internal/configure.go +++ b/internal/configure.go @@ -523,6 +523,11 @@ func GetRequiredSetting(setting string) (string, error) { return val, nil } +func GetNonRequiredSetting(setting string) string { + val, _ := GetSetting(setting) + return val +} + func GetBoolSettingDefault(setting string, def bool) (bool, error) { val, ok := GetSetting(setting) if !ok { diff --git a/internal/databases/mysql/binlog_fetch_handler.go b/internal/databases/mysql/binlog_fetch_handler.go index 3bb9ef462..19baf8c6a 100644 --- a/internal/databases/mysql/binlog_fetch_handler.go +++ b/internal/databases/mysql/binlog_fetch_handler.go @@ -47,11 +47,11 @@ func HandleBinlogFetch(folder storage.Folder, backupName string, untilTS string, tracelog.ErrorLogger.FatalOnError(err) var startTS, endTS, endBinlogTS time.Time if skipStartTime { - startTS, endTS, endBinlogTS, err = getEndTimestamps(folder, untilTS, untilBinlogLastModifiedTS) + startTS, endTS, endBinlogTS, err = getEndTimestamps(untilTS, untilBinlogLastModifiedTS) } else { startTS, endTS, endBinlogTS, err = getTimestamps(folder, backupName, untilTS, untilBinlogLastModifiedTS) - tracelog.ErrorLogger.FatalOnError(err) } + tracelog.ErrorLogger.FatalOnError(err) handler := newIndexHandler(dstDir) diff --git a/internal/databases/mysql/binlog_replay_handler.go b/internal/databases/mysql/binlog_replay_handler.go index 118c5e2fd..c8ea17008 100644 --- a/internal/databases/mysql/binlog_replay_handler.go +++ b/internal/databases/mysql/binlog_replay_handler.go @@ -112,7 +112,7 @@ func getTimestamps(folder storage.Folder, backupName, untilTS, untilBinlogLastMo return startTS, endTS, endBinlogTS, nil } -func getEndTimestamps(folder storage.Folder, untilTS, untilBinlogLastModifiedTS string) (time.Time, time.Time, time.Time, error) { +func getEndTimestamps(untilTS, untilBinlogLastModifiedTS string) (time.Time, time.Time, time.Time, error) { endTS, err := utility.ParseUntilTS(untilTS) if err != nil { return time.Time{}, time.Time{}, time.Time{}, err