Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BACK-2702] Add reason for setting last updated and outdated since #673

Merged
merged 28 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
7c4a31b
Add reason for setting last updated and outdated since
toddkazakov Oct 19, 2023
c42af18
add more specific conditions for SetOutdated triggers
Roukoswarf Oct 20, 2023
376b05b
fix some unit tests
Roukoswarf Oct 20, 2023
c267c7f
update to ginkgo v2 and add some unit tests
Roukoswarf Oct 24, 2023
c535a59
fix build and unit tests, still incomplete testing
Roukoswarf Oct 24, 2023
dcbe1fc
fix formatting
Roukoswarf Oct 24, 2023
99d13a5
possibly fix make test
Roukoswarf Oct 24, 2023
3b67506
more test fixes
Roukoswarf Oct 24, 2023
c554a3b
travis use go 1.21 for real
Roukoswarf Oct 24, 2023
99611b2
update vendor
Roukoswarf Oct 24, 2023
8fa2e8d
more unit tests
Roukoswarf Oct 25, 2023
15dc174
more unit tests
Roukoswarf Oct 25, 2023
f407953
formatting
Roukoswarf Oct 25, 2023
4c70f63
add outdatedSinceLimit and minor bugfixes
Roukoswarf Oct 26, 2023
b5e7ec5
zero out OutdatedSinceLimit on update
Roukoswarf Oct 26, 2023
020518c
formatting
Roukoswarf Oct 26, 2023
325b5f8
update deps
Roukoswarf Oct 26, 2023
68d2f2e
cleanup for review
Roukoswarf Oct 30, 2023
be86c05
update deps
Roukoswarf Oct 30, 2023
ebded89
remove direct depend on shopify/sarama
Roukoswarf Oct 30, 2023
620a678
remove dep on github.com/pkg/errors
Roukoswarf Oct 30, 2023
d0a8aaa
remove ginkgo v1
Roukoswarf Oct 30, 2023
ce1820d
address review comments
Roukoswarf Oct 30, 2023
b7cd5cf
update unit test to handle mongodb time rounding
Roukoswarf Nov 1, 2023
bcf85e7
move DataSetsDataCreate summary logic after deduplicator
Roukoswarf Nov 1, 2023
b7b61b7
add schemamigration reason
Roukoswarf Nov 2, 2023
599f396
dont clear LastUpdatedReason on SetOutdated
Roukoswarf Nov 3, 2023
adb3cc5
Merge remote-tracking branch 'origin/master' into ehr-triggers-spike
Roukoswarf Nov 3, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions data/summary/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"
"time"

"github.com/pkg/errors"
"errors"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
Expand Down Expand Up @@ -54,7 +54,7 @@ func (r *Repo[T, A]) GetSummary(ctx context.Context, userId string) (*types.Summ
if errors.Is(err, mongo.ErrNoDocuments) {
return nil, nil
} else if err != nil {
return nil, errors.Wrap(err, "unable to get summary")
return nil, fmt.Errorf("unable to get summary: %w", err)
}

return summary, nil
Expand All @@ -74,7 +74,7 @@ func (r *TypelessRepo) DeleteSummary(ctx context.Context, userId string) error {

_, err := r.DeleteMany(ctx, selector)
if err != nil {
return errors.Wrap(err, "unable to delete summary")
return fmt.Errorf("unable to delete summary: %w", err)
}

return nil
Expand All @@ -95,7 +95,7 @@ func (r *Repo[T, A]) DeleteSummary(ctx context.Context, userId string) error {

_, err := r.DeleteMany(ctx, selector)
if err != nil {
return errors.Wrap(err, "unable to delete summary")
return fmt.Errorf("unable to delete summary: %w", err)
}

return nil
Expand Down Expand Up @@ -164,7 +164,7 @@ func (r *Repo[T, A]) CreateSummaries(ctx context.Context, summaries []*types.Sum
for i, userSummary := range summaries {
// we don't guard against duplicates, as they fail to insert safely, we only worry about unfilled fields
if userSummary.UserID == "" {
return 0, errors.Errorf("userId is missing at index %d", i)
return 0, fmt.Errorf("userId is missing at index %d", i)
} else if userSummary.Type != expectedType {
return 0, fmt.Errorf("invalid summary type '%v', expected '%v' at index %d", userSummary.Type, expectedType, i)
}
Expand All @@ -179,9 +179,9 @@ func (r *Repo[T, A]) CreateSummaries(ctx context.Context, summaries []*types.Sum

if err != nil {
if count > 0 {
return count, errors.Wrap(err, "failed to create some summaries")
return count, fmt.Errorf("failed to create some summaries: %w", err)
}
return count, errors.Wrap(err, "unable to create summaries")
return count, fmt.Errorf("unable to create summaries: %w", err)
}
return count, nil
}
Expand All @@ -208,7 +208,7 @@ func (r *Repo[T, A]) SetOutdated(ctx context.Context, userId, reason string) (*t
userSummary.SetOutdated(reason)
err = r.UpsertSummary(ctx, userSummary)
if err != nil {
return nil, errors.Wrapf(err, "unable to update user %s outdatedSince date for type %s", userId, userSummary.Type)
return nil, fmt.Errorf("unable to update user %s outdatedSince date for type %s: %w", userId, userSummary.Type, err)
}

return userSummary.Dates.OutdatedSince, nil
Expand Down Expand Up @@ -238,12 +238,12 @@ func (r *Repo[T, A]) GetOutdatedUserIDs(ctx context.Context, page *page.Paginati
if errors.Is(err, mongo.ErrNoDocuments) {
return nil, nil
} else if err != nil {
return nil, errors.Wrap(err, "unable to get outdated summaries")
return nil, fmt.Errorf("unable to get outdated summariesL %w", err)
Roukoswarf marked this conversation as resolved.
Show resolved Hide resolved
}

var summaries []*types.Summary[T, A]
if err = cursor.All(ctx, &summaries); err != nil {
return nil, errors.Wrap(err, "unable to decode outdated summaries")
return nil, fmt.Errorf("unable to decode outdated summaries: %w", err)
}

var userIDs = make([]string, len(summaries))
Expand Down Expand Up @@ -278,12 +278,12 @@ func (r *Repo[T, A]) GetMigratableUserIDs(ctx context.Context, page *page.Pagina
if errors.Is(err, mongo.ErrNoDocuments) {
return nil, nil
} else if err != nil {
return nil, errors.Wrap(err, "unable to get outdated summaries")
return nil, fmt.Errorf("unable to get outdated summaries: %w", err)
}

var summaries []*types.Summary[T, A]
if err = cursor.All(ctx, &summaries); err != nil {
return nil, errors.Wrap(err, "unable to decode outdated summaries")
return nil, fmt.Errorf("unable to decode outdated summaries: %w", err)
}

var userIDs = make([]string, len(summaries))
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ require (
github.com/onsi/ginkgo v1.16.5
github.com/onsi/ginkgo/v2 v2.13.0
github.com/onsi/gomega v1.29.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.17.0
github.com/rinchsan/device-check-go v1.3.0
github.com/tidepool-org/clinic/client v0.0.0-20231026151906-ad2e71e79f6f
Expand Down Expand Up @@ -121,6 +120,7 @@ require (
github.com/nxadm/tail v1.4.11 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
Expand Down
30 changes: 15 additions & 15 deletions task/store/mongo/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"
"time"

"github.com/pkg/errors"
"errors"

"github.com/tidepool-org/platform/ehr/reconcile"

Expand Down Expand Up @@ -185,7 +185,7 @@ func (t *TaskRepository) ensureTask(ctx context.Context, create *task.TaskCreate
if err != nil {
return err
} else if err = structureValidator.New().Validate(tsk); err != nil {
return errors.Wrap(err, "task is invalid")
return fmt.Errorf("task is invalid: %w", err)
}
if err := t.assertType(t.typeFilter, &tsk.Type); err != nil {
return err
Expand All @@ -204,8 +204,8 @@ func (t *TaskRepository) ensureTask(ctx context.Context, create *task.TaskCreate
&opts,
)

if res.Err() != nil && res.Err() != mongo.ErrNoDocuments {
return errors.Wrap(res.Err(), "unable to create task")
if res.Err() != nil && !errors.Is(res.Err(), mongo.ErrNoDocuments) {
return fmt.Errorf("unable to create task: %w", res.Err())
}

TasksStateTotal.WithLabelValues(task.TaskStatePending, create.Type).Inc()
Expand All @@ -220,12 +220,12 @@ func (t *TaskRepository) ListTasks(ctx context.Context, filter *task.TaskFilter,
if filter == nil {
filter = task.NewTaskFilter()
} else if err := structureValidator.New().Validate(filter); err != nil {
return nil, errors.Wrap(err, "filter is invalid")
return nil, fmt.Errorf("filter is invalid: %w", err)
}
if pagination == nil {
pagination = page.NewPagination()
} else if err := structureValidator.New().Validate(pagination); err != nil {
return nil, errors.Wrap(err, "pagination is invalid")
return nil, fmt.Errorf("pagination is invalid: %w", err)
}
if err := t.assertType(t.typeFilter, filter.Type); err != nil {
return nil, err
Expand All @@ -251,11 +251,11 @@ func (t *TaskRepository) ListTasks(ctx context.Context, filter *task.TaskFilter,
cursor, err := t.Find(ctx, selector, opts)
logger.WithFields(log.Fields{"count": len(tasks), "duration": time.Since(now) / time.Microsecond}).WithError(err).Debug("ListTasks")
if err != nil {
return nil, errors.Wrap(err, "unable to list tasks")
return nil, fmt.Errorf("unable to list tasks: %w", err)
}

if err = cursor.All(ctx, &tasks); err != nil {
return nil, errors.Wrap(err, "unable to decode tasks")
return nil, fmt.Errorf("unable to decode tasks: %w", err)
}

if tasks == nil {
Expand All @@ -274,7 +274,7 @@ func (t *TaskRepository) CreateTask(ctx context.Context, create *task.TaskCreate
if err != nil {
return nil, err
} else if err = structureValidator.New().Validate(tsk); err != nil {
return nil, errors.Wrap(err, "task is invalid")
return nil, fmt.Errorf("task is invalid: %w", err)
}
if err := t.assertType(t.typeFilter, &tsk.Type); err != nil {
return nil, err
Expand All @@ -286,7 +286,7 @@ func (t *TaskRepository) CreateTask(ctx context.Context, create *task.TaskCreate
_, err = t.InsertOne(ctx, tsk)
logger.WithFields(log.Fields{"id": tsk.ID, "duration": time.Since(now) / time.Microsecond}).WithError(err).Debug("CreateTask")
if err != nil {
return nil, errors.Wrap(err, "unable to create task")
return nil, fmt.Errorf("unable to create task: %w", err)
}

TasksStateTotal.WithLabelValues(task.TaskStatePending, create.Type).Inc()
Expand Down Expand Up @@ -317,7 +317,7 @@ func (t *TaskRepository) GetTask(ctx context.Context, id string) (*task.Task, er
if errors.Is(err, mongo.ErrNoDocuments) {
return nil, nil
} else if err != nil {
return nil, errors.Wrap(err, "unable to get task")
return nil, fmt.Errorf("unable to get task: %w", err)
}

return task, nil
Expand All @@ -333,7 +333,7 @@ func (t *TaskRepository) UpdateTask(ctx context.Context, id string, update *task
if update == nil {
return nil, errors.New("update is missing")
} else if err := structureValidator.New().Validate(update); err != nil {
return nil, errors.Wrap(err, "update is invalid")
return nil, fmt.Errorf("update is invalid: %w", err)
}

now := time.Now()
Expand Down Expand Up @@ -363,7 +363,7 @@ func (t *TaskRepository) UpdateTask(ctx context.Context, id string, update *task
changeInfo, err := t.UpdateMany(ctx, selector, t.ConstructUpdate(set, bson.M{}))
logger.WithFields(log.Fields{"changeInfo": changeInfo, "duration": time.Since(now) / time.Microsecond}).WithError(err).Debug("UpdateTask")
if err != nil {
return nil, errors.Wrap(err, "unable to update task")
return nil, fmt.Errorf("unable to update task: %w", err)
}

return t.GetTask(ctx, id)
Expand All @@ -388,7 +388,7 @@ func (t *TaskRepository) DeleteTask(ctx context.Context, id string) error {
changeInfo, err := t.DeleteOne(ctx, selector)
logger.WithFields(log.Fields{"changeInfo": changeInfo, "duration": time.Since(now) / time.Microsecond}).WithError(err).Debug("DeleteTask")
if err != nil {
return errors.Wrap(err, "unable to delete task")
return fmt.Errorf("unable to delete task: %w", err)
}

return nil
Expand Down Expand Up @@ -419,7 +419,7 @@ func (t *TaskRepository) UpdateFromState(ctx context.Context, tsk *task.Task, st
result, err := t.ReplaceOne(ctx, selector, tsk)
logger.WithField("duration", time.Since(now)/time.Microsecond).WithError(err).Debug("UpdateFromState")
if err != nil {
return nil, errors.Wrap(err, "unable to update from state")
return nil, fmt.Errorf("unable to update from state: %w", err)
}
if result.ModifiedCount != 1 {
return nil, task.AlreadyClaimedTask
Expand Down
2 changes: 1 addition & 1 deletion task/store/mongo/mongo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/prometheus/client_golang/prometheus/testutil"
"go.mongodb.org/mongo-driver/bson"

"github.com/pkg/errors"
"errors"

"github.com/tidepool-org/platform/log"
logTest "github.com/tidepool-org/platform/log/test"
Expand Down
6 changes: 3 additions & 3 deletions task/summary/backfillrunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

"go.mongodb.org/mongo-driver/bson"

"github.com/pkg/errors"
"errors"

"github.com/tidepool-org/platform/auth"
dataClient "github.com/tidepool-org/platform/data/client"
Expand Down Expand Up @@ -116,12 +116,12 @@ func (r *BackfillRunner) Run(ctx context.Context, tsk *task.Task) bool {
config := r.GetConfig(tsk)

if serverSessionToken, sErr := r.authClient.ServerSessionToken(); sErr != nil {
tsk.AppendError(errors.Wrap(sErr, "unable to get server session token"))
tsk.AppendError(fmt.Errorf("unable to get server session token: %w", sErr))
} else {
ctx = auth.NewContextWithServerSessionToken(ctx, serverSessionToken)

if taskRunner, tErr := NewBackfillTaskRunner(r, tsk); tErr != nil {
tsk.AppendError(errors.Wrap(tErr, "unable to create task runner"))
tsk.AppendError(fmt.Errorf("unable to create task runner: %w", tErr))
} else if tErr = taskRunner.Run(ctx); tErr != nil {
tsk.AppendError(tErr)
}
Expand Down
10 changes: 6 additions & 4 deletions task/summary/migrationrunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package summary

import (
"context"
"fmt"
"math/rand"
"time"

"github.com/pkg/errors"
"errors"

"go.mongodb.org/mongo-driver/bson"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
Expand Down Expand Up @@ -119,14 +121,14 @@ func (r *MigrationRunner) Run(ctx context.Context, tsk *task.Task) bool {
config := r.GetConfig(tsk)

if serverSessionToken, sErr := r.authClient.ServerSessionToken(); sErr != nil {
tsk.AppendError(errors.Wrap(sErr, "unable to get server session token"))
tsk.AppendError(fmt.Errorf("unable to get server session token: %w", sErr))
} else {
ctx = auth.NewContextWithServerSessionToken(ctx, serverSessionToken)

if taskRunner, tErr := NewMigrationTaskRunner(r, tsk); tErr != nil {
tsk.AppendError(errors.Wrap(tErr, "unable to create task runner"))
tsk.AppendError(fmt.Errorf("unable to create task runner: %w", tErr))
} else if tErr = taskRunner.Run(ctx, *config.Batch); tErr != nil {
tsk.AppendError(errors.Wrap(tErr, "unable to run task runner"))
tsk.AppendError(fmt.Errorf("unable to run task runner: %w", tErr))
}
}

Expand Down
2 changes: 1 addition & 1 deletion task/summary/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package summary
import (
"time"

"github.com/pkg/errors"
"errors"

"github.com/tidepool-org/platform/pointer"
"github.com/tidepool-org/platform/task"
Expand Down
9 changes: 5 additions & 4 deletions task/summary/updaterunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package summary

import (
"context"
"fmt"
"math/rand"
"time"

"go.mongodb.org/mongo-driver/bson"

"github.com/tidepool-org/platform/page"

"github.com/pkg/errors"
"errors"

"github.com/tidepool-org/platform/auth"
dataClient "github.com/tidepool-org/platform/data/client"
Expand Down Expand Up @@ -122,14 +123,14 @@ func (r *UpdateRunner) Run(ctx context.Context, tsk *task.Task) bool {
config := r.GetConfig(tsk)

if serverSessionToken, sErr := r.authClient.ServerSessionToken(); sErr != nil {
tsk.AppendError(errors.Wrap(sErr, "unable to get server session token"))
tsk.AppendError(fmt.Errorf("unable to get server session token: %w", sErr))
} else {
ctx = auth.NewContextWithServerSessionToken(ctx, serverSessionToken)

if taskRunner, tErr := NewUpdateTaskRunner(r, tsk); tErr != nil {
tsk.AppendError(errors.Wrap(tErr, "unable to create task runner"))
tsk.AppendError(fmt.Errorf("unable to create task runner: %w", tErr))
} else if tErr = taskRunner.Run(ctx, *config.Batch); tErr != nil {
tsk.AppendError(errors.Wrap(tErr, "unable to run task runner"))
tsk.AppendError(fmt.Errorf("unable to run task runner: %w", tErr))
}
}

Expand Down