Skip to content

Commit

Permalink
Add additional file path for Local provider for MongoDB Archiver (#26)
Browse files Browse the repository at this point in the history
Signed-off-by: sayedppqq <[email protected]>
  • Loading branch information
sayedppqq authored May 6, 2024
1 parent 2351b87 commit ccebe88
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 26 deletions.
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -285,3 +285,9 @@ unlink_libsodium:
build_client:
cd cmd/daemonclient && \
go build -o ../../bin/walg-daemon-client -ldflags "-s -w -X main.buildDate=`date -u +%Y.%m.%d_%H:%M:%S` -X main.gitRevision=`git rev-parse --short HEAD` -X main.version=`git tag -l --points-at HEAD`"


update: mongo_build
docker build --tag walg:2.0 /home/sayed/go/src/kubedb.dev/wal-g/main/mongo
docker tag walg:2.0 sayedppqq/walg:2.0
docker push sayedppqq/walg:2.0
16 changes: 14 additions & 2 deletions cmd/mongo/oplog_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package mongo

import (
"context"
storageapi "kubestash.dev/apimachinery/apis/storage/v1alpha1"
"os"
"path"
"syscall"
"time"

Expand Down Expand Up @@ -73,7 +75,11 @@ func runOplogPush(ctx context.Context, pushArgs oplogPushRunArgs, statsArgs oplo
if err != nil {
return err
}
uplProvider.ChangeDirectory(models.OplogArchBasePath)
subDir := models.OplogArchBasePath
if pushArgs.dbProvider == string(storageapi.ProviderLocal) {
subDir = path.Join(pushArgs.dbPath, subDir)
}
uplProvider.ChangeDirectory(subDir)
uploader := archive.NewStorageUploader(uplProvider)
uploader.SetKubeClient(pushArgs.kubeClient)
uploader.SetSnapshot(snapshotName, snapshotNamespace)
Expand Down Expand Up @@ -146,6 +152,8 @@ type oplogPushRunArgs struct {
primaryWaitTimeout time.Duration
lwUpdate time.Duration
kubeClient controllerclient.Client
dbProvider string
dbPath string
}

func buildOplogPushRunArgs() (args oplogPushRunArgs, err error) {
Expand All @@ -164,6 +172,10 @@ func buildOplogPushRunArgs() (args oplogPushRunArgs, err error) {
return
}

args.dbProvider = internal.GetNonRequiredSetting(internal.MongoDBProvider)

args.dbPath = internal.GetNonRequiredSetting(internal.MongoDBPath)

args.primaryWait, err = internal.GetBoolSettingDefault(internal.OplogPushWaitForBecomePrimary, false)
if err != nil {
return
Expand Down Expand Up @@ -191,7 +203,7 @@ func buildOplogPushRunArgs() (args oplogPushRunArgs, err error) {
return
}

return
return args, err
}

type oplogPushStatsArgs struct {
Expand Down
41 changes: 20 additions & 21 deletions cmd/mongo/oplog_replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package mongo
import (
"context"
"encoding/json"
"os"
"syscall"

"github.com/spf13/cobra"
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal"
Expand All @@ -16,6 +13,8 @@ import (
"github.com/wal-g/wal-g/internal/databases/mongo/oplog"
"github.com/wal-g/wal-g/internal/databases/mongo/stages"
"github.com/wal-g/wal-g/utility"
"os"
"syscall"
)

const LatestBackupString = "LATEST_BACKUP"
Expand Down Expand Up @@ -91,24 +90,24 @@ func buildOplogReplayRunArgs(cmdargs []string) (args oplogReplayRunArgs, err err
return args, nil
}

func processArg(arg string, downloader *archive.StorageDownloader) (models.Timestamp, error) {
switch arg {
case internal.LatestString:
return downloader.LastKnownArchiveTS()
case LatestBackupString:
lastBackupName, err := downloader.LastBackupName()
if err != nil {
return models.Timestamp{}, err
}
backupMeta, err := downloader.BackupMeta(lastBackupName)
if err != nil {
return models.Timestamp{}, err
}
return models.TimestampFromBson(backupMeta.MongoMeta.BackupLastTS), nil
default:
return models.TimestampFromStr(arg)
}
}
//func processArg(arg string, downloader *archive.StorageDownloader) (models.Timestamp, error) {
// switch arg {
// case internal.LatestString:
// return downloader.LastKnownArchiveTS()
// case LatestBackupString:
// lastBackupName, err := downloader.LastBackupName()
// if err != nil {
// return models.Timestamp{}, err
// }
// backupMeta, err := downloader.BackupMeta(lastBackupName)
// if err != nil {
// return models.Timestamp{}, err
// }
// return models.TimestampFromBson(backupMeta.MongoMeta.BackupLastTS), nil
// default:
// return models.TimestampFromStr(arg)
// }
//}

func runOplogReplay(ctx context.Context, replayArgs oplogReplayRunArgs) error {
tracelog.DebugLogger.Printf("starting replay with arguments: %+v", replayArgs)
Expand Down
2 changes: 2 additions & 0 deletions internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ const (
ProfileMode = "PROFILE_MODE"
ProfilePath = "PROFILE_PATH"

MongoDBProvider = "MONGODB_PROVIDER"
MongoDBPath = "MONGODB_PATH"
MongoDBUriSetting = "MONGODB_URI"
MongoDBLastWriteUpdateInterval = "MONGODB_LAST_WRITE_UPDATE_INTERVAL"
MongoDBRestoreDisableHostResetup = "MONGODB_RESTORE_DISABLE_HOST_RESETUP"
Expand Down
5 changes: 5 additions & 0 deletions internal/configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,11 @@ func GetRequiredSetting(setting string) (string, error) {
return val, nil
}

func GetNonRequiredSetting(setting string) string {
val, _ := GetSetting(setting)
return val
}

func GetBoolSettingDefault(setting string, def bool) (bool, error) {
val, ok := GetSetting(setting)
if !ok {
Expand Down
4 changes: 2 additions & 2 deletions internal/databases/mysql/binlog_fetch_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ func HandleBinlogFetch(folder storage.Folder, backupName string, untilTS string,
tracelog.ErrorLogger.FatalOnError(err)
var startTS, endTS, endBinlogTS time.Time
if skipStartTime {
startTS, endTS, endBinlogTS, err = getEndTimestamps(folder, untilTS, untilBinlogLastModifiedTS)
startTS, endTS, endBinlogTS, err = getEndTimestamps(untilTS, untilBinlogLastModifiedTS)
} else {
startTS, endTS, endBinlogTS, err = getTimestamps(folder, backupName, untilTS, untilBinlogLastModifiedTS)
tracelog.ErrorLogger.FatalOnError(err)
}
tracelog.ErrorLogger.FatalOnError(err)

handler := newIndexHandler(dstDir)

Expand Down
2 changes: 1 addition & 1 deletion internal/databases/mysql/binlog_replay_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func getTimestamps(folder storage.Folder, backupName, untilTS, untilBinlogLastMo
return startTS, endTS, endBinlogTS, nil
}

func getEndTimestamps(folder storage.Folder, untilTS, untilBinlogLastModifiedTS string) (time.Time, time.Time, time.Time, error) {
func getEndTimestamps(untilTS, untilBinlogLastModifiedTS string) (time.Time, time.Time, time.Time, error) {
endTS, err := utility.ParseUntilTS(untilTS)
if err != nil {
return time.Time{}, time.Time{}, time.Time{}, err
Expand Down

0 comments on commit ccebe88

Please sign in to comment.