Skip to content

Commit

Permalink
sort query and set last updated to use
Browse files Browse the repository at this point in the history
  • Loading branch information
jh-bate committed Dec 13, 2023
1 parent 36b6436 commit 793562a
Showing 1 changed file with 19 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Migration struct {
writeBatchSize *int64
updates []mongo.WriteModel
dryRun bool
lastUpdatedId string
}

const oplogName = "oplog.rs"
Expand Down Expand Up @@ -117,7 +118,7 @@ func (m *Migration) Initialize() error {
Name: "batch-size",
Usage: "number of records to read each time",
Destination: &m.config.readBatchSize,
Value: 30,
Value: 10,
Required: false,
},
cli.IntFlag{
Expand Down Expand Up @@ -182,7 +183,7 @@ func (m *Migration) prepare() error {

func (m *Migration) execute() error {
totalMigrated := 0
testingCapSize := 10
testingCapSize := 20
for m.fetchAndUpdateBatch() {
updatedCount, err := m.writeBatchUpdates()
if err != nil {
Expand Down Expand Up @@ -220,7 +221,7 @@ func (m *Migration) getOplogDuration() (time.Duration, error) {
return 0, err
}
oplogDuration := newest.Wall.Sub(oldest.Wall)
log.Printf("oplog duration: %v", oplogDuration)
log.Printf("current oplog duration: %v", oplogDuration)
return oplogDuration, nil
}
log.Println("Not clustered, not retrieving oplog duration.")
Expand Down Expand Up @@ -316,7 +317,6 @@ func (m *Migration) getWaitTime() (float64, error) {
}

func (m *Migration) blockUntilDBReady() error {
log.Println("blocking until ready...")
waitTime, err := m.getWaitTime()
if err != nil {
return err
Expand Down Expand Up @@ -346,12 +346,25 @@ func (m *Migration) fetchAndUpdateBatch() bool {
// testing based on _userId for [email protected]
"_userId": "5e8cac61-6bef-4728-b490-c1d82087ed9c",
}

if m.lastUpdatedId != "" {
selector["_id"] = bson.M{"$and": []interface{}{
bson.M{"$gt": m.lastUpdatedId},
bson.M{"$type": "objectId"},
}}
log.Printf("selector with _id $gt %#v", selector)
}

m.updates = []mongo.WriteModel{}
sLimit := int64(5) //Testing only get small group at a time

if dataC := m.getDataCollection(); dataC != nil {
dDataCursor, err := dataC.Find(m.ctx, selector,
//&options.FindOptions{Limit: &m.config.readBatchSize},
&options.FindOptions{Limit: &sLimit},
&options.FindOptions{
Limit: &sLimit,
Sort: bson.M{"_id": 1},
},
)
if err != nil {
log.Printf("failed to select data: %s", err)
Expand Down Expand Up @@ -387,6 +400,7 @@ func (m *Migration) fetchAndUpdateBatch() bool {
}).SetUpdate(bson.M{
"$set": updates,
}))
m.lastUpdatedId = datumID
}
return len(m.updates) > 0
}
Expand Down

0 comments on commit 793562a

Please sign in to comment.