Skip to content


initial rework for robust data migration
Browse files Browse the repository at this point in the history
  • Loading branch information
jh-bate committed Nov 28, 2023
1 parent 4bc6f48 commit 350e785
Show file tree
Hide file tree
Showing 5 changed files with 382 additions and 149 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,381 @@
package main

import (


migrationMongo ""

type Config struct {
uri string
minOplogWindow int
// these values are used to determine writes batches, first dividing the oplog's size with the desired duration and
// expected entry size, then adding a divisor to account for NOP overshoot in the oplog
expectedOplogEntrySize int
// how much of the oplog is NOP, this adjusts the batch to account for an oplog that is very change sensitive
// must be > 0
// prod 0.6
// idle 100
nopPercent int
// minimum free disk space percent
minFreePercent int
readBatchSize int64

type Migration struct {
ctx context.Context
config *Config
client *mongo.Client
oplogC *mongo.Collection
deviceDataC *mongo.Collection
writeBatchSize *int64
updates []mongo.WriteModel

const oplogName = ""

func main() {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()

func NewMigration(ctx context.Context) *Migration {
return &Migration{
ctx: ctx,
Migration: migrationMongo.NewMigration(),
config: &Config{},
updates: []mongo.WriteModel{},

func (m *Migration) Initialize(provider application.Provider) error {
if err := m.Migration.Initialize(provider); err != nil {
return err

m.CLI().Usage = "BACK-37: Migrate all existing data to add required Platform deduplication hash fields"
m.CLI().Description = "BACK-37: To fully migrate devices from the `jellyfish` upload API to the `platform` upload API"
m.CLI().Authors = []cli.Author{
Name: "J H BATE",
Email: "[email protected]",
m.CLI().Flags = append(m.CLI().Flags,
Name: "batch-size",
Usage: "number of records to read each time",
Destination: &m.config.readBatchSize,
Value: 3000,
Required: false,
Name: "min-free-percent",
Usage: "minimum free disk space percent",
Destination: &m.config.minFreePercent,
Value: 10,
Required: false,
Name: "nop-percent",
Usage: "how much of the oplog is NOP",
Destination: &m.config.nopPercent,
Value: 100,
Required: false,
Name: "oplog-entry-size",
Usage: "minimum free disk space percent",
Destination: &m.config.expectedOplogEntrySize,
Value: 420,
Required: false,
Name: "oplog-window",
Usage: "minimum oplog window in seconds",
Destination: &m.config.minOplogWindow,
Value: 28800, // 8hrs
Required: false,
Name: "uri",
Usage: "mongo connection URI",
Destination: &m.config.uri,
Value: "mongodb://localhost:27017",
Required: false,

m.CLI().Action = func(ctx *cli.Context) error {
if !m.ParseContext(ctx) {
return nil
if err := m.prepare(); err != nil {
return nil
return m.execute()
return nil

func (m *Migration) prepare() error {
var err error
m.client, err = mongo.Connect(m.ctx, options.Client().ApplyURI(m.config.uri))
if err != nil {
return fmt.Errorf("unable to connect to MongoDB: %w", err)
defer m.client.Disconnect(m.ctx)

m.oplogC = m.client.Database("local").Collection(oplogName)
m.deviceDataC = m.client.Database("data").Collection("deviceData")

if err := m.checkFreeSpace(); err != nil {
return err

err = m.setWriteBatchSize()
if err != nil {
return err
return nil

func (m *Migration) execute() error {
totalMigrated := 0
for m.fetchAndUpdateBatch() {
updatedCount, err := m.writeBatchUpdates()
if err != nil {
m.Logger().WithError(err).Error("failed writing batch")
return err
totalMigrated = totalMigrated + updatedCount
m.Logger().Debugf("migrated %d for a total of %d migrated items", updatedCount, totalMigrated)
return nil

func (m *Migration) getOplogDuration() (time.Duration, error) {
type MongoMetaData struct {
Wall time.Time `json:"wall"`
if m.oplogC != nil {
var oldest MongoMetaData
if err := m.oplogC.FindOne(
m.ctx, bson.M{},
options.FindOne().SetProjection(bson.M{"wall": 1})).Decode(&oldest); err != nil {
return 0, err
var newest MongoMetaData
if err := m.oplogC.FindOne(m.ctx,
options.FindOne().SetProjection(bson.M{"wall": 1})).Decode(&newest); err != nil {
return 0, err
oplogDuration := oldest.Wall.Sub(oldest.Wall)
m.Logger().Debugf("oplog duration is currently: %v\n", oplogDuration)
return oplogDuration, nil
m.Logger().Debug("Not clustered, not retrieving oplog duration.")
oplogDuration := time.Duration(m.config.minOplogWindow+1) * time.Second
return oplogDuration, nil


func (m *Migration) setWriteBatchSize() error {
if m.oplogC != nil {
m.Logger().Debug("Getting oplog duration...")
type MongoMetaData struct {
MaxSize int `json:"maxSize"`
var metaData MongoMetaData
if err := m.oplogC.Database().RunCommand(m.ctx, bson.M{"collStats": oplogName}).Decode(&metaData); err != nil {
return err
writeBatchSize := int64(metaData.MaxSize / m.config.expectedOplogEntrySize / m.config.minOplogWindow / (m.config.nopPercent / 7))
m.writeBatchSize = &writeBatchSize
return nil
var writeBatchSize = int64(30000)
m.Logger().Debugf("MongoDB is not clustered, removing write batch limit, setting to %d documents.", writeBatchSize)
m.writeBatchSize = &writeBatchSize
return nil

func (m *Migration) checkFreeSpace() error {
type MongoMetaData struct {
FsTotalSize int `json:"fsTotalSize"`
FsUsedSize int `json:"fsUsedSize"`
var metaData MongoMetaData
m.Logger().Debug("Getting DB free space...")
err := m.deviceDataC.Database().RunCommand(m.ctx, bson.M{"dbStats": 1}).Decode(&metaData)
if err != nil {
return err
bytesFree := metaData.FsTotalSize - metaData.FsUsedSize
percentFree := int(math.Floor(float64(bytesFree) / float64(metaData.FsTotalSize) * 100))
m.Logger().Debugf("DB disk currently has %d%% (%d) free.", percentFree*100, bytesFree)

if percentFree > m.config.minFreePercent {
return fmt.Errorf("error %d%% is below minimum free space of %d%%", percentFree, m.config.minFreePercent)
return nil

func (m *Migration) getWaitTime() (float64, error) {
m.Logger().Debug("Loading DB replication status...")

type Member struct {
Name string `json:"name"`
Health int `json:"health"`
Uptime int `json:"uptime"`
State int `json:"state"`

type MongoMetaData struct {
Members []Member `json:"members"`

var metaData MongoMetaData
m.client.Database("admin").RunCommand(m.ctx, bson.M{"replSetGetStatus": 1}).Decode(&metaData)
m.Logger().Debug("DB replication status loaded.")

for _, member := range metaData.Members {
if member.State < 1 || member.State > 2 || member.Health != 1 || member.Uptime < 120 {
m.Logger().Debugf("DB member %s down or not ready.", member.Name)
return 240, nil

oplogDuration, err := m.getOplogDuration()
if err != nil {
return 0, err
if oplogDuration.Seconds() < float64(m.config.minOplogWindow) {
minOplogWindowTime := time.Duration(m.config.minOplogWindow) * time.Second
m.Logger().Debugf("DB OPLOG shorter than requested duration of %s, currently %s.", minOplogWindowTime, oplogDuration)
waitTime := float64(m.config.minOplogWindow) - oplogDuration.Seconds()
waitTime *= 1.15
if waitTime < 600 {
waitTime = 600
return waitTime, nil
return 0, nil

func (m *Migration) blockUntilDBReady() error {
waitTime, err := m.getWaitTime()
if err != nil {
return err
var totalWait float64
for waitTime > 0 {
totalWait += waitTime
if totalWait > 1800 {
m.Logger().Debugf("Long total wait of %s, possibly high load, or sustained DB outage. If neither, adjust NOP_PERCENT to reduce overshoot.", time.Duration(totalWait)*time.Second)
m.Logger().Debugf("Sleeping for %d", time.Duration(waitTime)*time.Second)
time.Sleep(time.Duration(waitTime) * time.Second)
waitTime, err = m.getWaitTime()
if err != nil {
return err
return nil

func (m *Migration) fetchAndUpdateBatch() bool {
selector := bson.M{
// jellyfish uses a generated _id that is not an mongo objectId
"_id": bson.M{"$not": bson.M{"$type": "objectId"}},
"_deduplicator": bson.M{"$exists": false},
m.updates = []mongo.WriteModel{}

dDataCursor, err := m.deviceDataC.Find(m.ctx, selector,
&options.FindOptions{Limit: &m.config.readBatchSize},
if err != nil {
m.Logger().WithError(err).Error("failed to select data")
return false

var dDataResult bson.M

defer dDataCursor.Close(m.ctx)
for dDataCursor.Next(m.ctx) {
err = dDataCursor.Decode(&dDataResult)
if err != nil {
m.Logger().WithError(err).Error("failed decoding data")
return false

datumID, err := utils.GetValidatedString(dDataResult, "_id")
if err != nil {
m.Logger().WithError(err).Error("failed getting dutum _id")
return false

updates, err := utils.GetDatumUpdates(dDataResult)
if err != nil {
m.Logger().WithError(err).Error("failed getting datum updates")
return false

m.updates = append(m.updates, mongo.NewUpdateOneModel().SetFilter(
"_id": datumID,
"modifiedTime": dDataResult["modifiedTime"],
"$set": updates,
return len(m.updates) > 0

func (m *Migration) writeBatchUpdates() (int, error) {
var getBatches = func(chunkSize int) [][]mongo.WriteModel {
batches := [][]mongo.WriteModel{}
for i := 0; i < len(m.updates); i += chunkSize {
end := i + chunkSize

if end > len(m.updates) {
end = len(m.updates)
batches = append(batches, m.updates[i:end])
return batches

updateCount := 0
for _, batch := range getBatches(int(*m.writeBatchSize)) {
if err := m.blockUntilDBReady(); err != nil {
return updateCount, err
if err := m.checkFreeSpace(); err != nil {
return updateCount, err
results, err := m.deviceDataC.BulkWrite(m.ctx, batch)
if err != nil {
return updateCount, err
updateCount = updateCount + int(results.ModifiedCount)
return updateCount, nil
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
pumpTest ""

var _ = Describe("back-37", func() {
Expand Down

0 comments on commit 350e785

Please sign in to comment.