From 1d2a43330210e4ffee97354e742f31f6b06b825d Mon Sep 17 00:00:00 2001
From: Jamie
Date: Tue, 12 Dec 2023 17:00:12 +1300
Subject: [PATCH] test actual update but capped

---
 .../20231128_jellyfish_migration.go           | 94 +++++++-------
 1 file changed, 34 insertions(+), 60 deletions(-)

diff --git a/migrations/20231128_jellyfish_migration/20231128_jellyfish_migration.go b/migrations/20231128_jellyfish_migration/20231128_jellyfish_migration.go
index d3aa7340c..c306b6212 100644
--- a/migrations/20231128_jellyfish_migration/20231128_jellyfish_migration.go
+++ b/migrations/20231128_jellyfish_migration/20231128_jellyfish_migration.go
@@ -34,10 +34,9 @@ type Config struct {
 }
 
 type Migration struct {
-	ctx    context.Context
-	cli    *cli.App
-	config *Config
-	//*migrationMongo.Migration
+	ctx            context.Context
+	cli            *cli.App
+	config         *Config
 	client         *mongo.Client
 	writeBatchSize *int64
 	updates        []mongo.WriteModel
@@ -50,8 +49,6 @@ func main() {
 	ctx := context.Background()
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
-	//application.RunAndExit(NewMigration(ctx))
-
 	migration := NewMigration(ctx)
 	migration.RunAndExit()
 	log.Println("finished migration")
@@ -59,9 +56,8 @@ func main() {
 
 func NewMigration(ctx context.Context) *Migration {
 	return &Migration{
-		ctx: ctx,
-		cli: cli.NewApp(),
-		//Migration: migrationMongo.NewMigration(),
+		ctx:     ctx,
+		cli:     cli.NewApp(),
 		config:  &Config{},
 		updates: []mongo.WriteModel{},
 	}
@@ -90,7 +86,7 @@ func (m *Migration) RunAndExit() {
 			log.Printf("execute failed: %s", err)
 			return err
 		}
-		log.Println("finished prepare")
+		log.Println("finished execute")
 		return nil
 	}
 
@@ -103,11 +99,6 @@ func (m *Migration) RunAndExit() {
 }
 
 func (m *Migration) Initialize() error {
-	log.Println("init")
-	// if err := m.Migration.Initialize(provider); err != nil {
-	// 	return err
-	// }
-
 	m.CLI().Usage = "BACK-37: Migrate all existing data to add required Platform deduplication hash fields"
 	m.CLI().Description = "BACK-37: To fully migrate devices from the `jellyfish` upload API to the `platform` upload API"
 	m.CLI().Authors = []cli.Author{
@@ -117,12 +108,10 @@ func (m *Migration) Initialize() error {
 		},
 	}
 	m.CLI().Flags = append(m.CLI().Flags,
-
 		cli.BoolFlag{
 			Name:  fmt.Sprintf("%s,%s", DryRunFlag, "n"),
 			Usage: "dry run only; do not migrate",
 		},
-
 		cli.Int64Flag{
 			Name:  "batch-size",
 			Usage: "number of records to read each time",
@@ -166,16 +155,6 @@ func (m *Migration) Initialize() error {
 			Required: false,
 		},
 	)
-
-	// m.CLI().Action = func(ctx *cli.Context) error {
-	// 	// if !m.ParseContext(ctx) {
-	// 	// 	return errors.New("could not parse context")
-	// 	// }
-	// 	if err := m.prepare(); err != nil {
-	// 		return nil
-	// 	}
-	// 	return m.execute()
-	// }
 	return nil
 }
 
@@ -203,6 +182,7 @@ func (m *Migration) prepare() error {
 func (m *Migration) execute() error {
 	log.Println("about to run execute")
 	totalMigrated := 0
+	testingCapSize := 100
 	for m.fetchAndUpdateBatch() {
 		updatedCount, err := m.writeBatchUpdates()
 		if err != nil {
@@ -211,36 +191,35 @@ func (m *Migration) execute() error {
 		}
 		totalMigrated = totalMigrated + updatedCount
 		log.Printf("migrated %d for a total of %d migrated items", updatedCount, totalMigrated)
+		if totalMigrated >= testingCapSize {
+			log.Println("migrated docs up to cap so exiting")
+			break
+		}
 	}
 	return nil
 }
 
 func (m *Migration) getOplogDuration() (time.Duration, error) {
+	log.Println("checking oplog duration ...")
 	type MongoMetaData struct {
 		Wall time.Time `json:"wall"`
 	}
-
-	log.Println("checking oplog duration ")
 	if oplogC := m.getOplogCollection(); oplogC != nil {
-		oldest := MongoMetaData{}
+		var oldest MongoMetaData
 		if err := oplogC.FindOne(
 			m.ctx,
 			bson.M{"wall": bson.M{"$exists": true}},
 			options.FindOne().SetSort(bson.M{"$natural": 1})).Decode(&oldest); err != nil {
-			log.Printf("oldest walltime mongo err %v", err)
 			return 0, err
 		}
-		newest := MongoMetaData{}
+		var newest MongoMetaData
 		if err := oplogC.FindOne(
 			m.ctx,
 			bson.M{"wall": bson.M{"$exists": true}},
 			options.FindOne().SetSort(bson.M{"$natural": -1})).Decode(&newest); err != nil {
-			log.Printf("newest walltime mongo err %v", err)
 			return 0, err
 		}
-		//oldestT := time.UnixMilli(oldest.Wall)
-		//newestT := time.UnixMilli(newest.Wall)
 		oplogDuration := newest.Wall.Sub(oldest.Wall)
 		log.Printf("oplog duration is currently: %v", oplogDuration)
 		return oplogDuration, nil
@@ -256,9 +235,8 @@ func calculateBatchSize(oplogSize int, oplogEntryBytes int, oplogMinWindow int,
 }
 
 func (m *Migration) setWriteBatchSize() error {
-	log.Println("set write batch size...")
+	log.Println("set writeBatchSize...")
 	if oplogC := m.getOplogCollection(); oplogC != nil {
-		log.Println("Getting oplog stats...")
 		type MongoMetaData struct {
 			MaxSize int `json:"maxSize"`
 		}
@@ -285,19 +263,14 @@ func (m *Migration) checkFreeSpace() error {
 		FsUsedSize  int `json:"fsUsedSize"`
 	}
 	var metaData MongoMetaData
-	log.Println("Getting DB free space...")
-	if dataC := m.getDataCollection(); dataC != nil {
-
-		err := dataC.Database().RunCommand(m.ctx, bson.M{"dbStats": 1}).Decode(&metaData)
-		if err != nil {
+	if dataC := m.getDataCollection(); dataC != nil {
+		if err := dataC.Database().RunCommand(m.ctx, bson.M{"dbStats": 1}).Decode(&metaData); err != nil {
 			return err
 		}
-		log.Printf("DB free space: %v", metaData)
+		log.Printf("dbStats: %#v", metaData)
 		bytesFree := metaData.FsTotalSize - metaData.FsUsedSize
 		percentFree := int(math.Floor(float64(bytesFree) / float64(metaData.FsTotalSize) * 100))
 		log.Printf("DB disk currently has %d%% (%d bytes) free.", percentFree, bytesFree)
-
 		if m.config.minFreePercent > percentFree {
 			return fmt.Errorf("error %d%% is below minimum free space of %d%%", percentFree, m.config.minFreePercent)
 		}
@@ -307,7 +280,7 @@ func (m *Migration) checkFreeSpace() error {
 }
 
 func (m *Migration) getWaitTime() (float64, error) {
-	log.Println("Loading DB replication status...")
+	log.Println("getting wait time ...")
 
 	type Member struct {
 		Name   string `json:"name"`
@@ -321,8 +294,9 @@ func (m *Migration) getWaitTime() (float64, error) {
 	}
 
 	var metaData MongoMetaData
-	m.client.Database("admin").RunCommand(m.ctx, bson.M{"replSetGetStatus": 1}).Decode(&metaData)
-	log.Printf("DB replication status loaded. %#v", metaData)
%#v", metaData) + if err := m.client.Database("admin").RunCommand(m.ctx, bson.M{"replSetGetStatus": 1}).Decode(&metaData); err != nil { + return 0, err + } for _, member := range metaData.Members { if member.State < 1 || member.State > 2 || member.Health != 1 || member.Uptime < 120 { @@ -335,10 +309,9 @@ func (m *Migration) getWaitTime() (float64, error) { if err != nil { return 0, err } - log.Printf("oplogDuration %v ", oplogDuration) if oplogDuration.Seconds() < float64(m.config.minOplogWindow) { minOplogWindowTime := time.Duration(m.config.minOplogWindow) * time.Second - log.Printf("DB OPLOG shorter than requested duration of %s, currently %s.", minOplogWindowTime, oplogDuration) + log.Printf("DB oplog shorter than requested duration of %s, currently %s.", minOplogWindowTime, oplogDuration) waitTime := float64(m.config.minOplogWindow) - oplogDuration.Seconds() waitTime *= 1.15 if waitTime < 600 { @@ -350,7 +323,7 @@ func (m *Migration) getWaitTime() (float64, error) { } func (m *Migration) blockUntilDBReady() error { - log.Println("blocking...") + log.Println("blocking until ready...") waitTime, err := m.getWaitTime() if err != nil { return err @@ -373,7 +346,6 @@ func (m *Migration) blockUntilDBReady() error { } func (m *Migration) fetchAndUpdateBatch() bool { - selector := bson.M{ // jellyfish uses a generated _id that is not an mongo objectId "_id": bson.M{"$not": bson.M{"$type": "objectId"}}, @@ -382,7 +354,6 @@ func (m *Migration) fetchAndUpdateBatch() bool { "_userId": "5e8cac61-6bef-4728-b490-c1d82087ed9c", } m.updates = []mongo.WriteModel{} - if dataC := m.getDataCollection(); dataC != nil { dDataCursor, err := dataC.Find(m.ctx, selector, &options.FindOptions{Limit: &m.config.readBatchSize}, @@ -396,8 +367,7 @@ func (m *Migration) fetchAndUpdateBatch() bool { defer dDataCursor.Close(m.ctx) for dDataCursor.Next(m.ctx) { - err = dDataCursor.Decode(&dDataResult) - if err != nil { + if err = dDataCursor.Decode(&dDataResult); err != nil { log.Printf("failed decoding data: %s", err) return false } @@ -424,12 +394,12 @@ func (m *Migration) fetchAndUpdateBatch() bool { } return len(m.updates) > 0 } - log.Println("get deviceData collection ") return false } func (m *Migration) writeBatchUpdates() (int, error) { var getBatches = func(chunkSize int) [][]mongo.WriteModel { + log.Printf("updates to apply count: %d", len(m.updates)) batches := [][]mongo.WriteModel{} for i := 0; i < len(m.updates); i += chunkSize { end := i + chunkSize @@ -455,11 +425,15 @@ func (m *Migration) writeBatchUpdates() (int, error) { updateCount += len(batch) log.Printf("updates applied so far %d", updateCount) - // results, err := m.deviceDataC.BulkWrite(m.ctx, batch) - // if err != nil { - // return updateCount, err - // } - // updateCount = updateCount + int(results.ModifiedCount) + if deviceC := m.getDataCollection(); deviceC != nil { + results, err := deviceC.BulkWrite(m.ctx, batch) + if err != nil { + log.Printf("error writing batch updates %v", err) + return updateCount, err + } + log.Printf("update resuts %#v", results) + updateCount = updateCount + int(results.ModifiedCount) + } } log.Printf("applied %d updates", updateCount) return updateCount, nil