care partner alerts #715
base: master
Conversation
Force-pushed from 374bae2 to 2deda23
Force-pushed from 8549c33 to 8367902
Force-pushed from 8367902 to 2ea9686
Force-pushed from 2ea9686 to 7246848
To use this in QA, it must be paired with tidepool-org/hydrophone#145 and tidepool-org/go-common#71.
Force-pushed from c50d589 to 986106b
Looks good overall, but the retry mechanism implemented here doesn't satisfy the latency requirements. The current implementation is OK for internal usage, but it's not production-ready. This could be handled in a separate PR if that makes the development and QA process easier.
data/events/events.go
Outdated
}
handler := asyncevents.NewSaramaConsumerGroupHandler(&asyncevents.NTimesRetryingConsumer{
	Consumer: r.Config.MessageConsumer,
	Delay:    CappedExponentialBinaryDelay(AlertsEventRetryDelayMaximum),
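For reference, a capped exponential binary delay of the kind referenced here behaves roughly like the following standalone sketch. The real CappedExponentialBinaryDelay in data/events and the Delay field's expected type in asyncevents may differ, so treat the names and signature below as assumptions.

package main

import (
	"fmt"
	"time"
)

// cappedExponentialBinaryDelay doubles the delay with each consecutive failure
// (1s, 2s, 4s, ...) and never exceeds the supplied maximum. Sketch only; the
// real helper's signature is an assumption here.
func cappedExponentialBinaryDelay(maximum time.Duration) func(consecutiveErrors int) time.Duration {
	return func(consecutiveErrors int) time.Duration {
		if consecutiveErrors < 0 {
			consecutiveErrors = 0
		}
		delay := time.Second << uint(consecutiveErrors) // 1s, 2s, 4s, 8s, ...
		if delay <= 0 || delay > maximum {              // also guards against overflow
			return maximum
		}
		return delay
	}
}

func main() {
	delay := cappedExponentialBinaryDelay(time.Minute)
	for attempt := 0; attempt < 8; attempt++ {
		fmt.Println(attempt, delay(attempt)) // 1s 2s 4s 8s 16s 32s 1m0s 1m0s
	}
}

With a one-minute cap, a repeatedly failing evaluation for a single user can pin its partition for a minute at a time, which is the latency concern raised below.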
I don't think this is a suitable retry strategy given the latency requirements for this service. Kafka's consumer group concurrency is limited to the number of partitions of the topic. That number can't be very high, because Kafka's memory consumption grows linearly with the number of partitions. It follows that the number of partitions is much lower than the number of users we will have, and the data of multiple users will end up in the same partition. A failure to evaluate a single user's alerts for one minute, as currently set by the CappedExponentialBinaryDelay, will introduce at least a minute of delay for all of the users sharing the same partition, because messages in a single partition are processed serially.
Alert notifications should be near real-time; up to 10 seconds of latency is acceptable. I think the solution proposed in this design document is how this should be handled. Other solutions which satisfy the requirements are welcome.
This will require some more in-depth thought on my part... Will do.
Yeah, I think you're right, let's get this review merged, and I'll work on getting a multiple topic solution set up. Given the flexibility we have now, it shouldn't be too bad.
I have the multi-tier retry in the eric-alerts-multi-topic-retry branch.
This should be implemented in this branch now.
Force-pushed from c08e1fc to 967c617
Force-pushed from 967c617 to cd42b19
Force-pushed from 9432468 to eaa652e
Force-pushed from d6449a1 to ee5da4a
@toddkazakov I just removed two config env vars. I believe we talked about that before, but it slipped my mind until I was reviewing the helm chart changes today, where they came up again. So the re-review here is just about the config parsing in the most recent commit of the PR; nothing else has changed. [UPDATE] This comment is outdated.
Force-pushed from 8ae1dc8 to 28fdf06
Force-pushed from 28fdf06 to b9767dc
These won't be changing at runtime, so there's no need to complicate the initialization by making them configurable. The topic's prefix is configurable, and that's the part that will change from environment to environment. BACK-2554
A rebase has picked up work performed by Darin, which removes the need for this token injection. \o/ Yay!
These tests, and the functionality they cover, were moved into alerts/client.go in a previous commit.
BACK-2449
- UsersWithoutCommunication endpoint added to data service
- UsersWithoutCommunication endpoint added to alerts client
- implementing no communication alerts via the task service
- evaluation of alerts conditions re-worked
- The new system recognizes that some alerts are generated by events (so-called "Data Alerts") while others are polled (no communication).
- The new evaluator lives in the alerts package (was data/events)
- implemented tracking of sent notifications
- Recording repo is implemented to record/index the time of the last received data from a user

BACK-2558
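For context, here is a sketch of what the last-communication record and its repository could look like, based only on the names that appear in this PR's diffs (LastCommunication, RecordsRepository, UsersWithoutCommunication). The field names and the upsert method below are assumptions, not the actual implementation.

package alerts

import (
	"context"
	"time"
)

// LastCommunication records when data was last received from a user.
// (Field names are assumptions; only the type name appears in the diffs.)
type LastCommunication struct {
	UserID           string    `bson:"userId"`
	LastReceivedTime time.Time `bson:"lastReceivedTime"`
}

// RecordsRepository is the interface referenced from data/store/mongo. The exact
// method set beyond UsersWithoutCommunication is an assumption.
type RecordsRepository interface {
	// RecordReceivedData upserts the time data was last received from a user (hypothetical).
	RecordReceivedData(ctx context.Context, userID string, at time.Time) error
	// UsersWithoutCommunication lists the records of users whose data is overdue.
	UsersWithoutCommunication(ctx context.Context) ([]LastCommunication, error)
}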
I've identified that there are some big changes that will need to happen in order to manage marking things sent and resolved. Those will come in a future commit. BACK-2559
The above turned out to be a lot more complicated than I had imagined it would be. BACK-2559
Force-pushed from 2e91daf to 9562b32
Ready for review once again.
As requested in code review. tidepool-org/terraform-modules#72 (review) BACK-2559
BACK-2559 BACK-2499
Force-pushed from 099bf63 to f49218a
data/store/mongo/mongo.go
Outdated
@@ -66,3 +71,13 @@ func (s *Store) NewAlertsRepository() alerts.Repository {
	r := alertsRepo(*s.Store.GetRepository("alerts"))
	return &r
}

func (s *Store) NewRecorderRepository() alerts.RecordsRepository {
	r := recorderRepo(*s.Store.GetRepository("records"))
It's not obvious what is meant by "record". Is there a better name that we can use?
alerts/tasks.go
Outdated
now := time.Now()
if nextDesiredRun.Before(now) {
	r.logger.Info("care partner is bumping nextDesiredRun")
	// nextDesiredRun, when added to time.Now in tsk.RepeatAvailableAfter, must
I suggest changing the queue code so that it doesn't fail the task if the available time is in the past. It looks like a safeguard against starvation by running tasks in an endless loop when the available time is set incorrectly, but now we have a good reason to remove this code. That way we can drop the workaround you came up with, which looks fragile and will terminate the task if it ever goes into a failed state.
@darinkrauss any thoughts or advice on this before I spend too much time working on it?
@ewollesen I think the change you'd need to make is in task/queue/queue.go:
func (q *queue) computeState(tsk *task.Task) {
	switch tsk.State {
	case task.TaskStatePending:
		if tsk.AvailableTime == nil || time.Now().After(*tsk.AvailableTime) {
			tsk.AppendError(errors.New("pending task requires future available time"))
			tsk.SetFailed()
		}
to:
func (q *queue) computeState(tsk *task.Task) {
	switch tsk.State {
	case task.TaskStatePending:
		if tsk.AvailableTime == nil {
			tsk.AvailableTime = pointer.FromAny(time.Now())
		} else if time.Now().After(*tsk.AvailableTime) {
			tsk.AppendError(errors.New("pending task requires future available time"))
			tsk.SetFailed()
		}
I think it is okay, but there would need to be a deeper look into the Dexcom task to see if it would be an issue. I don't think so, but 🤷
@darinkrauss @toddkazakov: I've made the change, and added a method to make use of it.
I don't see any problems with the Dexcom task, but you'd be the expert there for sure, Darin. As far as I could see, the Dexcom tasks always either have an available time or are in a failed state (so the available time doesn't come into play).
task/carepartner.go
Outdated
const CarePartnerType = "org.tidepool.carepartner"

func NewCarePartnerTaskCreate() *TaskCreate {
You should move this to the alerts package.
task/carepartner_test.go
Outdated
. "github.com/onsi/gomega" | ||
) | ||
|
||
var _ = Describe("NewCarePartnerTaskCreate", func() { |
You should move this to the alerts package.
data/service/service/standard.go
Outdated
	},
}

retryDelays := []time.Duration{0, 1 * time.Second}
Retry delays should be configurable with an env variable in case we want to adjust them, instead of having to modify the code.
Done.
See also tidepool-org/development#314
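As a rough illustration of the mechanism requested above, a parser for a comma-separated list of durations from an env variable could look like the sketch below. The variable name ALERTS_EVENT_RETRY_DELAYS and the format are assumptions, not necessarily what was implemented in this PR.

package main

import (
	"fmt"
	"os"
	"strings"
	"time"
)

// retryDelaysFromEnv parses a comma-separated list of durations (e.g. "0s,1s,5s")
// from the named environment variable, falling back to defaults when it is unset.
func retryDelaysFromEnv(key string, defaults []time.Duration) ([]time.Duration, error) {
	raw, ok := os.LookupEnv(key)
	if !ok || strings.TrimSpace(raw) == "" {
		return defaults, nil
	}
	var delays []time.Duration
	for _, part := range strings.Split(raw, ",") {
		delay, err := time.ParseDuration(strings.TrimSpace(part))
		if err != nil {
			return nil, fmt.Errorf("parsing retry delay %q: %w", part, err)
		}
		delays = append(delays, delay)
	}
	return delays, nil
}

func main() {
	delays, err := retryDelaysFromEnv("ALERTS_EVENT_RETRY_DELAYS", []time.Duration{0, time.Second})
	if err != nil {
		panic(err)
	}
	fmt.Println(delays) // [0s 1s] unless the env var overrides it
}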
data/store/mongo/mongo_recorder.go
Outdated
	}
}

func (d *recorderRepo) UsersWithoutCommunication(ctx context.Context) ([]alerts.LastCommunication, error) {
This doesn't return "users"; consider renaming it.
data/store/mongo/mongo_recorder.go
Outdated
	structuredmongo "github.com/tidepool-org/platform/store/structured/mongo"
)

// recorderRepo implements RecorderRepository, writing data to a MongoDB collection.
Rename this to something more descriptive.
alerts/config.go
Outdated
ctx = log.NewContextWithLogger(ctx, lgr)
nc := c.Alerts.NoCommunication.Evaluate(ctx, last)
needsUpsert := c.Activity.NoCommunication.Update(nc.OutOfRange)
// TODO check re-eval? I don't think so
Open a ticket if this needs to be addressed, and remove the TODO comment.
alerts/config.go
Outdated
"time" | ||
|
||
"github.com/tidepool-org/platform/data" | ||
"github.com/tidepool-org/platform/data/blood/glucose" | ||
nontypesglucose "github.com/tidepool-org/platform/data/blood/glucose" |
Rename this (and other occurrences) to dataBloodGlucose for consistency. I don't think nontypesglucose has any benefit over the existing alias used throughout the repository.
alerts/config.go
Outdated
}

// RecordsRepository encapsulates queries of the records collection for use with alerts.
type RecordsRepository interface {
I think it's important to capture what type of "communications" this repository stores, and that those are used solely for alerting purposes. This applies to the struct, interface, and collection. Unfortunately, I can't come up with a good suggestion myself.
I've taken a stab at a better name. Let me know what you think.
This maintains consistency. BACK-2499 BACK-2559
Renames: Recorder -> LastCommunicationsRecorder; UsersWithoutCommunication -> OverdueCommunications. BACK-2499 BACK-2559
Explicit > Implicit BACK-2499 BACK-2559
BACK-2499 BACK-2559
BACK-2499 BACK-2559
Requested in code review. #715 (comment) BACK-2499 BACK-2559
BACK-2499 BACK-2559
Previously, when completing a task, an available time of nil would cause the task to be marked as failed. Now, when a task completes with an available time of nil, time.Now() is substituted, which causes the task to be run again as soon as possible. This supports the care partner no-communication check, which wants to run once per second; as that's not available with the task service (the smallest interval is 5 seconds), setting the value to nil runs the task on each task service iteration. BACK-2559
Force-pushed from b5dd269 to 0da82d6
if tsk.AvailableTime == nil {
	tsk.AvailableTime = pointer.FromAny(time.Now())
} else if time.Now().After(*tsk.AvailableTime) {
	tsk.AppendError(errors.New("pending task requires future available time"))
	tsk.SetFailed()
}
Actually, after thinking a bit more on this, let's change this to:
if now := time.Now(); tsk.AvailableTime == nil || tsk.AvailableTime.Before(now) {
	tsk.AvailableTime = pointer.FromAny(now)
}
This should fully resolve any edge cases (due to timing, etc.) plus allow for nil to just mean "ASAP".
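To make the intended semantics concrete, here is a standalone sketch that mirrors the suggested queue change: a nil or past available time is bumped to "now", so nil simply means "run as soon as possible". The helper name below is made up for illustration; the real change lives inline in task/queue/queue.go.

package main

import (
	"fmt"
	"time"
)

// normalizeAvailableTime bumps a nil or past available time to the current time,
// mirroring the suggested computeState change: nil means "run ASAP".
func normalizeAvailableTime(available *time.Time) *time.Time {
	if now := time.Now(); available == nil || available.Before(now) {
		return &now
	}
	return available
}

func main() {
	past := time.Now().Add(-time.Hour)
	future := time.Now().Add(time.Hour)
	fmt.Println(normalizeAvailableTime(nil))     // bumped to now
	fmt.Println(normalizeAvailableTime(&past))   // bumped to now
	fmt.Println(normalizeAvailableTime(&future)) // left unchanged
}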
@@ -0,0 +1,233 @@
package events
Is this part of the formal (public) Tidepool data model, or is it internal-only (never to be exposed to the user)? If public, then I'd like to review and comment, but it'll need to wait a bit until I have more time.
platformConfig := platform.NewConfig()
platformConfig.UserAgent = s.UserAgent()
reporter := s.ConfigReporter().WithScopes("data", "client")
Why is this using the "data" client? Shouldn't this be the "alert(s)" client?
func (s *Service) initializePusher() error {
	var err error

	apns2Config := &struct {
Typically we define the config struct alongside the object it is configuring (APNSPusher, in this case). It allows greater isolation of knowledge. Also, my personal preference is that we keep using ConfigReporter rather than adding yet another configuration mechanism.
I would like us to move away from ConfigReporter. With many seemingly similar components that use generic names (services, clients, service clients, standard clients, etc.), it's absolutely impossible to determine where an env variable is used and how the configuration is loaded.
That makes complete sense to me. I agree that ConfigReporter isn't completely obvious, and moving to a more direct mechanism for configuration is a good idea.
I think, however, we should have a specific, agreed-upon plan for doing so. I'd rather not have multiple alternatives to ConfigReporter in the code, but one agreed-upon solution going forward. Plus, I think we may be able to update the various configs with specific struct fields and apply a surgical change to ConfigReporter that would avoid a large change to remove it (at least in the short term).
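To make the alternative concrete, one hypothetical shape for defining the APNs pusher config next to the pusher and loading it straight from the environment is sketched below, using kelseyhightower/envconfig as just one possible mechanism. The struct fields and env variable names are assumptions, not the ones in this PR.

package apnspusher

import (
	"github.com/kelseyhightower/envconfig"
)

// Config lives alongside the APNSPusher it configures, so the env variables it
// reads are discoverable in one place. Field and variable names are hypothetical.
type Config struct {
	SigningKeyPath string `envconfig:"APNS_SIGNING_KEY_PATH" required:"true"`
	KeyID          string `envconfig:"APNS_KEY_ID" required:"true"`
	TeamID         string `envconfig:"APNS_TEAM_ID" required:"true"`
	BundleID       string `envconfig:"APNS_BUNDLE_ID" required:"true"`
}

// ConfigFromEnv loads the pusher configuration directly from the environment.
func ConfigFromEnv() (*Config, error) {
	cfg := &Config{}
	if err := envconfig.Process("", cfg); err != nil {
		return nil, err
	}
	return cfg, nil
}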
This used to be a series of PRs, but that didn't really work out; they're all collapsed into this one.
This shouldn't be merged until tidepool-org/go-common#71 is merged; then this PR should have its go-common dependency bumped.