Skip to content

Commit

Permalink
Add: shard support for MongoDB
Browse files Browse the repository at this point in the history
Signed-off-by: sayedppqq <[email protected]>
  • Loading branch information
sayedppqq committed Apr 25, 2024
1 parent 77f7779 commit 98f05ce
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 94 deletions.
2 changes: 1 addition & 1 deletion cmd/mongo/oplog_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func buildOplogPushRunArgs() (args oplogPushRunArgs, err error) {
return
}

return
return args, nil
}

type oplogPushStatsArgs struct {
Expand Down
40 changes: 20 additions & 20 deletions cmd/mongo/oplog_replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,24 +96,24 @@ func buildOplogReplayRunArgs(cmdargs []string) (args oplogReplayRunArgs, err err
return args, nil
}

func processArg(arg string, downloader *archive.StorageDownloader) (models.Timestamp, error) {
switch arg {
case internal.LatestString:
return downloader.LastKnownArchiveTS()
case LatestBackupString:
lastBackupName, err := downloader.LastBackupName()
if err != nil {
return models.Timestamp{}, err
}
backupMeta, err := downloader.BackupMeta(lastBackupName)
if err != nil {
return models.Timestamp{}, err
}
return models.TimestampFromBson(backupMeta.MongoMeta.BackupLastTS), nil
default:
return models.TimestampFromStr(arg)
}
}
//func processArg(arg string, downloader *archive.StorageDownloader) (models.Timestamp, error) {
// switch arg {
// case internal.LatestString:
// return downloader.LastKnownArchiveTS()
// case LatestBackupString:
// lastBackupName, err := downloader.LastBackupName()
// if err != nil {
// return models.Timestamp{}, err
// }
// backupMeta, err := downloader.BackupMeta(lastBackupName)
// if err != nil {
// return models.Timestamp{}, err
// }
// return models.TimestampFromBson(backupMeta.MongoMeta.BackupLastTS), nil
// default:
// return models.TimestampFromStr(arg)
// }
//}

func runOplogReplay(ctx context.Context, replayArgs oplogReplayRunArgs) error {
tracelog.DebugLogger.Printf("starting replay with arguments: %+v", replayArgs)
Expand All @@ -138,8 +138,8 @@ func runOplogReplay(ctx context.Context, replayArgs oplogReplayRunArgs) error {
return err
}

filterList := shake.OplogFilterChain{new(shake.AutologousFilter), new(shake.NoopFilter), new(shake.DDLFilter)}
dbApplier := oplog.NewDBApplier(mongoClient, false, replayArgs.ignoreErrCodes, replayArgs.dbNode, filterList)
filterList := shake.OplogFilterChain{new(shake.AutologousFilter), new(shake.NoopFilter)}
dbApplier := oplog.NewDBApplier(mongoClient, true, replayArgs.ignoreErrCodes, replayArgs.dbNode, filterList)
oplogApplier := stages.NewGenericApplier(dbApplier)

// set up storage downloader client
Expand Down
16 changes: 1 addition & 15 deletions internal/databases/mongo/archive/layout.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ func SequenceBetweenTS(archives []models.Archive, since, until models.Timestamp)

func GetUpdatedBackupTimes(archives []models.Archive, since, until models.Timestamp) (models.Timestamp, models.Timestamp) {
var updatedSince models.Timestamp
//var minimum []models.Timestamp
for i := range archives {
arch := archives[i]
if arch.Type != models.ArchiveTypeOplog {
Expand All @@ -85,20 +84,7 @@ func GetUpdatedBackupTimes(archives []models.Archive, since, until models.Timest
if arch.In(since) {
updatedSince = arch.Start
}
//minimum = append(minimum, arch.Start)
//fmt.Printf("ar---- %v %v\n", arch.Start, arch.End)
}
//sort.Slice(minimum, func(i, j int) bool {
// if minimum[i].TS == minimum[j].TS {
// return minimum[i].Inc < minimum[j].Inc
// }
// return minimum[i].TS < minimum[j].TS
//})
//fmt.Println("------", minimum[0], updatedSince)
//if structs.IsZero(updatedSince) {
// updatedSince = minimum[0]
//}

}
return updatedSince, until
}

Expand Down
6 changes: 4 additions & 2 deletions internal/databases/mongo/archive/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ func (sd *StorageDownloader) LastBackupName() (string, error) {

// DownloadOplogArchive downloads, decompresses and decrypts (if needed) oplog archive.
func (sd *StorageDownloader) DownloadOplogArchive(arch models.Archive, writeCloser io.WriteCloser) error {
return internal.DownloadFile(internal.NewFolderReader(sd.oplogsFolder), arch.DBNodeSpecificFileName(sd.GetNodeSpecificDownloader()), arch.Extension(), writeCloser)
return internal.DownloadFile(internal.NewFolderReader(sd.oplogsFolder),
arch.DBNodeSpecificFileName(sd.GetNodeSpecificDownloader()), arch.Extension(), writeCloser)
}

// ListOplogArchives fetches all oplog archives existed in storage.
Expand Down Expand Up @@ -323,7 +324,8 @@ func (su *StorageUploader) UploadGapArchive(archErr error, firstTS, lastTS model
return fmt.Errorf("can not build archive: %w", err)
}

if err := su.PushStreamToDestination(context.Background(), strings.NewReader(archErr.Error()), arch.DBNodeSpecificFileName(su.dbNode)); err != nil {
if err := su.PushStreamToDestination(context.Background(), strings.NewReader(archErr.Error()),
arch.DBNodeSpecificFileName(su.dbNode)); err != nil {
return fmt.Errorf("error while uploading stream: %w", err)
}
return nil
Expand Down
4 changes: 0 additions & 4 deletions internal/databases/mongo/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,10 +350,6 @@ func (mc *MongoClient) ApplyOp(ctx context.Context, dbop db.Oplog) error {
cmd[0] = bson.E{Key: "applyOps", Value: []interface{}{op}}
apply := mc.c.Database("admin").RunCommand(ctx, cmd)
if err := apply.Err(); err != nil {
fmt.Println("-------------------------------------", err)
if mongo.IsDuplicateKeyError(err) {
return nil
}
return err
}
resp := CmdResponse{}
Expand Down
35 changes: 10 additions & 25 deletions internal/databases/mongo/oplog/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"io"
"strings"
)

type TypeAssertionError struct {
Expand Down Expand Up @@ -83,8 +82,10 @@ type DBApplier struct {
}

// NewDBApplier builds DBApplier with given args.
func NewDBApplier(m client.MongoDriver, preserveUUID bool, ignoreErrCodes map[string][]int32, node string, filterList shake.OplogFilterChain) *DBApplier {
return &DBApplier{db: m, txnBuffer: txn.NewBuffer(), preserveUUID: preserveUUID, applyIgnoreErrorCodes: ignoreErrCodes, dbNode: node, filterList: filterList}
func NewDBApplier(m client.MongoDriver, preserveUUID bool, ignoreErrCodes map[string][]int32,
node string, filterList shake.OplogFilterChain) *DBApplier {
return &DBApplier{db: m, txnBuffer: txn.NewBuffer(), preserveUUID: preserveUUID,
applyIgnoreErrorCodes: ignoreErrCodes, dbNode: node, filterList: filterList}
}

func (ap *DBApplier) Apply(ctx context.Context, opr models.Oplog) error {
Expand All @@ -99,17 +100,12 @@ func (ap *DBApplier) Apply(ctx context.Context, opr models.Oplog) error {
return nil
}

if strings.HasPrefix(ap.dbNode, "shard") {
if ap.dbNode != "configsvr" {
if ap.filterList.IterateFilter(&op) {
return nil
}
}

//if err := ap.shouldSkip(op.Operation, op.Namespace); err != nil {
// tracelog.DebugLogger.Printf("skipping op %+v due to: %+v", op, err)
// return nil
//}

meta, err := txn.NewMeta(op)
if err != nil {
return fmt.Errorf("can not extract op metadata: %w", err)
Expand Down Expand Up @@ -142,26 +138,15 @@ func (ap *DBApplier) Close(ctx context.Context) error {
return nil
}

func (ap *DBApplier) shouldSkip(op, ns string) error {
if op == "n" {
return fmt.Errorf("noop op")
}

// sharded clusters are not supported yet
if (strings.HasPrefix(ns, "config.") || strings.HasPrefix(ns, "admin.")) && ap.dbNode != "configsvr" {
return fmt.Errorf("config database op")
}

return nil
}

// shouldIgnore checks if error should be ignored
func (ap *DBApplier) shouldIgnore(op string, err error) bool {
ce, ok := err.(mongo.CommandError)
if !ok {
return false
}

if mongo.IsDuplicateKeyError(err) {
return true
}
ignoreErrorCodes, ok := ap.applyIgnoreErrorCodes[op]
if !ok {
return false
Expand Down Expand Up @@ -273,8 +258,8 @@ func indexSpecFromCommitIndexBuilds(op db.Oplog) (string, []client.IndexDocument
if !ok {
return "", nil, NewTypeAssertionError("bson.D", fmt.Sprintf("indexes[%d]", i), elemE.Value)
}
for i := range elements {
elemE = elements[i]
for j := range elements {
elemE = elements[j]
if elemE.Key == "key" {
if indexSpecs[i].Key, ok = elemE.Value.(bson.D); !ok {
return "", nil, NewTypeAssertionError("bson.D", "key", elemE.Value)
Expand Down
42 changes: 19 additions & 23 deletions internal/databases/mongo/shake/filter.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package shake

import (
"fmt"
"github.com/mongodb/mongo-tools-common/db"
"github.com/wal-g/tracelog"
"reflect"
"strings"
)

// OplogFilter: AutologousFilter, NoopFilter, DDLFilter
// OplogFilter: AutologousFilter, NoopFilter
type OplogFilter interface {
Filter(log *db.Oplog) bool
}
Expand All @@ -28,21 +27,16 @@ type AutologousFilter struct {
}

func (filter *AutologousFilter) Filter(log *db.Oplog) bool {

// Filter out unnecessary commands
if operation, found := ExtraCommandName(log.Object); found {
fmt.Printf("unnecessary commands. operation: %v found: %v\n", operation)
if IsNeedFilterCommand(operation) {
return true
}
}

// for namespace. we filter noop operation and collection name
// that are admin, local, mongoshake, mongoshake_conflict
return filter.FilterNs(log.Namespace)
}

// namespace should be filtered.
// NsShouldBeIgnore for namespaces should be filtered.
// key: ns, value: true means prefix, false means contain
var NsShouldBeIgnore = map[string]bool{
"admin.": true,
Expand All @@ -51,32 +45,33 @@ var NsShouldBeIgnore = map[string]bool{
"system.views": false,
}

// namespace should not be filtered.
// NsShouldNotBeIgnore has a higher priority than NsShouldBeIgnore
// key: ns, value: true means prefix, false means contain
var NsShouldNotBeIgnore = map[string]bool{
"admin.$cmd": true,
"admin.$cmd": true,
"admin.system.users": false,
"admin.system.roles": false,
}

func (filter *AutologousFilter) FilterNs(namespace string) bool {
// for namespace. we filter noop operation and collection name
// that are admin, local, config, mongoshake, mongoshake_conflict
// that are admin, local, config

// v2.4.13, don't filter admin.$cmd which may include transaction
// we don't filter admin.system.users and admin.system.roles to retrieve roles and users
for key, val := range NsShouldNotBeIgnore {
if val == true && strings.HasPrefix(namespace, key) {
if val && strings.HasPrefix(namespace, key) {
return false
}
if val == false && strings.Contains(namespace, key) {
if !val && strings.Contains(namespace, key) {
return false
}
}

for key, val := range NsShouldBeIgnore {
if val == true && strings.HasPrefix(namespace, key) {
if val && strings.HasPrefix(namespace, key) {
return true
}
if val == false && strings.Contains(namespace, key) {
if !val && strings.Contains(namespace, key) {
return true
}
}
Expand All @@ -90,10 +85,11 @@ func (filter *NoopFilter) Filter(log *db.Oplog) bool {
return log.Operation == "n"
}

type DDLFilter struct {
}

func (filter *DDLFilter) Filter(log *db.Oplog) bool {
operation, _ := ExtraCommandName(log.Object)
return log.Operation == "c" && operation != "applyOps" && operation != "create" || strings.HasSuffix(log.Namespace, "system.indexes")
}
//type DDLFilter struct {
//}
//
//func (filter *DDLFilter) Filter(log *db.Oplog) bool {
// //operation, _ := ExtraCommandName(log.Object)
// //return log.Operation == "c" && operation != "applyOps" && operation != "create" || strings.HasSuffix(log.Namespace, "system.indexes")
// return false
//}
1 change: 0 additions & 1 deletion internal/databases/mongo/stages/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ func NewGenericApplier(applier oplog.Applier) *GenericApplier {

// Apply runs working cycle that applies oplog records.
func (dba *GenericApplier) Apply(ctx context.Context, ch chan *models.Oplog) (chan error, error) {

errc := make(chan error)
go func() {
defer close(errc)
Expand Down
4 changes: 2 additions & 2 deletions internal/databases/mysql/binlog_fetch_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ func HandleBinlogFetch(folder storage.Folder, backupName string, untilTS string,
tracelog.ErrorLogger.FatalOnError(err)
var startTS, endTS, endBinlogTS time.Time
if skipStartTime {
startTS, endTS, endBinlogTS, err = getEndTimestamps(folder, untilTS, untilBinlogLastModifiedTS)
startTS, endTS, endBinlogTS, err = getEndTimestamps(untilTS, untilBinlogLastModifiedTS)
} else {
startTS, endTS, endBinlogTS, err = getTimestamps(folder, backupName, untilTS, untilBinlogLastModifiedTS)
tracelog.ErrorLogger.FatalOnError(err)
}
tracelog.ErrorLogger.FatalOnError(err)

handler := newIndexHandler(dstDir)

Expand Down
2 changes: 1 addition & 1 deletion internal/databases/mysql/binlog_replay_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func getTimestamps(folder storage.Folder, backupName, untilTS, untilBinlogLastMo
return startTS, endTS, endBinlogTS, nil
}

func getEndTimestamps(folder storage.Folder, untilTS, untilBinlogLastModifiedTS string) (time.Time, time.Time, time.Time, error) {
func getEndTimestamps(untilTS, untilBinlogLastModifiedTS string) (time.Time, time.Time, time.Time, error) {
endTS, err := utility.ParseUntilTS(untilTS)
if err != nil {
return time.Time{}, time.Time{}, time.Time{}, err
Expand Down

0 comments on commit 98f05ce

Please sign in to comment.