test actual update but capped
jh-bate committed Dec 12, 2023
1 parent fbeb0f8 commit 1d2a433
Showing 1 changed file with 34 additions and 60 deletions.
@@ -34,10 +34,9 @@ type Config struct {
 }
 
 type Migration struct {
-    ctx context.Context
-    cli *cli.App
-    config *Config
-    //*migrationMongo.Migration
+    ctx context.Context
+    cli *cli.App
+    config *Config
     client *mongo.Client
     writeBatchSize *int64
     updates []mongo.WriteModel
@@ -50,18 +49,15 @@ func main() {
     ctx := context.Background()
     ctx, cancel := context.WithCancel(ctx)
     defer cancel()
-    //application.RunAndExit(NewMigration(ctx))
-
     migration := NewMigration(ctx)
     migration.RunAndExit()
     log.Println("finished migration")
 }
 
 func NewMigration(ctx context.Context) *Migration {
     return &Migration{
-        ctx: ctx,
-        cli: cli.NewApp(),
-        //Migration: migrationMongo.NewMigration(),
+        ctx: ctx,
+        cli: cli.NewApp(),
         config: &Config{},
         updates: []mongo.WriteModel{},
     }
@@ -90,7 +86,7 @@ func (m *Migration) RunAndExit() {
         log.Printf("execute failed: %s", err)
         return err
     }
-    log.Println("finished prepare")
+    log.Println("finished execute")
     return nil
 }
 
@@ -103,11 +99,6 @@ func (m *Migration) RunAndExit() {
 }
 
 func (m *Migration) Initialize() error {
-    log.Println("init")
-    // if err := m.Migration.Initialize(provider); err != nil {
-    //     return err
-    // }
-
     m.CLI().Usage = "BACK-37: Migrate all existing data to add required Platform deduplication hash fields"
     m.CLI().Description = "BACK-37: To fully migrate devices from the `jellyfish` upload API to the `platform` upload API"
     m.CLI().Authors = []cli.Author{
@@ -117,12 +108,10 @@ func (m *Migration) Initialize() error {
         },
     }
     m.CLI().Flags = append(m.CLI().Flags,
-
         cli.BoolFlag{
             Name: fmt.Sprintf("%s,%s", DryRunFlag, "n"),
             Usage: "dry run only; do not migrate",
         },
-
         cli.Int64Flag{
             Name: "batch-size",
             Usage: "number of records to read each time",
@@ -166,16 +155,6 @@ func (m *Migration) Initialize() error {
             Required: false,
         },
     )
-
-    // m.CLI().Action = func(ctx *cli.Context) error {
-    //     // if !m.ParseContext(ctx) {
-    //     //     return errors.New("could not parse context")
-    //     // }
-    //     if err := m.prepare(); err != nil {
-    //         return nil
-    //     }
-    //     return m.execute()
-    // }
     return nil
 }
 
@@ -203,6 +182,7 @@ func (m *Migration) prepare() error {
 func (m *Migration) execute() error {
     log.Println("about to run execute")
     totalMigrated := 0
+    testingCapSize := 100
     for m.fetchAndUpdateBatch() {
         updatedCount, err := m.writeBatchUpdates()
         if err != nil {
@@ -211,36 +191,35 @@ func (m *Migration) execute() error {
         }
         totalMigrated = totalMigrated + updatedCount
         log.Printf("migrated %d for a total of %d migrated items", updatedCount, totalMigrated)
+        if totalMigrated >= testingCapSize {
+            log.Println("migrated docs up to cap so exiting")
+            break
+        }
     }
     return nil
 }
 
 func (m *Migration) getOplogDuration() (time.Duration, error) {
+    log.Println("checking oplog duration ...")
     type MongoMetaData struct {
         Wall time.Time `json:"wall"`
     }
-
-    log.Println("checking oplog duration ")
     if oplogC := m.getOplogCollection(); oplogC != nil {
-        oldest := MongoMetaData{}
+        var oldest MongoMetaData
         if err := oplogC.FindOne(
             m.ctx,
             bson.M{"wall": bson.M{"$exists": true}},
             options.FindOne().SetSort(bson.M{"$natural": 1})).Decode(&oldest); err != nil {
+            log.Printf("oldest walltime mongo err %v", err)
             return 0, err
         }
-
-        newest := MongoMetaData{}
+        var newest MongoMetaData
         if err := oplogC.FindOne(
             m.ctx,
             bson.M{"wall": bson.M{"$exists": true}},
             options.FindOne().SetSort(bson.M{"$natural": -1})).Decode(&newest); err != nil {
+            log.Printf("newest walltime mongo err %v", err)
            return 0, err
         }
-        //oldestT := time.UnixMilli(oldest.Wall)
-        //newestT := time.UnixMilli(newest.Wall)
         oplogDuration := newest.Wall.Sub(oldest.Wall)
         log.Printf("oplog duration is currently: %v", oplogDuration)
         return oplogDuration, nil
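Aside: getOplogDuration measures the current oplog window by taking the first and last oplog entries in $natural (insertion) order and subtracting their wall timestamps. A minimal standalone sketch of the same technique, assuming a reachable replica-set member (the URI is illustrative only):

// Sketch: measure the oplog window as getOplogDuration does above.
package main

import (
    "context"
    "log"
    "time"

    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
    ctx := context.Background()
    client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
    if err != nil {
        log.Fatal(err)
    }
    defer client.Disconnect(ctx)

    oplog := client.Database("local").Collection("oplog.rs")
    var oldest, newest struct {
        Wall time.Time `bson:"wall"`
    }
    // $natural sorts by insertion order, so the first and last entries
    // carrying a wall timestamp bound the current oplog window.
    if err := oplog.FindOne(ctx, bson.M{"wall": bson.M{"$exists": true}},
        options.FindOne().SetSort(bson.M{"$natural": 1})).Decode(&oldest); err != nil {
        log.Fatal(err)
    }
    if err := oplog.FindOne(ctx, bson.M{"wall": bson.M{"$exists": true}},
        options.FindOne().SetSort(bson.M{"$natural": -1})).Decode(&newest); err != nil {
        log.Fatal(err)
    }
    log.Printf("oplog window: %v", newest.Wall.Sub(oldest.Wall))
}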
@@ -256,9 +235,8 @@ func calculateBatchSize(oplogSize int, oplogEntryBytes int, oplogMinWindow int,
 }
 
 func (m *Migration) setWriteBatchSize() error {
-    log.Println("set write batch size...")
+    log.Println("set writeBatchSize...")
     if oplogC := m.getOplogCollection(); oplogC != nil {
-        log.Println("Getting oplog stats...")
         type MongoMetaData struct {
             MaxSize int `json:"maxSize"`
         }
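Note: the body of calculateBatchSize is collapsed in this view and its parameter list is cut off in the hunk header above. Purely as a hypothetical sketch of the kind of arithmetic such a helper might do — not the commit's actual implementation, and every name and formula below is an assumption — a batch size can be derived from oplog capacity and the required window:

// Hypothetical sketch only; the real calculateBatchSize body is not shown
// in this diff. The fourth parameter and the formula are assumptions.
func calculateBatchSizeSketch(oplogSize, oplogEntryBytes, oplogMinWindow, safetyPercent int) int64 {
    // Max entries the oplog can hold before rolling over.
    capacity := oplogSize / oplogEntryBytes
    // Spread those entries over the required window, keeping only a
    // safetyPercent fraction of the theoretical rate as a margin.
    perSecond := float64(capacity) / float64(oplogMinWindow)
    batch := int64(perSecond * float64(safetyPercent) / 100.0)
    if batch < 1 {
        batch = 1
    }
    return batch
}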
@@ -285,19 +263,14 @@ func (m *Migration) checkFreeSpace() error {
         FsUsedSize int `json:"fsUsedSize"`
     }
     var metaData MongoMetaData
-    log.Println("Getting DB free space...")
-
     if dataC := m.getDataCollection(); dataC != nil {
-
-        err := dataC.Database().RunCommand(m.ctx, bson.M{"dbStats": 1}).Decode(&metaData)
-        if err != nil {
+        if err := dataC.Database().RunCommand(m.ctx, bson.M{"dbStats": 1}).Decode(&metaData); err != nil {
             return err
         }
-        log.Printf("DB free space: %v", metaData)
+        log.Printf("dbStats: %#v", metaData)
         bytesFree := metaData.FsTotalSize - metaData.FsUsedSize
         percentFree := int(math.Floor(float64(bytesFree) / float64(metaData.FsTotalSize) * 100))
         log.Printf("DB disk currently has %d%% (%d bytes) free.", percentFree, bytesFree)
-
         if m.config.minFreePercent > percentFree {
             return fmt.Errorf("error %d%% is below minimum free space of %d%%", percentFree, m.config.minFreePercent)
         }
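The check above reads dbStats and derives a whole-number free-space percentage. A tiny self-contained sketch of the same arithmetic, with made-up numbers:

// Sketch of the percent-free arithmetic used in checkFreeSpace.
package main

import (
    "fmt"
    "math"
)

func main() {
    // Illustrative values, not real dbStats output.
    fsTotalSize, fsUsedSize := 100_000_000_000, 63_000_000_000
    bytesFree := fsTotalSize - fsUsedSize
    percentFree := int(math.Floor(float64(bytesFree) / float64(fsTotalSize) * 100))
    fmt.Printf("%d%% (%d bytes) free\n", percentFree, bytesFree) // prints: 37% free
}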
@@ -307,7 +280,7 @@ func (m *Migration) getWaitTime() (float64, error) {
 }
 
 func (m *Migration) getWaitTime() (float64, error) {
-    log.Println("Loading DB replication status...")
+    log.Println("getting wait time ...")
 
     type Member struct {
         Name string `json:"name"`
@@ -321,8 +294,9 @@ func (m *Migration) getWaitTime() (float64, error) {
     }
 
     var metaData MongoMetaData
-    m.client.Database("admin").RunCommand(m.ctx, bson.M{"replSetGetStatus": 1}).Decode(&metaData)
-    log.Printf("DB replication status loaded. %#v", metaData)
+    if err := m.client.Database("admin").RunCommand(m.ctx, bson.M{"replSetGetStatus": 1}).Decode(&metaData); err != nil {
+        return 0, err
+    }
 
     for _, member := range metaData.Members {
         if member.State < 1 || member.State > 2 || member.Health != 1 || member.Uptime < 120 {
@@ -335,10 +309,9 @@ func (m *Migration) getWaitTime() (float64, error) {
     if err != nil {
         return 0, err
     }
-    log.Printf("oplogDuration %v ", oplogDuration)
     if oplogDuration.Seconds() < float64(m.config.minOplogWindow) {
         minOplogWindowTime := time.Duration(m.config.minOplogWindow) * time.Second
-        log.Printf("DB OPLOG shorter than requested duration of %s, currently %s.", minOplogWindowTime, oplogDuration)
+        log.Printf("DB oplog shorter than requested duration of %s, currently %s.", minOplogWindowTime, oplogDuration)
         waitTime := float64(m.config.minOplogWindow) - oplogDuration.Seconds()
         waitTime *= 1.15
         if waitTime < 600 {
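The back-off arithmetic above waits for the window shortfall plus a 15% margin; the `if waitTime < 600` branch body is collapsed in this view, presumably a floor. A worked sketch with illustrative numbers (the 600-second floor is an assumption from the visible condition):

// Sketch of the getWaitTime back-off arithmetic with made-up inputs.
package main

import "fmt"

func main() {
    minOplogWindow := 28800.0 // configured minimum window in seconds (illustrative)
    measured := 25000.0       // measured oplog duration in seconds (illustrative)
    // Shortfall plus a 15% margin, as in the diff above.
    waitTime := (minOplogWindow - measured) * 1.15
    if waitTime < 600 {
        waitTime = 600 // assumed floor; the branch body is collapsed in the diff
    }
    fmt.Printf("wait %.0f seconds\n", waitTime) // prints: wait 4370 seconds
}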
@@ -350,7 +323,7 @@ func (m *Migration) getWaitTime() (float64, error) {
 }
 
 func (m *Migration) blockUntilDBReady() error {
-    log.Println("blocking...")
+    log.Println("blocking until ready...")
     waitTime, err := m.getWaitTime()
     if err != nil {
         return err
@@ -373,7 +346,6 @@ func (m *Migration) blockUntilDBReady() error {
 }
 
 func (m *Migration) fetchAndUpdateBatch() bool {
-
     selector := bson.M{
         // jellyfish uses a generated _id that is not a mongo objectId
         "_id": bson.M{"$not": bson.M{"$type": "objectId"}},
@@ -382,7 +354,6 @@ func (m *Migration) fetchAndUpdateBatch() bool {
     "_userId": "5e8cac61-6bef-4728-b490-c1d82087ed9c",
     }
     m.updates = []mongo.WriteModel{}
-
     if dataC := m.getDataCollection(); dataC != nil {
         dDataCursor, err := dataC.Find(m.ctx, selector,
             &options.FindOptions{Limit: &m.config.readBatchSize},
@@ -396,8 +367,7 @@ func (m *Migration) fetchAndUpdateBatch() bool {
 
         defer dDataCursor.Close(m.ctx)
         for dDataCursor.Next(m.ctx) {
-            err = dDataCursor.Decode(&dDataResult)
-            if err != nil {
+            if err = dDataCursor.Decode(&dDataResult); err != nil {
                 log.Printf("failed decoding data: %s", err)
                 return false
             }
@@ -424,12 +394,12 @@ func (m *Migration) fetchAndUpdateBatch() bool {
         }
         return len(m.updates) > 0
     }
-    log.Println("get deviceData collection ")
     return false
 }
 
 func (m *Migration) writeBatchUpdates() (int, error) {
     var getBatches = func(chunkSize int) [][]mongo.WriteModel {
+        log.Printf("updates to apply count: %d", len(m.updates))
         batches := [][]mongo.WriteModel{}
         for i := 0; i < len(m.updates); i += chunkSize {
             end := i + chunkSize
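getBatches above chunks the accumulated write models into fixed-size batches; the clamp-and-append tail of the loop is collapsed in this view. The generic pattern, as a self-contained sketch (using []int instead of []mongo.WriteModel to keep it dependency-free):

// Sketch of the generic slice-chunking pattern used by getBatches.
package main

import "fmt"

// chunk splits items into batches of at most size elements.
func chunk(items []int, size int) [][]int {
    batches := [][]int{}
    for i := 0; i < len(items); i += size {
        end := i + size
        if end > len(items) {
            end = len(items) // clamp the final, possibly short batch
        }
        batches = append(batches, items[i:end])
    }
    return batches
}

func main() {
    fmt.Println(chunk([]int{1, 2, 3, 4, 5}, 2)) // [[1 2] [3 4] [5]]
}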
@@ -455,11 +425,15 @@ func (m *Migration) writeBatchUpdates() (int, error) {
 
         updateCount += len(batch)
         log.Printf("updates applied so far %d", updateCount)
-        // results, err := m.deviceDataC.BulkWrite(m.ctx, batch)
-        // if err != nil {
-        //     return updateCount, err
-        // }
-        // updateCount = updateCount + int(results.ModifiedCount)
+        if deviceC := m.getDataCollection(); deviceC != nil {
+            results, err := deviceC.BulkWrite(m.ctx, batch)
+            if err != nil {
+                log.Printf("error writing batch updates %v", err)
+                return updateCount, err
+            }
+            log.Printf("update results %#v", results)
+            updateCount = updateCount + int(results.ModifiedCount)
+        }
     }
     log.Printf("applied %d updates", updateCount)
     return updateCount, nil