Skip to content
Merged
9 changes: 8 additions & 1 deletion pbm/backup/logical.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,36 +200,43 @@ func (b *Backup) doLogical(
return errors.Wrap(err, "generate archive meta v1")
}

l.Info("dump finished, waiting for the oplog")
l.Info("dump finished for RS: %s", b.brief.SetName)

err = ChangeRSState(b.leadConn, bcp.Name, rsMeta.Name, defs.StatusDumpDone, "")
if err != nil {
return errors.Wrap(err, "set shard's StatusDumpDone")
}

if inf.IsLeader() {
if b.brief.Sharded {
l.Info("checking status of dump on other nodes")
}
err := b.reconcileStatus(ctx, bcp.Name, opid.String(), defs.StatusDumpDone, nil)
if err != nil {
return errors.Wrap(err, "check cluster for dump done")
}
} else {
l.Info("waiting for leader to validate dump done on all nodes")
err = b.waitForStatus(ctx, bcp.Name, defs.StatusDumpDone, nil)
if err != nil {
return errors.Wrap(err, "waiting for dump done")
}
}

l.Info("stopping oplog slicer on this node")
lastSavedTS, oplogSize, err := stopOplogSlicer()
if err != nil {
return errors.Wrap(err, "oplog")
}

l.Info("setting last write timestamp on this node")
err = SetRSLastWrite(b.leadConn, bcp.Name, rsMeta.Name, lastSavedTS)
if err != nil {
return errors.Wrap(err, "set shard's last write ts")
}

if inf.IsLeader() {
l.Info("setting last common write timestamp across all nodes")
err = b.setClusterLastWrite(ctx, bcp.Name)
if err != nil {
return errors.Wrap(err, "set cluster last write ts")
Expand Down
Loading