Skip to content

Commit

Permalink
testing updates
Browse files Browse the repository at this point in the history
  • Loading branch information
jh-bate committed Dec 6, 2023
1 parent 6c2668a commit d296bd0
Show file tree
Hide file tree
Showing 2 changed files with 229 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"errors"
"fmt"
"math"
"time"
Expand Down Expand Up @@ -44,6 +45,7 @@ type Migration struct {
}

const oplogName = "oplog.rs"
const DryRunFlag = "dry-run"

func main() {
ctx := context.Background()
Expand Down Expand Up @@ -75,6 +77,12 @@ func (m *Migration) Initialize(provider application.Provider) error {
},
}
m.CLI().Flags = append(m.CLI().Flags,

cli.BoolFlag{
Name: fmt.Sprintf("%s,%s", DryRunFlag, "n"),
Usage: "dry run only; do not migrate",
},

cli.Int64Flag{
Name: "batch-size",
Usage: "number of records to read each time",
Expand Down Expand Up @@ -121,7 +129,7 @@ func (m *Migration) Initialize(provider application.Provider) error {

m.CLI().Action = func(ctx *cli.Context) error {
if !m.ParseContext(ctx) {
return nil
return errors.New("could not parse context")
}
if err := m.prepare(); err != nil {
return nil
Expand Down Expand Up @@ -196,6 +204,10 @@ func (m *Migration) getOplogDuration() (time.Duration, error) {

}

func calculateBatchSize(oplogSize int, oplogEntryBytes int, oplogMinWindow int, nopPercent int) int64 {
return int64(math.Floor(float64(oplogSize) / float64(oplogEntryBytes) / float64(oplogMinWindow) / (float64(nopPercent) / 7)))
}

func (m *Migration) setWriteBatchSize() error {
if m.oplogC != nil {
m.Logger().Debug("Getting oplog duration...")
Expand All @@ -206,7 +218,7 @@ func (m *Migration) setWriteBatchSize() error {
if err := m.oplogC.Database().RunCommand(m.ctx, bson.M{"collStats": oplogName}).Decode(&metaData); err != nil {
return err
}
writeBatchSize := int64(metaData.MaxSize / m.config.expectedOplogEntrySize / m.config.minOplogWindow / (m.config.nopPercent / 7))
writeBatchSize := calculateBatchSize(metaData.MaxSize, m.config.expectedOplogEntrySize, m.config.minOplogWindow, m.config.nopPercent)
m.writeBatchSize = &writeBatchSize
return nil
}
Expand Down
215 changes: 215 additions & 0 deletions migrations/20231128_jellyfish_migration/connect/connect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
package main

import (
"context"
"fmt"
"log"
"math"
"os"

"github.com/urfave/cli"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

type Config struct {
uri string
minOplogWindow int
// these values are used to determine writes batches, first dividing the oplog's size with the desired duration and
// expected entry size, then adding a divisor to account for NOP overshoot in the oplog
expectedOplogEntrySize int
// how much of the oplog is NOP, this adjusts the batch to account for an oplog that is very change sensitive
// must be > 0
// prod 0.6
// idle 100
nopPercent int
// minimum free disk space percent
minFreePercent int
readBatchSize int64
}

type Migration struct {
cli *cli.App
ctx context.Context
config *Config
client *mongo.Client
oplogC *mongo.Collection
deviceDataC *mongo.Collection
writeBatchSize *int64
updates []mongo.WriteModel
}

const oplogName = "oplog.rs"

func main() {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
migration := NewMigration(ctx)
migration.RunAndExit()
}

func NewMigration(ctx context.Context) *Migration {
return &Migration{
cli: cli.NewApp(),
ctx: ctx,
config: &Config{},
updates: []mongo.WriteModel{},
}
}

func (m *Migration) RunAndExit() {
if err := m.Initialize(); err != nil {
os.Exit(1)
}

m.CLI().Action = func(ctx *cli.Context) error {
log.Println("before prepare")
if err := m.prepare(); err != nil {
return err
}
return nil
}

if err := m.CLI().Run(os.Args); err != nil {
os.Exit(1)
}

os.Exit(0)
}

func (m *Migration) CLI() *cli.App {
return m.cli
}

func (m *Migration) Initialize() error {

log.Println("Initialize...")

m.CLI().Usage = "BACK-37: Migrate all existing data to add required Platform deduplication hash fields"
m.CLI().Description = "BACK-37: To fully migrate devices from the `jellyfish` upload API to the `platform` upload API"
m.CLI().Authors = []cli.Author{
{
Name: "J H BATE",
Email: "[email protected]",
},
}
m.CLI().Flags = append(m.CLI().Flags,
cli.Int64Flag{
Name: "batch-size",
Usage: "number of records to read each time",
Destination: &m.config.readBatchSize,
Value: 3000,
Required: false,
},
cli.IntFlag{
Name: "min-free-percent",
Usage: "minimum free disk space percent",
Destination: &m.config.minFreePercent,
Value: 10,
Required: false,
},
cli.IntFlag{
Name: "nop-percent",
Usage: "how much of the oplog is NOP",
Destination: &m.config.nopPercent,
Value: 100,
Required: false,
},
cli.IntFlag{
Name: "oplog-entry-size",
Usage: "minimum free disk space percent",
Destination: &m.config.expectedOplogEntrySize,
Value: 420,
Required: false,
},
cli.IntFlag{
Name: "oplog-window",
Usage: "minimum oplog window in seconds",
Destination: &m.config.minOplogWindow,
Value: 28800, // 8hrs
Required: false,
},
cli.StringFlag{
Name: "uri",
Usage: "mongo connection URI",
Destination: &m.config.uri,
Value: "mongodb://localhost:27017",
Required: false,
},
)
return nil
}

func (m *Migration) prepare() error {
log.Println("running prepare ...")
var err error
m.client, err = mongo.Connect(m.ctx, options.Client().ApplyURI(m.config.uri))
if err != nil {
return fmt.Errorf("unable to connect to MongoDB: %w", err)
}
defer m.client.Disconnect(m.ctx)

m.oplogC = m.client.Database("local").Collection(oplogName)
m.deviceDataC = m.client.Database("data").Collection("deviceData")

if err := m.checkFreeSpace(); err != nil {
return err
}

err = m.setWriteBatchSize()
if err != nil {
return err
}
return nil
}

func (m *Migration) setWriteBatchSize() error {
if m.oplogC != nil {
log.Println("Getting Write Batch Size...")
type MongoMetaData struct {
MaxSize int `json:"maxSize"`
}
var metaData MongoMetaData
if err := m.oplogC.Database().RunCommand(m.ctx, bson.M{"collStats": oplogName}).Decode(&metaData); err != nil {
return err
}
log.Printf("oplogSize... %v", metaData.MaxSize)
writeBatchSize := int64(math.Floor(
float64(metaData.MaxSize) /
float64(m.config.expectedOplogEntrySize) /
float64(m.config.minOplogWindow) /
(float64(m.config.nopPercent) / float64(7))))
log.Printf("writeBatchSize... %v", writeBatchSize)
m.writeBatchSize = &writeBatchSize
return nil
}
var writeBatchSize = int64(30000)
log.Printf("MongoDB is not clustered, removing write batch limit, setting to %d documents.", writeBatchSize)
m.writeBatchSize = &writeBatchSize
return nil
}

func (m *Migration) checkFreeSpace() error {
type MongoMetaData struct {
FsTotalSize int `json:"fsTotalSize"`
FsUsedSize int `json:"fsUsedSize"`
}
var metaData MongoMetaData
log.Println("Getting DB free space...")
err := m.deviceDataC.Database().RunCommand(m.ctx, bson.M{"dbStats": 1}).Decode(&metaData)
if err != nil {
return err
}

log.Printf("Stats ... %v ", metaData)
bytesFree := metaData.FsTotalSize - metaData.FsUsedSize
percentFree := int(math.Floor(float64(bytesFree) / float64(metaData.FsTotalSize) * 100))
log.Printf("DB disk currently has %d%% (%d) free.", percentFree*100, bytesFree)

if percentFree > m.config.minFreePercent {
return fmt.Errorf("error %d%% is below minimum free space of %d%%", percentFree, m.config.minFreePercent)
}
return nil
}

0 comments on commit d296bd0

Please sign in to comment.