diff --git a/migrations/20231128_jellyfish_migration/20231128_jellyfish_migration.go b/migrations/20231128_jellyfish_migration/20231128_jellyfish_migration.go
new file mode 100644
index 000000000..cec3773e5
--- /dev/null
+++ b/migrations/20231128_jellyfish_migration/20231128_jellyfish_migration.go
@@ -0,0 +1,381 @@
+package main
+
+import (
+    "context"
+    "fmt"
+    "math"
+    "time"
+
+    "go.mongodb.org/mongo-driver/bson"
+    "go.mongodb.org/mongo-driver/mongo"
+    "go.mongodb.org/mongo-driver/mongo/options"
+
+    "github.com/tidepool-org/platform/application"
+    migrationMongo "github.com/tidepool-org/platform/migration/mongo"
+    "github.com/tidepool-org/platform/migrations/20231128_jellyfish_migration/utils"
+    "github.com/urfave/cli"
+)
+
+type Config struct {
+    uri            string
+    minOplogWindow int
+    // these values are used to determine the write batch size: first divide the oplog's size
+    // by the expected entry size and the desired duration, then apply a divisor to account
+    // for NOP overshoot in the oplog
+    expectedOplogEntrySize int
+    // percentage of the oplog that is NOP; this adjusts the batch size to account for an
+    // oplog that is very change-sensitive
+    // must be > 0
+    // prod 0.6
+    // idle 100
+    nopPercent int
+    // minimum free disk space percent
+    minFreePercent int
+    readBatchSize  int64
+}
+
+type Migration struct {
+    ctx    context.Context
+    config *Config
+    *migrationMongo.Migration
+    client         *mongo.Client
+    oplogC         *mongo.Collection
+    deviceDataC    *mongo.Collection
+    writeBatchSize *int64
+    updates        []mongo.WriteModel
+}
+
+const oplogName = "oplog.rs"
+
+func main() {
+    ctx := context.Background()
+    ctx, cancel := context.WithCancel(ctx)
+    defer cancel()
+    application.RunAndExit(NewMigration(ctx))
+}
+
+func NewMigration(ctx context.Context) *Migration {
+    return &Migration{
+        ctx:       ctx,
+        Migration: migrationMongo.NewMigration(),
+        config:    &Config{},
+        updates:   []mongo.WriteModel{},
+    }
+}
+
+func (m *Migration) Initialize(provider application.Provider) error {
+    if err := m.Migration.Initialize(provider); err != nil {
+        return err
+    }
+
+    m.CLI().Usage = "BACK-37: Migrate all existing data to add required Platform deduplication hash fields"
+    m.CLI().Description = "BACK-37: To fully migrate devices from the `jellyfish` upload API to the `platform` upload API"
+    m.CLI().Authors = []cli.Author{
+        {
+            Name:  "J H BATE",
+            Email: "jamie@tidepool.org",
+        },
+    }
+    m.CLI().Flags = append(m.CLI().Flags,
+        cli.Int64Flag{
+            Name:        "batch-size",
+            Usage:       "number of records to read each time",
+            Destination: &m.config.readBatchSize,
+            Value:       3000,
+            Required:    false,
+        },
+        cli.IntFlag{
+            Name:        "min-free-percent",
+            Usage:       "minimum free disk space percent",
+            Destination: &m.config.minFreePercent,
+            Value:       10,
+            Required:    false,
+        },
+        cli.IntFlag{
+            Name:        "nop-percent",
+            Usage:       "percentage of the oplog that is NOP",
+            Destination: &m.config.nopPercent,
+            Value:       100,
+            Required:    false,
+        },
+        cli.IntFlag{
+            Name:        "oplog-entry-size",
+            Usage:       "expected size of an oplog entry in bytes",
+            Destination: &m.config.expectedOplogEntrySize,
+            Value:       420,
+            Required:    false,
+        },
+        cli.IntFlag{
+            Name:        "oplog-window",
+            Usage:       "minimum oplog window in seconds",
+            Destination: &m.config.minOplogWindow,
+            Value:       28800, // 8hrs
+            Required:    false,
+        },
+        cli.StringFlag{
+            Name:        "uri",
+            Usage:       "mongo connection URI",
+            Destination: &m.config.uri,
+            Value:       "mongodb://localhost:27017",
+            Required:    false,
+        },
+    )
+
+    m.CLI().Action = func(ctx *cli.Context) error {
+        if !m.ParseContext(ctx) {
+            return nil
+        }
+        if err := m.prepare(); err != nil {
+            return err
+        }
+        defer m.client.Disconnect(m.ctx)
+        return m.execute()
+    }
+    return nil
+}
+
+func (m *Migration) prepare() error {
+    var err error
+    m.client, err = mongo.Connect(m.ctx, options.Client().ApplyURI(m.config.uri))
+    if err != nil {
+        return fmt.Errorf("unable to connect to MongoDB: %w", err)
+    }
+    // note: the client must stay connected for execute, so it is disconnected by the
+    // caller rather than deferred here
+
+    m.oplogC = m.client.Database("local").Collection(oplogName)
+    m.deviceDataC = m.client.Database("data").Collection("deviceData")
+
+    if err := m.checkFreeSpace(); err != nil {
+        return err
+    }
+
+    return m.setWriteBatchSize()
+}
+
+func (m *Migration) execute() error {
+    totalMigrated := 0
+    for m.fetchAndUpdateBatch() {
+        updatedCount, err := m.writeBatchUpdates()
+        if err != nil {
+            m.Logger().WithError(err).Error("failed writing batch")
+            return err
+        }
+        totalMigrated += updatedCount
+        m.Logger().Debugf("migrated %d for a total of %d migrated items", updatedCount, totalMigrated)
+    }
+    return nil
+}
+
+func (m *Migration) getOplogDuration() (time.Duration, error) {
+    type MongoMetaData struct {
+        Wall time.Time `bson:"wall"`
+    }
+    if m.oplogC != nil {
+        var oldest MongoMetaData
+        if err := m.oplogC.FindOne(
+            m.ctx, bson.M{},
+            options.FindOne().SetSort(bson.M{"$natural": 1}),
+            options.FindOne().SetProjection(bson.M{"wall": 1})).Decode(&oldest); err != nil {
+            return 0, err
+        }
+        var newest MongoMetaData
+        if err := m.oplogC.FindOne(m.ctx,
+            bson.M{},
+            options.FindOne().SetSort(bson.M{"$natural": -1}),
+            options.FindOne().SetProjection(bson.M{"wall": 1})).Decode(&newest); err != nil {
+            return 0, err
+        }
+        oplogDuration := newest.Wall.Sub(oldest.Wall)
+        m.Logger().Debugf("oplog duration is currently: %v", oplogDuration)
+        return oplogDuration, nil
+    }
+    m.Logger().Debug("Not clustered, not retrieving oplog duration.")
+    oplogDuration := time.Duration(m.config.minOplogWindow+1) * time.Second
+    return oplogDuration, nil
+}
+
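+// setWriteBatchSize derives the bulk-write batch size from the oplog's collStats.
+// As a rough illustrative example (assumed numbers, not measurements): with a
+// 1GiB oplog (maxSize = 1073741824 bytes) and the default flag values
+// (expectedOplogEntrySize = 420, minOplogWindow = 28800, nopPercent = 100),
+// integer division gives 1073741824 / 420 / 28800 / (100 / 7) ≈ 6 documents per
+// bulk write, keeping each batch's oplog footprint inside the required window.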
+func (m *Migration) setWriteBatchSize() error {
+    if m.oplogC != nil {
+        m.Logger().Debug("Getting oplog size...")
+        type MongoMetaData struct {
+            MaxSize int `bson:"maxSize"`
+        }
+        var metaData MongoMetaData
+        if err := m.oplogC.Database().RunCommand(m.ctx, bson.M{"collStats": oplogName}).Decode(&metaData); err != nil {
+            return err
+        }
+        writeBatchSize := int64(metaData.MaxSize / m.config.expectedOplogEntrySize / m.config.minOplogWindow / (m.config.nopPercent / 7))
+        m.writeBatchSize = &writeBatchSize
+        return nil
+    }
+    var writeBatchSize = int64(30000)
+    m.Logger().Debugf("MongoDB is not clustered, removing write batch limit, setting to %d documents.", writeBatchSize)
+    m.writeBatchSize = &writeBatchSize
+    return nil
+}
+
+func (m *Migration) checkFreeSpace() error {
+    type MongoMetaData struct {
+        FsTotalSize int `bson:"fsTotalSize"`
+        FsUsedSize  int `bson:"fsUsedSize"`
+    }
+    var metaData MongoMetaData
+    m.Logger().Debug("Getting DB free space...")
+    if err := m.deviceDataC.Database().RunCommand(m.ctx, bson.M{"dbStats": 1}).Decode(&metaData); err != nil {
+        return err
+    }
+    bytesFree := metaData.FsTotalSize - metaData.FsUsedSize
+    percentFree := int(math.Floor(float64(bytesFree) / float64(metaData.FsTotalSize) * 100))
+    m.Logger().Debugf("DB disk currently has %d%% (%d bytes) free.", percentFree, bytesFree)
+
+    if percentFree < m.config.minFreePercent {
+        return fmt.Errorf("error %d%% is below minimum free space of %d%%", percentFree, m.config.minFreePercent)
+    }
+    return nil
+}
+
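+// getWaitTime reports how long the migration should pause before writing again. A
+// member state of 1 (PRIMARY) or 2 (SECONDARY) is considered healthy; any other
+// state, an unhealthy flag, or less than two minutes of uptime forces a fixed
+// four-minute pause. If the oplog window has shrunk below the minimum, the wait
+// is the shortfall plus a 15% margin, with a ten-minute floor.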
+func (m *Migration) getWaitTime() (float64, error) {
+    m.Logger().Debug("Loading DB replication status...")
+
+    type Member struct {
+        Name   string `bson:"name"`
+        Health int    `bson:"health"`
+        Uptime int    `bson:"uptime"`
+        State  int    `bson:"state"`
+    }
+
+    type MongoMetaData struct {
+        Members []Member `bson:"members"`
+    }
+
+    var metaData MongoMetaData
+    if err := m.client.Database("admin").RunCommand(m.ctx, bson.M{"replSetGetStatus": 1}).Decode(&metaData); err != nil {
+        return 0, err
+    }
+    m.Logger().Debug("DB replication status loaded.")
+
+    for _, member := range metaData.Members {
+        if member.State < 1 || member.State > 2 || member.Health != 1 || member.Uptime < 120 {
+            m.Logger().Debugf("DB member %s down or not ready.", member.Name)
+            return 240, nil
+        }
+    }
+
+    oplogDuration, err := m.getOplogDuration()
+    if err != nil {
+        return 0, err
+    }
+    if oplogDuration.Seconds() < float64(m.config.minOplogWindow) {
+        minOplogWindowTime := time.Duration(m.config.minOplogWindow) * time.Second
+        m.Logger().Debugf("DB OPLOG shorter than requested duration of %s, currently %s.", minOplogWindowTime, oplogDuration)
+        waitTime := float64(m.config.minOplogWindow) - oplogDuration.Seconds()
+        waitTime *= 1.15
+        if waitTime < 600 {
+            waitTime = 600
+        }
+        return waitTime, nil
+    }
+    return 0, nil
+}
+
+func (m *Migration) blockUntilDBReady() error {
+    waitTime, err := m.getWaitTime()
+    if err != nil {
+        return err
+    }
+    var totalWait float64
+    for waitTime > 0 {
+        totalWait += waitTime
+        if totalWait > 1800 {
+            m.Logger().Debugf("Long total wait of %s, possibly high load, or sustained DB outage. If neither, adjust NOP_PERCENT to reduce overshoot.", time.Duration(totalWait)*time.Second)
+        }
+        m.Logger().Debugf("Sleeping for %s", time.Duration(waitTime)*time.Second)
+        time.Sleep(time.Duration(waitTime) * time.Second)
+        waitTime, err = m.getWaitTime()
+        if err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
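+// fetchAndUpdateBatch targets documents created by jellyfish, which are the only
+// ones whose _id is not a mongo ObjectID and which lack a _deduplicator field.
+// Each queued update filters on modifiedTime as well as _id, so a document
+// modified after it was read is skipped rather than overwritten.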
+func (m *Migration) fetchAndUpdateBatch() bool {
+    selector := bson.M{
+        // jellyfish uses a generated _id that is not a mongo ObjectID
+        "_id":           bson.M{"$not": bson.M{"$type": "objectId"}},
+        "_deduplicator": bson.M{"$exists": false},
+    }
+    m.updates = []mongo.WriteModel{}
+
+    dDataCursor, err := m.deviceDataC.Find(m.ctx, selector,
+        &options.FindOptions{Limit: &m.config.readBatchSize},
+    )
+    if err != nil {
+        m.Logger().WithError(err).Error("failed to select data")
+        return false
+    }
+    defer dDataCursor.Close(m.ctx)
+
+    var dDataResult bson.M
+    for dDataCursor.Next(m.ctx) {
+        if err := dDataCursor.Decode(&dDataResult); err != nil {
+            m.Logger().WithError(err).Error("failed decoding data")
+            return false
+        }
+
+        datumID, err := utils.GetValidatedString(dDataResult, "_id")
+        if err != nil {
+            m.Logger().WithError(err).Error("failed getting datum _id")
+            return false
+        }
+
+        updates, err := utils.GetDatumUpdates(dDataResult)
+        if err != nil {
+            m.Logger().WithError(err).Error("failed getting datum updates")
+            return false
+        }
+
+        m.updates = append(m.updates, mongo.NewUpdateOneModel().SetFilter(
+            bson.M{
+                "_id":          datumID,
+                "modifiedTime": dDataResult["modifiedTime"],
+            }).SetUpdate(bson.M{
+            "$set": updates,
+        }))
+    }
+    if err := dDataCursor.Err(); err != nil {
+        m.Logger().WithError(err).Error("failed fetching data")
+        return false
+    }
+    return len(m.updates) > 0
+}
+
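+// writeBatchUpdates splits the pending updates into chunks of writeBatchSize and
+// bulk-writes each chunk, re-checking replication lag and free disk space before
+// every write so a struggling cluster pauses the migration rather than falling
+// further behind.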
+func (m *Migration) writeBatchUpdates() (int, error) {
+    getBatches := func(chunkSize int) [][]mongo.WriteModel {
+        batches := [][]mongo.WriteModel{}
+        for i := 0; i < len(m.updates); i += chunkSize {
+            end := i + chunkSize
+            if end > len(m.updates) {
+                end = len(m.updates)
+            }
+            batches = append(batches, m.updates[i:end])
+        }
+        return batches
+    }
+
+    updateCount := 0
+    for _, batch := range getBatches(int(*m.writeBatchSize)) {
+        if err := m.blockUntilDBReady(); err != nil {
+            return updateCount, err
+        }
+        if err := m.checkFreeSpace(); err != nil {
+            return updateCount, err
+        }
+        results, err := m.deviceDataC.BulkWrite(m.ctx, batch)
+        if err != nil {
+            return updateCount, err
+        }
+        updateCount += int(results.ModifiedCount)
+    }
+    return updateCount, nil
+}
diff --git a/migrations/back_37/utils/utils.go b/migrations/20231128_jellyfish_migration/utils/utils.go
similarity index 100%
rename from migrations/back_37/utils/utils.go
rename to migrations/20231128_jellyfish_migration/utils/utils.go
diff --git a/migrations/back_37/utils/utils_suite_test.go b/migrations/20231128_jellyfish_migration/utils/utils_suite_test.go
similarity index 100%
rename from migrations/back_37/utils/utils_suite_test.go
rename to migrations/20231128_jellyfish_migration/utils/utils_suite_test.go
diff --git a/migrations/back_37/utils/utils_test.go b/migrations/20231128_jellyfish_migration/utils/utils_test.go
similarity index 98%
rename from migrations/back_37/utils/utils_test.go
rename to migrations/20231128_jellyfish_migration/utils/utils_test.go
index 5ab72ba02..deff2cbfc 100644
--- a/migrations/back_37/utils/utils_test.go
+++ b/migrations/20231128_jellyfish_migration/utils/utils_test.go
@@ -12,7 +12,7 @@ import (
     "github.com/tidepool-org/platform/data/types/common"
     "github.com/tidepool-org/platform/data/types/settings/pump"
     pumpTest "github.com/tidepool-org/platform/data/types/settings/pump/test"
-    "github.com/tidepool-org/platform/migrations/back_37/utils"
+    "github.com/tidepool-org/platform/migrations/20231128_jellyfish_migration/utils"
 )
 
 var _ = Describe("back-37", func() {
diff --git a/migrations/back_37/back_37.go b/migrations/back_37/back_37.go
deleted file mode 100644
index 4e0a2652b..000000000
--- a/migrations/back_37/back_37.go
+++ /dev/null
@@ -1,148 +0,0 @@
-package main
-
-import (
-    "context"
-    "time"
-
-    "github.com/urfave/cli"
-    "go.mongodb.org/mongo-driver/bson"
-
-    "github.com/tidepool-org/platform/application"
-    "github.com/tidepool-org/platform/errors"
-    migrationMongo "github.com/tidepool-org/platform/migration/mongo"
-    "github.com/tidepool-org/platform/migrations/back_37/utils"
-    storeStructuredMongo "github.com/tidepool-org/platform/store/structured/mongo"
-)
-
-func main() {
-    ctx := context.Background()
-    ctx, cancel := context.WithCancel(ctx)
-    defer cancel()
-    application.RunAndExit(NewMigration(ctx))
-}
-
-type Migration struct {
-    ctx context.Context
-    *migrationMongo.Migration
-    dataRepository *storeStructuredMongo.Repository
-}
-
-func NewMigration(ctx context.Context) *Migration {
-    return &Migration{
-        ctx:       ctx,
-        Migration: migrationMongo.NewMigration(),
-    }
-}
-
-func (m *Migration) Initialize(provider application.Provider) error {
-    if err := m.Migration.Initialize(provider); err != nil {
-        return err
-    }
-
-    m.CLI().Usage = "BACK-37: Migrate all existing data to add required Platform deduplication hash fields"
-    m.CLI().Description = "BACK-37: To fully migrate devices from the `jellyfish` upload API to the `platform` upload API"
-    m.CLI().Authors = []cli.Author{
-        {
-            Name:  "J H BATE",
-            Email: "jamie@tidepool.org",
-        },
-    }
-
-    m.CLI().Action = func(ctx *cli.Context) error {
-        if !m.ParseContext(ctx) {
-            return nil
-        }
-        return m.execute()
-    }
-
-    return nil
-}
-
-func (m *Migration) execute() error {
-    m.Logger().Debug("Migrate jellyfish API data")
-    m.Logger().Debug("Creating data store")
-
-    mongoConfig := m.NewMongoConfig()
-    mongoConfig.Database = "data"
-    mongoConfig.Timeout = 60 * time.Minute
-    dataStore, err := storeStructuredMongo.NewStore(mongoConfig)
-    if err != nil {
-        return errors.Wrap(err, "unable to create data store")
-    }
-    defer dataStore.Terminate(m.ctx)
-
-    m.Logger().Debug("Creating data repository")
-    m.dataRepository = dataStore.GetRepository("deviceData")
-    m.Logger().Info("Migration of jellyfish documents has begun")
-    hashUpdatedCount, errorCount := m.migrateJellyfishDocuments()
-    m.Logger().Infof("Migrated %d jellyfish documents", hashUpdatedCount)
-    m.Logger().Infof("%d errors occurred", errorCount)
-    return nil
-}
-
-func (m *Migration) migrateJellyfishDocuments() (int, int) {
-    logger := m.Logger()
-    logger.Debug("Finding jellyfish data")
-    var hashUpdatedCount, errorCount int
-    selector := bson.M{
-        // jellyfish uses a generated _id that is not an mongo objectId
-        "_id":           bson.M{"$not": bson.M{"$type": "objectId"}},
-        "_deduplicator": bson.M{"$exists": false},
-    }
-
-    var jellyfishResult bson.M
-    jellyfishDocCursor, err := m.dataRepository.Find(m.ctx, selector)
-    if err != nil {
-        logger.WithError(err).Error("Unable to find jellyfish data")
-        errorCount++
-        return hashUpdatedCount, errorCount
-    }
-    defer jellyfishDocCursor.Close(m.ctx)
-    for jellyfishDocCursor.Next(m.ctx) {
-        err = jellyfishDocCursor.Decode(&jellyfishResult)
-        if err != nil {
-            logger.WithError(err).Error("Could not decode mongo doc")
-            errorCount++
-            continue
-        }
-        if !m.DryRun() {
-            if updated, err := m.migrateDocument(jellyfishResult); err != nil {
-                logger.WithError(err).Errorf("Unable to migrate jellyfish document %s.", jellyfishResult["_id"])
-                errorCount++
-                continue
-            } else if updated {
-                hashUpdatedCount++
-            }
-        }
-    }
-    if err := jellyfishDocCursor.Err(); err != nil {
-        logger.WithError(err).Error("Error while fetching data. Please re-run to complete the migration.")
-        errorCount++
-    }
-    return hashUpdatedCount, errorCount
-}
-
-func (m *Migration) migrateDocument(jfDatum bson.M) (bool, error) {
-
-    datumID, err := utils.GetValidatedString(jfDatum, "_id")
-    if err != nil {
-        return false, err
-    }
-
-    updates, err := utils.GetDatumUpdates(jfDatum)
-    if err != nil {
-        return false, err
-    }
-
-    result, err := m.dataRepository.UpdateOne(m.ctx, bson.M{
-        "_id":          datumID,
-        "modifiedTime": jfDatum["modifiedTime"],
-    }, bson.M{
-        "$set": updates,
-    })
-
-    if err != nil {
-        return false, err
-    }
-    return result.ModifiedCount == 1, nil
-}