[BACK-2702] Add reason for setting last updated and outdated since #673

Merged · 28 commits · Nov 6, 2023
Commits (28) · the diff below shows changes from 1 commit
7c4a31b
Add reason for setting last updated and outdated since
toddkazakov Oct 19, 2023
c42af18
add more specific conditions for SetOutdated triggers
Roukoswarf Oct 20, 2023
376b05b
fix some unit tests
Roukoswarf Oct 20, 2023
c267c7f
update to ginkgo v2 and add some unit tests
Roukoswarf Oct 24, 2023
c535a59
fix build and unit tests, still incomplete testing
Roukoswarf Oct 24, 2023
dcbe1fc
fix formatting
Roukoswarf Oct 24, 2023
99d13a5
possibly fix make test
Roukoswarf Oct 24, 2023
3b67506
more test fixes
Roukoswarf Oct 24, 2023
c554a3b
travis use go 1.21 for real
Roukoswarf Oct 24, 2023
99611b2
update vendor
Roukoswarf Oct 24, 2023
8fa2e8d
more unit tests
Roukoswarf Oct 25, 2023
15dc174
more unit tests
Roukoswarf Oct 25, 2023
f407953
formatting
Roukoswarf Oct 25, 2023
4c70f63
add outdatedSinceLimit and minor bugfixes
Roukoswarf Oct 26, 2023
b5e7ec5
zero out OutdatedSinceLimit on update
Roukoswarf Oct 26, 2023
020518c
formatting
Roukoswarf Oct 26, 2023
325b5f8
update deps
Roukoswarf Oct 26, 2023
68d2f2e
cleanup for review
Roukoswarf Oct 30, 2023
be86c05
update deps
Roukoswarf Oct 30, 2023
ebded89
remove direct depend on shopify/sarama
Roukoswarf Oct 30, 2023
620a678
remove dep on github.com/pkg/errors
Roukoswarf Oct 30, 2023
d0a8aaa
remove ginkgo v1
Roukoswarf Oct 30, 2023
ce1820d
address review comments
Roukoswarf Oct 30, 2023
b7cd5cf
update unit test to handle mongodb time rounding
Roukoswarf Nov 1, 2023
bcf85e7
move DataSetsDataCreate summary logic after deduplicator
Roukoswarf Nov 1, 2023
b7b61b7
add schemamigration reason
Roukoswarf Nov 2, 2023
599f396
dont clear LastUpdatedReason on SetOutdated
Roukoswarf Nov 3, 2023
adb3cc5
Merge remote-tracking branch 'origin/master' into ehr-triggers-spike
Roukoswarf Nov 3, 2023
1 change: 1 addition & 0 deletions data/datum.go
@@ -21,6 +21,7 @@ type Datum interface {
GetPayload() *metadata.Metadata

GetType() string
GetActive() bool
GetTime() *time.Time

SetUserID(userID *string)
10 changes: 7 additions & 3 deletions data/service/api/v1/datasets_data_create.go
@@ -158,9 +158,13 @@ func CheckDatumUpdatesSummary(updatesSummary map[string]struct{}, datum data.Dat
twoYearsPast := time.Now().UTC().AddDate(0, -24, 0)
oneDayFuture := time.Now().UTC().AddDate(0, 0, 1)

for _, typ := range types.DeviceDataTypes {
if datum.GetType() == typ && datum.GetTime().Before(oneDayFuture) && datum.GetTime().After(twoYearsPast) {
updatesSummary[types.DeviceDataToSummaryTypes[typ]] = struct{}{}
// we only update summaries if the data is both of a relevant type, and being uploaded as "active"
// it also must be recent enough, within the past 2 years, and no more than 1d into the future
if datum.GetActive() {
for _, typ := range types.DeviceDataTypes {
if datum.GetType() == typ && datum.GetTime().Before(oneDayFuture) && datum.GetTime().After(twoYearsPast) {
updatesSummary[types.DeviceDataToSummaryTypes[typ]] = struct{}{}
}
}
}
}
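The reworked CheckDatumUpdatesSummary gates summary updates on three conditions: the datum is active, its type maps to a summary type, and its time falls between two years in the past and one day in the future. Below is a minimal standalone sketch of that gating logic; the simplifiedDatum struct and the cbg/smbg to cgm/bgm mapping are illustrative stand-ins, not the platform's real data.Datum or types.DeviceDataToSummaryTypes.

```go
package main

import (
	"fmt"
	"time"
)

// simplifiedDatum is an illustrative stand-in for data.Datum.
type simplifiedDatum struct {
	Type   string
	Active bool
	Time   time.Time
}

// relevantTypes mimics types.DeviceDataToSummaryTypes; the entries here are assumed, not authoritative.
var relevantTypes = map[string]string{"cbg": "cgm", "smbg": "bgm"}

// markSummaryUpdates reproduces the gating shown above: only active data of a relevant
// type, no older than two years and no more than one day in the future, flags a
// summary type for update.
func markSummaryUpdates(updates map[string]struct{}, d simplifiedDatum) {
	twoYearsPast := time.Now().UTC().AddDate(0, -24, 0)
	oneDayFuture := time.Now().UTC().AddDate(0, 0, 1)

	if !d.Active {
		return
	}
	summaryType, relevant := relevantTypes[d.Type]
	if relevant && d.Time.After(twoYearsPast) && d.Time.Before(oneDayFuture) {
		updates[summaryType] = struct{}{}
	}
}

func main() {
	updates := map[string]struct{}{}
	markSummaryUpdates(updates, simplifiedDatum{Type: "cbg", Active: true, Time: time.Now().UTC()})
	fmt.Println(updates) // map[cgm:{}]
}
```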
24 changes: 18 additions & 6 deletions data/service/api/v1/datasets_update.go
@@ -1,11 +1,13 @@
package v1

import (
"github.com/tidepool-org/platform/data/summary/types"
"context"
"net/http"

"github.com/tidepool-org/platform/data"
dataService "github.com/tidepool-org/platform/data/service"
"github.com/tidepool-org/platform/data/store"
"github.com/tidepool-org/platform/data/summary/types"
"github.com/tidepool-org/platform/data/types/upload"
"github.com/tidepool-org/platform/log"
"github.com/tidepool-org/platform/permission"
@@ -91,17 +93,27 @@ func DataSetsUpdate(dataServiceContext dataService.Context) {
dataServiceContext.RespondWithInternalServerFailure("Unable to close", err)
return
}
}

all := map[string]struct{}{
types.SummaryTypeBGM: {},
types.SummaryTypeCGM: {},
updatesSummary := make(map[string]struct{})
CheckDataSetUpdatesSummary(ctx, dataServiceContext.DataRepository(), updatesSummary, dataSetID)
MaybeUpdateSummary(ctx, dataServiceContext.SummarizerRegistry(), updatesSummary, *dataSet.UserID, types.OutdatedReasonUploadCompleted)
}
MaybeUpdateSummary(ctx, dataServiceContext.SummarizerRegistry(), all, *dataSet.UserID, types.OutdatedReasonUploadCompleted)

if err = dataServiceContext.MetricClient().RecordMetric(ctx, "data_sets_update"); err != nil {
lgr.WithError(err).Error("Unable to record metric")
}

dataServiceContext.RespondWithStatusAndData(http.StatusOK, dataSet)
}

func CheckDataSetUpdatesSummary(ctx context.Context, repository store.DataRepository, updatesSummary map[string]struct{}, dataSetID string) {
for _, typ := range types.DeviceDataTypes {
status, err := repository.CheckDataSetContainsType(ctx, dataSetID, typ)
if err != nil {
return
}
if status {
updatesSummary[types.DeviceDataToSummaryTypes[typ]] = struct{}{}
}
}
}
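Previously the handler marked both BGM and CGM summaries outdated on every completed upload; the new CheckDataSetUpdatesSummary flags only the summary types whose device data the data set actually contains. Here is a hedged sketch of that pattern against a minimal stand-in interface; only the CheckDataSetContainsType signature comes from the diff, while the type mapping and fakeRepo are illustrative.

```go
package main

import (
	"context"
	"fmt"
)

// typeChecker is a stand-in for store.DataRepository; only this method's signature
// is taken from the diff.
type typeChecker interface {
	CheckDataSetContainsType(ctx context.Context, dataSetID string, typ string) (bool, error)
}

// deviceDataToSummaryTypes is illustrative; the real mapping is types.DeviceDataToSummaryTypes.
var deviceDataToSummaryTypes = map[string]string{"cbg": "cgm", "smbg": "bgm"}

// collectSummaryUpdates mirrors CheckDataSetUpdatesSummary: flag only the summary types
// whose device data is actually present (active and in range) in the data set.
func collectSummaryUpdates(ctx context.Context, repo typeChecker, dataSetID string) map[string]struct{} {
	updates := map[string]struct{}{}
	for deviceType, summaryType := range deviceDataToSummaryTypes {
		present, err := repo.CheckDataSetContainsType(ctx, dataSetID, deviceType)
		if err != nil {
			// The handler above bails out silently on error; a caller could log here instead.
			return updates
		}
		if present {
			updates[summaryType] = struct{}{}
		}
	}
	return updates
}

// fakeRepo is a trivial in-memory implementation for demonstration.
type fakeRepo struct{ contains map[string]bool }

func (f fakeRepo) CheckDataSetContainsType(_ context.Context, _ string, typ string) (bool, error) {
	return f.contains[typ], nil
}

func main() {
	repo := fakeRepo{contains: map[string]bool{"cbg": true}}
	fmt.Println(collectSummaryUpdates(context.Background(), repo, "dataset-1")) // map[cgm:{}]
}
```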
72 changes: 49 additions & 23 deletions data/store/mongo/mongo_datum.go
@@ -2,17 +2,19 @@ package mongo

import (
"context"
"fmt"
"time"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"

"errors"

"github.com/tidepool-org/platform/data"
"github.com/tidepool-org/platform/data/summary/types"
baseDatum "github.com/tidepool-org/platform/data/types"
"github.com/tidepool-org/platform/data/types/upload"
"github.com/tidepool-org/platform/errors"
"github.com/tidepool-org/platform/log"
storeStructuredMongo "github.com/tidepool-org/platform/store/structured/mongo"
structureValidator "github.com/tidepool-org/platform/structure/validator"
@@ -142,7 +144,7 @@ func (d *DatumRepository) CreateDataSetData(ctx context.Context, dataSet *upload
log.LoggerFromContext(ctx).WithFields(loggerFields).WithError(err).Debug("CreateDataSetData")

if err != nil {
return errors.Wrap(err, "unable to create data set data")
return fmt.Errorf("unable to create data set data: %w", err)
}
return nil
}
@@ -180,7 +182,7 @@ func (d *DatumRepository) ActivateDataSetData(ctx context.Context, dataSet *uplo
changeInfo, err := d.UpdateMany(ctx, selector, d.ConstructUpdate(set, unset))
if err != nil {
logger.WithError(err).Error("Unable to activate data set data")
return errors.Wrap(err, "unable to activate data set data")
return fmt.Errorf("unable to activate data set data: %w", err)
}

logger.WithFields(log.Fields{"changeInfo": changeInfo, "duration": time.Since(now) / time.Microsecond}).Debug("ActivateDataSetData")
@@ -220,7 +222,7 @@ func (d *DatumRepository) ArchiveDataSetData(ctx context.Context, dataSet *uploa
changeInfo, err := d.UpdateMany(ctx, selector, d.ConstructUpdate(set, unset))
if err != nil {
logger.WithError(err).Error("Unable to archive data set data")
return errors.Wrap(err, "unable to archive data set data")
return fmt.Errorf("unable to archive data set data: %w", err)
}

logger.WithFields(log.Fields{"changeInfo": changeInfo, "duration": time.Since(now) / time.Microsecond}).Debug("ArchiveDataSetData")
@@ -261,7 +263,7 @@ func (d *DatumRepository) DeleteDataSetData(ctx context.Context, dataSet *upload
changeInfo, err := d.UpdateMany(ctx, selector, d.ConstructUpdate(set, unset))
if err != nil {
logger.WithError(err).Error("Unable to delete data set data")
return errors.Wrap(err, "unable to delete data set data")
return fmt.Errorf("unable to delete data set data: %w", err)
}

logger.WithFields(log.Fields{"changeInfo": changeInfo, "duration": time.Since(now) / time.Microsecond}).Debug("DeleteDataSetData")
@@ -290,7 +292,7 @@ func (d *DatumRepository) DestroyDeletedDataSetData(ctx context.Context, dataSet
changeInfo, err := d.DeleteMany(ctx, selector)
if err != nil {
logger.WithError(err).Error("Unable to destroy deleted data set data")
return errors.Wrap(err, "unable to destroy deleted data set data")
return fmt.Errorf("unable to destroy deleted data set data: %w", err)
}

logger.WithFields(log.Fields{"changeInfo": changeInfo, "duration": time.Since(now) / time.Microsecond}).Debug("DestroyDeletedDataSetData")
@@ -318,7 +320,7 @@ func (d *DatumRepository) DestroyDataSetData(ctx context.Context, dataSet *uploa
changeInfo, err := d.DeleteMany(ctx, selector)
if err != nil {
logger.WithError(err).Error("Unable to destroy data set data")
return errors.Wrap(err, "unable to destroy data set data")
return fmt.Errorf("unable to destroy data set data: %w", err)
}

logger.WithFields(log.Fields{"changeInfo": changeInfo, "duration": time.Since(now) / time.Microsecond}).Debug("DestroyDataSetData")
@@ -370,7 +372,7 @@ func (d *DatumRepository) ArchiveDeviceDataUsingHashesFromDataSet(ctx context.Co
log.LoggerFromContext(ctx).WithFields(loggerFields).WithError(err).Debug("ArchiveDeviceDataUsingHashesFromDataSet")

if err != nil {
return errors.Wrap(err, "unable to archive device data using hashes from data set")
return fmt.Errorf("unable to archive device data using hashes from data set: %w", err)
}
return nil
}
@@ -426,7 +428,7 @@ func (d *DatumRepository) UnarchiveDeviceDataUsingHashesFromDataSet(ctx context.
loggerFields := log.Fields{"dataSetId": dataSet.UploadID, "result": result}
log.LoggerFromContext(ctx).WithFields(loggerFields).WithError(err).Error("Unable to decode result for UnarchiveDeviceDataUsingHashesFromDataSet")
if overallErr == nil {
overallErr = errors.Wrap(err, "unable to decode device data results")
overallErr = fmt.Errorf("unable to decode device data results: %w", err)
}
}
if result.ID.Active != (result.ID.ArchivedDataSetID == "") || result.ID.Active != (result.ID.ArchivedTime.IsZero()) {
@@ -458,7 +460,7 @@ func (d *DatumRepository) UnarchiveDeviceDataUsingHashesFromDataSet(ctx context.
loggerFields := log.Fields{"dataSetId": dataSet.UploadID, "result": result}
log.LoggerFromContext(ctx).WithFields(loggerFields).WithError(err).Error("Unable to update result for UnarchiveDeviceDataUsingHashesFromDataSet")
if overallErr == nil {
overallErr = errors.Wrap(err, "unable to transfer device data active")
overallErr = fmt.Errorf("unable to transfer device data active: %w", err)
}
} else {
overallUpdateInfo.ModifiedCount += updateInfo.ModifiedCount
@@ -467,7 +469,7 @@ func (d *DatumRepository) UnarchiveDeviceDataUsingHashesFromDataSet(ctx context.

if err := cursor.Err(); err != nil {
if overallErr == nil {
overallErr = errors.Wrap(err, "unable to iterate to transfer device data active")
overallErr = fmt.Errorf("unable to iterate to transfer device data active: %w", err)
}
}

@@ -496,10 +498,10 @@ func (d *DatumRepository) GetDataSet(ctx context.Context, id string) (*data.Data

err := d.FindOne(ctx, selector).Decode(&dataSet)
logger.WithField("duration", time.Since(now)/time.Microsecond).WithError(err).Debug("DatumRepository.GetDataSet")
if err == mongo.ErrNoDocuments {
if errors.Is(err, mongo.ErrNoDocuments) {
return nil, nil
} else if err != nil {
return nil, errors.Wrap(err, "unable to get data set")
return nil, fmt.Errorf("unable to get data set: %w", err)
}

return dataSet, nil
@@ -509,7 +511,7 @@ func validateAndTranslateSelectors(selectors *data.Selectors) (bson.M, error) {
if selectors == nil {
return bson.M{}, nil
} else if err := structureValidator.New().Validate(selectors); err != nil {
return nil, errors.Wrap(err, "selectors is invalid")
return nil, fmt.Errorf("selectors is invalid: %w", err)
}

var selectorIDs []string
@@ -543,6 +545,30 @@ return selector, nil
return selector, nil
}

func (d *DatumRepository) CheckDataSetContainsType(ctx context.Context, dataSetID string, typ string) (bool, error) {
twoYearsPast := time.Now().UTC().AddDate(0, -24, 0)
oneDayFuture := time.Now().UTC().AddDate(0, 0, 1)

selector := bson.M{
"_active": true,
"uploadId": dataSetID,
"type": typ,
"time": bson.M{"$gt": twoYearsPast,
"$lte": oneDayFuture},
}

var result bson.M
err := d.FindOne(ctx, selector).Decode(&result)
if err != nil {
if errors.Is(err, mongo.ErrNoDocuments) {
return false, nil
}
return false, fmt.Errorf("unable to check for type %s in dataset %s: %w", typ, dataSetID, err)
}

return true, nil
}

// GetDataRange be careful when calling this, as if dataRecords isn't a pointer underneath, it will silently not
// result in any results being returned.
func (d *DatumRepository) GetDataRange(ctx context.Context, dataRecords interface{}, userId string, typ string, startTime time.Time, endTime time.Time) error {
@@ -554,12 +580,12 @@ func (d *DatumRepository) GetDataRange(ctx context.Context, dataRecords interfac

// return error if ranges are inverted, as this can produce unexpected results
if startTime.After(endTime) {
return errors.Newf("startTime (%s) after endTime (%s) for user %s", startTime, endTime, userId)
return fmt.Errorf("startTime (%s) after endTime (%s) for user %s", startTime, endTime, userId)
}

// This is never expected to be an upload.
if isTypeUpload(typ) {
return errors.Newf("unexpected type: %v", upload.Type)
return fmt.Errorf("unexpected type: %v", upload.Type)
}

selector := bson.M{
@@ -575,11 +601,11 @@ func (d *DatumRepository) GetDataRange(ctx context.Context, dataRecords interfac

cursor, err := d.Find(ctx, selector, opts)
if err != nil {
return errors.Wrap(err, "unable to get cgm data in date range for user")
return fmt.Errorf("unable to get cgm data in date range for user: %w", err)
}

if err = cursor.All(ctx, dataRecords); err != nil {
return errors.Wrap(err, "unable to decode data sets")
return fmt.Errorf("unable to decode data sets, %w", err)
}

return nil
@@ -601,7 +627,7 @@ func (d *DatumRepository) GetLastUpdatedForUser(ctx context.Context, id string,

// This is never expected to be an upload.
if isTypeUpload(typ) {
return nil, errors.Newf("unexpected type: %v", upload.Type)
return nil, fmt.Errorf("unexpected type: %v", upload.Type)
}

futureCutoff := time.Now().AddDate(0, 0, 1).UTC()
@@ -621,11 +647,11 @@ func (d *DatumRepository) GetLastUpdatedForUser(ctx context.Context, id string,

cursor, err = d.Find(ctx, selector, findOptions)
if err != nil {
return nil, errors.Wrapf(err, "unable to get last %s date", typ)
return nil, fmt.Errorf("unable to get last %s date: %w", typ, err)
}

if err = cursor.All(ctx, &dataSet); err != nil {
return nil, errors.Wrapf(err, "unable to decode last %s date", typ)
return nil, fmt.Errorf("unable to decode last %s date: %w", typ, err)
}

// if we have no record
@@ -652,7 +678,7 @@ func (d *DatumRepository) DistinctUserIDs(ctx context.Context, typ string) ([]st

// This is never expected to be an upload.
if isTypeUpload(typ) {
return nil, errors.Newf("unexpected type: %v", upload.Type)
return nil, fmt.Errorf("unexpected type: %v", upload.Type)
}

// allow for a small margin on the pastCutoff to allow for calculation delay
@@ -668,7 +694,7 @@ func (d *DatumRepository) DistinctUserIDs(ctx context.Context, typ string) ([]st

result, err := d.Distinct(ctx, "_userId", selector)
if err != nil {
return nil, errors.Wrap(err, "error fetching distinct userIDs")
return nil, fmt.Errorf("error fetching distinct userIDs: %w", err)
}

for _, v := range result {
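Most of the changes in this file replace github.com/pkg/errors with the standard library: fmt.Errorf with the %w verb for wrapping, plus errors.Is for sentinel checks such as mongo.ErrNoDocuments in GetDataSet and CheckDataSetContainsType. A small stdlib-only sketch of why the two changes go together follows; errNotFound is a made-up sentinel standing in for the driver's.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound stands in for a driver sentinel such as mongo.ErrNoDocuments.
var errNotFound = errors.New("no documents in result")

func fetch() error {
	// Wrapping with %w, as the diff does via fmt.Errorf, keeps the original error in the chain.
	return fmt.Errorf("unable to get data set: %w", errNotFound)
}

func main() {
	err := fetch()
	// Callers can still match the sentinel through the wrapper, which is why the diff also
	// switches `err == mongo.ErrNoDocuments` to `errors.Is(err, mongo.ErrNoDocuments)`.
	fmt.Println(errors.Is(err, errNotFound)) // true
	fmt.Println(err)                         // unable to get data set: no documents in result
}
```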
2 changes: 2 additions & 0 deletions data/store/store.go
@@ -64,6 +64,8 @@ type DatumRepository interface {
GetDataRange(ctx context.Context, dataRecords interface{}, userId string, typ string, startTime time.Time, endTime time.Time) error
GetLastUpdatedForUser(ctx context.Context, id string, typ string) (*types.UserLastUpdated, error)
DistinctUserIDs(ctx context.Context, typ string) ([]string, error)

CheckDataSetContainsType(ctx context.Context, dataSetID string, typ string) (bool, error)
}

// DataRepository is the combined interface of DataSetRepository and
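The DatumRepository interface gains CheckDataSetContainsType, implemented earlier in this diff as an existence query over active data of one type inside the accepted time window. The sketch below reuses the selector fields from that implementation but checks existence via SingleResult.Err() instead of decoding a document, since only presence matters; treat it as an illustrative variant rather than the PR's code.

```go
// Package example sketches the existence check behind CheckDataSetContainsType.
package example

import (
	"context"
	"errors"
	"fmt"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

// ContainsType reports whether the data set holds any active datum of the given type
// inside the accepted time window (two years back, one day ahead).
func ContainsType(ctx context.Context, coll *mongo.Collection, dataSetID, typ string) (bool, error) {
	twoYearsPast := time.Now().UTC().AddDate(0, -24, 0)
	oneDayFuture := time.Now().UTC().AddDate(0, 0, 1)

	selector := bson.M{
		"_active":  true,
		"uploadId": dataSetID,
		"type":     typ,
		"time":     bson.M{"$gt": twoYearsPast, "$lte": oneDayFuture},
	}

	// Err() returns mongo.ErrNoDocuments when nothing matches, so no decode is needed.
	err := coll.FindOne(ctx, selector).Err()
	if errors.Is(err, mongo.ErrNoDocuments) {
		return false, nil
	}
	if err != nil {
		return false, fmt.Errorf("unable to check for type %s in dataset %s: %w", typ, dataSetID, err)
	}
	return true, nil
}
```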
4 changes: 4 additions & 0 deletions data/types/base.go
@@ -278,6 +278,10 @@ func (b *Base) GetType() string {
return b.Type
}

func (b *Base) GetActive() bool {
return b.Active
}

func (b *Base) SetType(typ string) {
b.Type = typ
}