
Commit

[BACK-1758] This commit removes dual reading and writing from the deviceDataSets and (#655)

deviceData collection and instead does a single collection read and write for
datum (collection deviceData) and uploads (collection deviceDataSets). This
is intended to be done AFTER migration of all type="upload" from the
deviceData to the deviceDataSets collection.
lostlevels authored Aug 23, 2023
1 parent 046ff99 commit b6209eb
Showing 4 changed files with 128 additions and 424 deletions.
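For reference, here is a minimal sketch of the dual-write pattern this commit removes and the single-collection write that replaces it. This is not the repository's actual code: the sketch package name, the "data" database name, the function names, and the use of InsertOne are illustrative assumptions; only the collection names (deviceData, deviceDataSets) and the majority read/write concerns are taken from the diff below.

```go
package sketch

import (
	"context"

	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
	"go.mongodb.org/mongo-driver/mongo/readconcern"
	"go.mongodb.org/mongo-driver/mongo/writeconcern"
)

// dualWrite shows the pre-migration shape: the same document is written to the
// legacy deviceData collection and the new deviceDataSets collection inside a
// single transaction with majority read/write concerns.
func dualWrite(ctx context.Context, client *mongo.Client, doc interface{}) error {
	sess, err := client.StartSession()
	if err != nil {
		return err
	}
	defer sess.EndSession(ctx)

	txOpts := options.Transaction().
		SetWriteConcern(writeconcern.New(writeconcern.WMajority(), writeconcern.J(true))).
		SetReadConcern(readconcern.Majority())

	_, err = sess.WithTransaction(ctx, func(sessCtx mongo.SessionContext) (interface{}, error) {
		db := client.Database("data") // database name is illustrative
		if _, err := db.Collection("deviceData").InsertOne(sessCtx, doc); err != nil {
			return nil, err
		}
		return db.Collection("deviceDataSets").InsertOne(sessCtx, doc)
	}, txOpts)
	return err
}

// singleWrite shows the post-migration shape this commit moves to: one write
// to deviceDataSets, with no session or transaction required.
func singleWrite(ctx context.Context, client *mongo.Client, doc interface{}) error {
	_, err := client.Database("data").Collection("deviceDataSets").InsertOne(ctx, doc)
	return err
}
```

The read path changes the same way: instead of querying both collections and merging the sorted results up to the pagination size, the post-migration code queries deviceDataSets alone.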
286 changes: 14 additions & 272 deletions data/store/mongo/mongo_data.go
@@ -7,18 +7,13 @@ import (

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readconcern"
"go.mongodb.org/mongo-driver/mongo/writeconcern"

"github.com/tidepool-org/platform/data"
"github.com/tidepool-org/platform/data/store"
"github.com/tidepool-org/platform/data/types/upload"
"github.com/tidepool-org/platform/errors"
"github.com/tidepool-org/platform/log"
"github.com/tidepool-org/platform/page"
storeStructuredMongo "github.com/tidepool-org/platform/store/structured/mongo"
structureValidator "github.com/tidepool-org/platform/structure/validator"
)

// DataRepository implements the platform/data/store.DataRepository interface.
@@ -37,160 +32,11 @@ func (d *DataRepository) EnsureIndexes() error {
}

func (d *DataRepository) GetDataSetsForUserByID(ctx context.Context, userID string, filter *store.Filter, pagination *page.Pagination) ([]*upload.Upload, error) {
// Try reading from both new and old collections that hold dataSets,
// starting with the new one. Can read only from the new deviceDataSets
// collection via DataSetRepository when migration completed.
newUploads, err := d.getDataSetsForUserByID(ctx, d.DataSetRepository.Repository, userID, filter, pagination)
if err != nil {
return nil, err
}

// Read from old deviceData collection for Uploads. Can delete this code
// when migration is complete.
prevUploads, err := d.getDataSetsForUserByID(ctx, d.DatumRepository.Repository, userID, filter, pagination)
if err != nil {
return nil, err
}

// Because there may be some dataSets in the old deviceData collection we
// must read from both and merge the results while migration isn't
// complete. Can delete this code when migration is complete.
merged := MergeSortedUploads(newUploads, prevUploads)
if pagination != nil && len(merged) > pagination.Size {
merged = merged[:pagination.Size]
}
return merged, nil
}

func (d *DataRepository) getDataSetsForUserByID(ctx context.Context, repo *storeStructuredMongo.Repository, userID string, filter *store.Filter, pagination *page.Pagination) ([]*upload.Upload, error) {
if ctx == nil {
return nil, errors.New("context is missing")
}
if userID == "" {
return nil, errors.New("user id is missing")
}
if filter == nil {
filter = store.NewFilter()
} else if err := structureValidator.New().Validate(filter); err != nil {
return nil, errors.Wrap(err, "filter is invalid")
}
if pagination == nil {
pagination = page.NewPagination()
} else if err := structureValidator.New().Validate(pagination); err != nil {
return nil, errors.Wrap(err, "pagination is invalid")
}

now := time.Now()

var dataSets []*upload.Upload
selector := bson.M{
"_active": true,
"_userId": userID,
"type": "upload",
}
if !filter.Deleted {
selector["deletedTime"] = bson.M{"$exists": false}
}
opts := storeStructuredMongo.FindWithPagination(pagination).
SetSort(bson.M{"createdTime": -1})
cursor, err := repo.Find(ctx, selector, opts)

loggerFields := log.Fields{"userId": userID, "dataSetsCount": len(dataSets), "duration": time.Since(now) / time.Microsecond}
log.LoggerFromContext(ctx).WithFields(loggerFields).WithError(err).Debug("getDataSetsForUserByID")

if err != nil {
return nil, errors.Wrap(err, "unable to get data sets for user by id")
}

if err = cursor.All(ctx, &dataSets); err != nil {
return nil, errors.Wrap(err, "unable to decode data sets for user by id")
}

if dataSets == nil {
dataSets = []*upload.Upload{}
}
return dataSets, nil
return d.DataSetRepository.GetDataSetsForUserByID(ctx, userID, filter, pagination)
}

func (d *DataRepository) ListUserDataSets(ctx context.Context, userID string, filter *data.DataSetFilter, pagination *page.Pagination) (data.DataSets, error) {
// Try reading from both new and old collections that hold dataSets,
// starting with the new one. Can read only from the new deviceDataSets
// collection via DataSetRepository when migration completed.
newDataSets, err := d.listUserDataSets(ctx, d.DataSetRepository.Repository, userID, filter, pagination)
if err != nil {
return nil, err
}

// Read from old deviceData collection for DataSets. Can delete this code
// when migration is complete.
prevDataSets, err := d.listUserDataSets(ctx, d.DatumRepository.Repository, userID, filter, pagination)
if err != nil {
return nil, err
}

// Because there may be some dataSets in the old deviceData collection we
// must read from both and merge the results while migration isn't
// complete. Can delete this code when migration is complete.
merged := MergeSortedDataSets(newDataSets, prevDataSets)
if pagination != nil && len(merged) > pagination.Size {
merged = merged[:pagination.Size]
}
return merged, nil
}

func (d *DataRepository) listUserDataSets(ctx context.Context, repo *storeStructuredMongo.Repository, userID string, filter *data.DataSetFilter, pagination *page.Pagination) (data.DataSets, error) {
if ctx == nil {
return nil, errors.New("context is missing")
}
if userID == "" {
return nil, errors.New("user id is missing")
}
if filter == nil {
filter = data.NewDataSetFilter()
} else if err := structureValidator.New().Validate(filter); err != nil {
return nil, errors.Wrap(err, "filter is invalid")
}
if pagination == nil {
pagination = page.NewPagination()
} else if err := structureValidator.New().Validate(pagination); err != nil {
return nil, errors.Wrap(err, "pagination is invalid")
}

now := time.Now()
logger := log.LoggerFromContext(ctx).WithFields(log.Fields{"userId": userID, "filter": filter, "pagination": pagination})

dataSets := data.DataSets{}
selector := bson.M{
"_active": true,
"_userId": userID,
"type": "upload",
}
if filter.ClientName != nil {
selector["client.name"] = *filter.ClientName
}
if filter.Deleted == nil || !*filter.Deleted {
selector["deletedTime"] = bson.M{"$exists": false}
}
if filter.DeviceID != nil {
selector["deviceId"] = *filter.DeviceID
}
opts := storeStructuredMongo.FindWithPagination(pagination).
SetSort(bson.M{"createdTime": -1})
cursor, err := repo.Find(ctx, selector, opts)
logger.WithFields(log.Fields{"count": len(dataSets), "duration": time.Since(now) / time.Microsecond}).WithError(err).Debug("ListUserDataSets")
if err != nil {
return nil, errors.Wrap(err, "unable to list user data sets")
}

if err = cursor.All(ctx, &dataSets); err != nil {
return nil, errors.Wrap(err, "unable to decode user data sets")
}

if dataSets == nil {
dataSets = data.DataSets{}
}

return dataSets, nil
return d.DataSetRepository.ListUserDataSets(ctx, userID, filter, pagination)
}

func (d *DataRepository) GetDataSet(ctx context.Context, id string) (*data.DataSet, error) {
@@ -207,78 +53,19 @@ func (d *DataRepository) GetDataSet(ctx context.Context, id string) (*data.DataS
}

func (d *DataRepository) GetDataSetByID(ctx context.Context, dataSetID string) (*upload.Upload, error) {
// Try reading from both new and old collections that hold dataSets, starting with the new one.
// Can read only from the new deviceDataSets collection via DataSetRepository when migration completed.
dataSet, err := d.DataSetRepository.GetDataSetByID(ctx, dataSetID)
if err != nil {
return nil, err
}
if dataSet != nil {
return dataSet, nil
}

return d.DatumRepository.GetDataSetByID(ctx, dataSetID)
return d.DataSetRepository.GetDataSetByID(ctx, dataSetID)
}

func (d *DataRepository) CreateDataSet(ctx context.Context, dataSet *upload.Upload) error {
// Until everything is migrated over to the new collection, some old
// clients may still be reading from the old collection so we must write
// to both old and new collection.
steps := func(sessCtx mongo.SessionContext) (interface{}, error) {
now := time.Now().UTC()
if err := d.DatumRepository.createDataSet(sessCtx, dataSet, now); err != nil {
return nil, err
}
return nil, d.DataSetRepository.createDataSet(sessCtx, dataSet, now)
}

_, err := d.transact(ctx, steps)
return err
}

func (d *DataRepository) transact(ctx context.Context, steps func(sessCtx mongo.SessionContext) (interface{}, error)) (interface{}, error) {
sess, err := d.mongoClient().StartSession()
if err != nil {
return nil, err
}
defer sess.EndSession(ctx)

wc := writeconcern.New(writeconcern.WMajority(), writeconcern.J(true))
rc := readconcern.Majority()
txOpts := options.Transaction().SetWriteConcern(wc).SetReadConcern(rc)

return sess.WithTransaction(ctx, steps, txOpts)

return d.DataSetRepository.createDataSet(ctx, dataSet, time.Now().UTC())
}

func (d *DataRepository) UpdateDataSet(ctx context.Context, id string, update *data.DataSetUpdate) (*upload.Upload, error) {
if ctx == nil {
return nil, errors.New("context is missing")
}

steps := func(sessCtx mongo.SessionContext) (interface{}, error) {
now := time.Now().UTC()
if doc, err := d.DatumRepository.updateDataSet(sessCtx, id, update, now); err != nil {
return nil, err
} else if doc == nil {
// if document doesn't exist in the old collection, then it
// shouldn't exist in the new one either. Once migration is
// complete, can delete this checking code and just use the
// DataSetRepository.upsertDataSet (but changing it to be named
// UpdateDataSet with no upsert option ) by itself.
return nil, nil
}
return d.DataSetRepository.upsertDataSet(sessCtx, id, update, now)
}

dataSet, err := d.transact(ctx, steps)
if err != nil {
return nil, err
}
if dataSet == nil {
return nil, nil
}
return dataSet.(*upload.Upload), nil
return d.DataSetRepository.updateDataSet(ctx, id, update, time.Now().UTC())
}

// DeleteDataSet will actually delete all non upload data and not actually
@@ -297,8 +84,7 @@ func (d *DataRepository) DeleteDataSet(ctx context.Context, dataSet *upload.Uplo

var err error
var removeInfo *mongo.DeleteResult
var updateInfoDeviceData *mongo.UpdateResult // updating of DataSets in the old deviceData collection
var updateInfoDeviceDataSet *mongo.UpdateResult // updating of DataSets in the new deviceDataSets collection
var updateInfo *mongo.UpdateResult // updating of DataSets in the new deviceDataSets collection

selector := bson.M{
"_userId": dataSet.UserID,
Expand All @@ -319,25 +105,11 @@ func (d *DataRepository) DeleteDataSet(ctx context.Context, dataSet *upload.Uplo
"modifiedTime": timestamp,
}
unset := bson.M{}

var sessErr error
steps := func(sessCtx mongo.SessionContext) (interface{}, error) {
updateInfoDeviceDataSet, sessErr = d.DataSetRepository.UpdateMany(sessCtx, selector, d.DataSetRepository.ConstructUpdate(set, unset))
if sessErr != nil {
return nil, sessErr
}

updateInfoDeviceData, sessErr = d.DatumRepository.UpdateMany(sessCtx, selector, d.DataSetRepository.ConstructUpdate(set, unset))
if sessErr != nil {
return nil, sessErr
}
return nil, nil
}

_, err = d.transact(ctx, steps)
// Note setting updateInfo and err as defined above
updateInfo, err = d.DataSetRepository.UpdateMany(ctx, selector, d.DataSetRepository.ConstructUpdate(set, unset))
}

loggerFields := log.Fields{"dataSetId": dataSet.UploadID, "removeInfo": removeInfo, "updateInfoDeviceData": updateInfoDeviceData, "updateInfoDeviceDataSet": updateInfoDeviceDataSet, "duration": time.Since(now) / time.Microsecond}
loggerFields := log.Fields{"dataSetId": dataSet.UploadID, "removeInfo": removeInfo, "updateInfo": updateInfo, "duration": time.Since(now) / time.Microsecond}
log.LoggerFromContext(ctx).WithFields(loggerFields).WithError(err).Debug("DeleteDataSet")

if err != nil {
@@ -366,8 +138,7 @@ func (d *DataRepository) DeleteOtherDataSetData(ctx context.Context, dataSet *up

var err error
var removeInfo *mongo.DeleteResult
var updateInfoDeviceData *mongo.UpdateResult
var updateInfoDeviceDataSet *mongo.UpdateResult
var updateInfo *mongo.UpdateResult

selector := bson.M{
"_userId": dataSet.UserID,
Expand All @@ -390,23 +161,10 @@ func (d *DataRepository) DeleteOtherDataSetData(ctx context.Context, dataSet *up
"deletedTime": timestamp,
}
unset := bson.M{}

var sessErr error
steps := func(sessCtx mongo.SessionContext) (interface{}, error) {
updateInfoDeviceDataSet, sessErr = d.DataSetRepository.UpdateMany(sessCtx, selector, d.DataSetRepository.ConstructUpdate(set, unset))
if sessErr != nil {
return nil, sessErr
}
updateInfoDeviceData, sessErr = d.DatumRepository.UpdateMany(sessCtx, selector, d.DataSetRepository.ConstructUpdate(set, unset))
if sessErr != nil {
return nil, sessErr
}
return nil, nil
}
_, err = d.transact(ctx, steps)
updateInfo, err = d.DataSetRepository.UpdateMany(ctx, selector, d.DataSetRepository.ConstructUpdate(set, unset))
}

loggerFields := log.Fields{"dataSetId": dataSet.UploadID, "removeInfo": removeInfo, "updateInfoDeviceData": updateInfoDeviceData, "updateInfoDeviceDataSet": updateInfoDeviceDataSet, "duration": time.Since(now) / time.Microsecond}
loggerFields := log.Fields{"dataSetId": dataSet.UploadID, "removeInfo": removeInfo, "updateInfo": updateInfo, "duration": time.Since(now) / time.Microsecond}
log.LoggerFromContext(ctx).WithFields(loggerFields).WithError(err).Debug("DeleteOtherDataSetData")

if err != nil {
Expand All @@ -430,29 +188,13 @@ func (d *DataRepository) DestroyDataForUserByID(ctx context.Context, userID stri
}
var removeDatumInfo *mongo.DeleteResult
var removeDeviceDataSetInfo *mongo.DeleteResult
var removeDataSetInfo *mongo.DeleteResult
var err error

removeDatumInfo, err = d.DatumRepository.DeleteMany(ctx, selector)
if err == nil {
var sessErr error
steps := func(sessCtx mongo.SessionContext) (interface{}, error) {
removeDeviceDataSetInfo, sessErr = d.DataSetRepository.DeleteMany(sessCtx, selector)
if sessErr != nil {
return nil, sessErr
}

removeDataSetInfo, sessErr = d.DatumRepository.DeleteMany(sessCtx, selector)
if sessErr != nil {
return nil, sessErr
}

return nil, nil
}

_, err = d.transact(ctx, steps)
removeDeviceDataSetInfo, err = d.DataSetRepository.DeleteMany(ctx, selector)
}
loggerFields := log.Fields{"userId": userID, "removeDatumInfo": removeDatumInfo, "removeDataSetInfo": removeDataSetInfo, "removeDeviceDataSetInfo": removeDeviceDataSetInfo, "duration": time.Since(now) / time.Microsecond}
loggerFields := log.Fields{"userId": userID, "removeDatumInfo": removeDatumInfo, "removeDeviceDataSetInfo": removeDeviceDataSetInfo, "duration": time.Since(now) / time.Microsecond}
log.LoggerFromContext(ctx).WithFields(loggerFields).WithError(err).Debug("DestroyDataForUserByID")

if err != nil {
