Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

care partner alerts #715

Open
wants to merge 39 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
91abe8f
adds List and Get methods to alerts client
ewollesen May 6, 2024
2dbc709
lift Repeat out of the base alert config
ewollesen May 6, 2024
c4fd175
adds activity tracking to alert configurations
ewollesen May 6, 2024
164abff
adds auth endpoint to retrieve a user's device tokens
ewollesen May 7, 2024
2eb7670
adds the ability to retrieve device tokens to the auth client
ewollesen May 7, 2024
451d063
remove unused device tokens repo from data
ewollesen May 7, 2024
cb7145d
adds a pusher client for sending APNs push notifications
ewollesen May 7, 2024
58e95d1
adapts sarama.Logger to implement log.Logger
ewollesen Jun 26, 2024
53ade17
adapts go-common's asyncevents.SaramaEventsConsumer for alerts
ewollesen May 8, 2024
d1fe15b
allow invites to set an upload id
ewollesen Jul 2, 2024
a8d6148
integrates an APNs pusher into data service
ewollesen Jul 8, 2024
f44db4f
adds Evaluate methods to alerts.Config
ewollesen Jul 8, 2024
df32675
adds the alerts events consumer to the data service
ewollesen Jul 8, 2024
6313318
remove some debugging logs
ewollesen Jul 11, 2024
8955e56
small fixes from code review
ewollesen Jul 12, 2024
4e0981f
rename Note => Notification
ewollesen Jul 12, 2024
af60d8c
one mock of DeviceTokenRepository is enough
ewollesen Jul 19, 2024
3d8f19f
add a topic cascading retry mechanism for care partner alerts
ewollesen Jul 24, 2024
ebb179c
modifies DelayingConsumer to use a message header instead of a delay
ewollesen Sep 16, 2024
329ca63
just a little more explanation of cascading consumer
ewollesen Sep 20, 2024
33b5020
don't read topic and consumer group id from runtime configuration
ewollesen Oct 7, 2024
3846520
there's no longer a need to inject server session tokens
ewollesen Dec 10, 2024
38d1c85
removes out-of-date tests
ewollesen Dec 10, 2024
19bdcda
improve test coverage
ewollesen Dec 11, 2024
a2be12d
add data set id to alerts Evaluation, improve test coverage
ewollesen Dec 11, 2024
02e6a72
implement no communication alerts
ewollesen Oct 24, 2024
7a81670
evaluate not looping conditions part 1
ewollesen Dec 12, 2024
9562b32
re-working to handle alert resolution and sent tracking
ewollesen Jan 21, 2025
239f89c
reduce kafka topics for care partner alerts outside of production
ewollesen Feb 4, 2025
f49218a
bump go-common to get kafka CDC updates for CPA
ewollesen Feb 4, 2025
bfe48fb
rename nontypesglucose -> dataBloodGlucose
ewollesen Feb 10, 2025
e6b2cea
renames Recorder & UsersWithoutCommunications
ewollesen Feb 10, 2025
7c3f41d
pass a log.Logger to EvaluateNoCommunication
ewollesen Feb 10, 2025
55b9150
remove un-needed comment
ewollesen Feb 10, 2025
2aa72c5
move care partner task definition to alerts package
ewollesen Feb 10, 2025
1140500
make GetRunnerDeadline() use a multiple of GetRunnerDurationMaximum()
ewollesen Feb 10, 2025
0586c46
replace magic number with named constant
ewollesen Feb 11, 2025
8f3d3ab
adds environment-based config for alerts retry delays
ewollesen Feb 12, 2025
0da82d6
modify task service to allow tasks to repeat ASAP
ewollesen Feb 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ ci-test-watch: ginkgo
go-test:
. ./env.test.sh && $(TIMING_CMD) go test $(GOTEST_FLAGS) $(GOTEST_PKGS)

go-ci-test: GOTEST_FLAGS += -count=1 -race -shuffle=on -cover
go-ci-test: override GOTEST_FLAGS += -count=1 -race -shuffle=on -cover
go-ci-test: GOTEST_PKGS = ./...
go-ci-test: go-test

Expand Down
80 changes: 57 additions & 23 deletions alerts/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package alerts
import (
"context"
"net/http"
"time"

"github.com/kelseyhightower/envconfig"

"github.com/tidepool-org/platform/auth"
"github.com/tidepool-org/platform/client"
"github.com/tidepool-org/platform/errors"
platformlog "github.com/tidepool-org/platform/log"
"github.com/tidepool-org/platform/log/null"
"github.com/tidepool-org/platform/platform"
Expand All @@ -16,22 +17,20 @@ import (

// Client for managing alerts configs.
type Client struct {
client PlatformClient
logger platformlog.Logger
tokenProvider auth.ServerSessionTokenProvider
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Darin upstreamed some changes that remove the necessity of a separate token provider.

client PlatformClient
logger platformlog.Logger
}

// NewClient builds a client for interacting with alerts API endpoints.
//
// If no logger is provided, a null logger is used.
func NewClient(client PlatformClient, tokenProvider auth.ServerSessionTokenProvider, logger platformlog.Logger) *Client {
func NewClient(client PlatformClient, logger platformlog.Logger) *Client {
if logger == nil {
logger = null.NewLogger()
}
return &Client{
client: client,
logger: logger,
tokenProvider: tokenProvider,
client: client,
logger: logger,
}
}

Expand All @@ -44,34 +43,69 @@ type PlatformClient interface {

// request performs common operations before passing a request off to the
// underlying platform.Client.
func (c *Client) request(ctx context.Context, method, url string, body any) error {
func (c *Client) request(ctx context.Context, method, url string, reqBody, resBody any) error {
// Platform's client.Client expects a logger to exist in the request's
// context. If it doesn't exist, request processing will panic.
loggingCtx := platformlog.NewContextWithLogger(ctx, c.logger)
// Make sure the auth token is injected into the request's headers.
return c.requestWithAuth(loggingCtx, method, url, body)
}

// requestWithAuth injects an auth token before calling platform.Client.RequestData.
//
// At time of writing, this is the only way to inject credentials into
// platform.Client. It might be nice to be able to use a mutator, but the auth
// is specifically handled by the platform.Client via the context field, and
// if left blank, platform.Client errors.
func (c *Client) requestWithAuth(ctx context.Context, method, url string, body any) error {
return c.client.RequestData(auth.NewContextWithServerSessionTokenProvider(ctx, c.tokenProvider), method, url, nil, body, nil)
return c.client.RequestData(loggingCtx, method, url, nil, reqBody, resBody)
}

// Upsert updates cfg if it exists or creates it if it doesn't.
func (c *Client) Upsert(ctx context.Context, cfg *Config) error {
url := c.client.ConstructURL("v1", "users", cfg.FollowedUserID, "followers", cfg.UserID, "alerts")
return c.request(ctx, http.MethodPost, url, cfg)
return c.request(ctx, http.MethodPost, url, cfg, nil)
}

// Delete the alerts config.
func (c *Client) Delete(ctx context.Context, cfg *Config) error {
url := c.client.ConstructURL("v1", "users", cfg.FollowedUserID, "followers", cfg.UserID, "alerts")
return c.request(ctx, http.MethodDelete, url, nil)
return c.request(ctx, http.MethodDelete, url, nil, nil)
}

// Get a user's alerts configuration for the followed user.
func (c *Client) Get(ctx context.Context, followedUserID, userID string) (*Config, error) {
url := c.client.ConstructURL("v1", "users", followedUserID, "followers", userID, "alerts")
config := &Config{}
err := c.request(ctx, http.MethodGet, url, nil, config)
if err != nil {
return nil, errors.Wrap(err, "Unable to request alerts config")
}
return config, nil
}

// List the alerts configurations that follow the given user.
//
// This method should only be called via an authenticated service session.
func (c *Client) List(ctx context.Context, followedUserID string) ([]*Config, error) {
url := c.client.ConstructURL("v1", "users", followedUserID, "followers", "alerts")
configs := []*Config{}
err := c.request(ctx, http.MethodGet, url, nil, &configs)
if err != nil {
c.logger.Debugf("unable to request alerts configs list: %+v %T", err, err)
return nil, errors.Wrap(err, "Unable to request alerts configs list")
}
return configs, nil
}

// UsersWithoutCommunication are those that haven't communicated in some time.
//
// This method should only be called via an authenticated service session.
func (c *Client) UsersWithoutCommunication(ctx context.Context) ([]LastCommunication, error) {
url := c.client.ConstructURL("v1", "users", "without_communication")
lastComms := []LastCommunication{}
err := c.request(ctx, http.MethodGet, url, nil, &lastComms)
if err != nil {
c.logger.Debugf("getting users without communication: \"%+v\" %T", err, err)
return nil, errors.Wrap(err, "Unable to list users without communication")
}
return lastComms, nil
}

// LastCommunication records the last time data was received from a user.
type LastCommunication struct {
UserID string `bson:"userId" json:"userId"`
DataSetID string `bson:"dataSetId" json:"dataSetId"`
LastReceivedDeviceData time.Time `bson:"lastReceivedDeviceData" json:"lastReceivedDeviceData"`
}

// ConfigLoader abstracts the method by which config values are loaded.
Expand Down
123 changes: 78 additions & 45 deletions alerts/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,20 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/tidepool-org/platform/auth"
"github.com/tidepool-org/platform/client"
"github.com/tidepool-org/platform/log"
"github.com/tidepool-org/platform/log/null"
"github.com/tidepool-org/platform/platform"
)

const testToken = "auth-me"
const testUserID = "test-user-id"
const testFollowedUserID = "test-followed-user-id"
const testDataSetID = "upid_000000000000"

var _ = Describe("Client", func() {
var test404Server, test200Server *httptest.Server
var testAuthServer func(*string) *httptest.Server
var test404Server *httptest.Server
var test200Server func(string) *httptest.Server

BeforeEach(func() {
t := GinkgoT()
Expand All @@ -28,87 +30,118 @@ var _ = Describe("Client", func() {
test404Server = testServer(t, func(w http.ResponseWriter, r *http.Request) {
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
})
test200Server = testServer(t, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
testAuthServer = func(token *string) *httptest.Server {
test200Server = func(resp string) *httptest.Server {
return testServer(t, func(w http.ResponseWriter, r *http.Request) {
*token = r.Header.Get(auth.TidepoolSessionTokenHeaderKey)
w.WriteHeader(http.StatusOK)
w.Write([]byte(resp))
})
}
})

Context("Delete", func() {
It("returns an error on non-200 responses", func() {
ItReturnsAnErrorOnNon200Responses := func(f func(context.Context, *Client) error) {
GinkgoHelper()
It("returns an error on non-200 respnoses", func() {
client, ctx := newAlertsClientTest(test404Server)
err := client.Delete(ctx, &Config{})
err := f(ctx, client)
Expect(err).Should(HaveOccurred())
Expect(err).To(MatchError(ContainSubstring("resource not found")))
})
}

It("returns nil on success", func() {
client, ctx := newAlertsClientTest(test200Server)
err := client.Delete(ctx, &Config{})
Expect(err).ShouldNot(HaveOccurred())
ItReturnsANilErrorOnSuccess := func(resp string, f func(context.Context, *Client) error) {
GinkgoHelper()
It("returns a nil error on success", func() {
client, ctx := newAlertsClientTest(test200Server(resp))
err := f(ctx, client)
Expect(err).To(Succeed())
})
}

Context("Delete", func() {
ItReturnsAnErrorOnNon200Responses(func(ctx context.Context, client *Client) error {
return client.Delete(ctx, &Config{})
})

It("injects an auth token", func() {
token := ""
client, ctx := newAlertsClientTest(testAuthServer(&token))
_ = client.Delete(ctx, &Config{})
Expect(token).To(Equal(testToken))
ItReturnsANilErrorOnSuccess("", func(ctx context.Context, client *Client) error {
return client.Delete(ctx, &Config{})
})
})

Context("Upsert", func() {
It("returns an error on non-200 responses", func() {
client, ctx := newAlertsClientTest(test404Server)
err := client.Upsert(ctx, &Config{})
Expect(err).Should(HaveOccurred())
Expect(err).To(MatchError(ContainSubstring("resource not found")))
ItReturnsAnErrorOnNon200Responses(func(ctx context.Context, client *Client) error {
return client.Upsert(ctx, &Config{})
})

ItReturnsANilErrorOnSuccess("", func(ctx context.Context, client *Client) error {
return client.Upsert(ctx, &Config{})
})
})

Context("Get", func() {
ItReturnsAnErrorOnNon200Responses(func(ctx context.Context, client *Client) error {
_, err := client.Get(ctx, testFollowedUserID, testUserID)
return err
})

It("returns nil on success", func() {
client, ctx := newAlertsClientTest(test200Server)
err := client.Upsert(ctx, &Config{})
Expect(err).ShouldNot(HaveOccurred())
ret := `{
"userId": "14ee703f-ca9b-4a6b-9ce3-41d886514e7f",
"followedUserId": "ce5863bc-cc0b-4177-97d7-e8de0c558820",
"uploadId": "upid_00000000000000000000000000000000"
}`
ItReturnsANilErrorOnSuccess(ret, func(ctx context.Context, client *Client) error {
_, err := client.Get(ctx, testFollowedUserID, testUserID)
return err
})
})

It("injects an auth token", func() {
token := ""
client, ctx := newAlertsClientTest(testAuthServer(&token))
_ = client.Upsert(ctx, &Config{})
Expect(token).To(Equal(testToken))
Context("List", func() {
ItReturnsAnErrorOnNon200Responses(func(ctx context.Context, client *Client) error {
_, err := client.List(ctx, "")
return err
})

ItReturnsANilErrorOnSuccess("[]", func(ctx context.Context, client *Client) error {
_, err := client.List(ctx, "")
return err
})
})

Context("UsersWithoutCommunication", func() {
ItReturnsAnErrorOnNon200Responses(func(ctx context.Context, client *Client) error {
_, err := client.UsersWithoutCommunication(ctx)
return err
})

ItReturnsANilErrorOnSuccess("[]", func(ctx context.Context, client *Client) error {
_, err := client.UsersWithoutCommunication(ctx)
return err
})
})
})

func buildTestClient(s *httptest.Server) *Client {
pCfg := &platform.Config{
Config: &client.Config{
Address: s.URL,
},
Config: &client.Config{Address: s.URL},
ServiceSecret: "auth-me",
}
token := mockTokenProvider(testToken)
pc, err := platform.NewClient(pCfg, platform.AuthorizeAsService)
Expect(err).ToNot(HaveOccurred())
client := NewClient(pc, token, null.NewLogger())
client := NewClient(pc, null.NewLogger())
return client
}

func newAlertsClientTest(server *httptest.Server) (*Client, context.Context) {
return buildTestClient(server), contextWithNullLogger()
}

func contextWithNullLogger() context.Context {
return log.NewContextWithLogger(context.Background(), null.NewLogger())
func contextWithNullLoggerDeluxe() (context.Context, log.Logger) {
lgr := null.NewLogger()
return log.NewContextWithLogger(context.Background(), lgr), lgr
}

type mockTokenProvider string

func (p mockTokenProvider) ServerSessionToken() (string, error) {
return string(p), nil
func contextWithNullLogger() context.Context {
ctx, _ := contextWithNullLoggerDeluxe()
return ctx
}

func testServer(t GinkgoTInterface, handler http.HandlerFunc) *httptest.Server {
Expand Down
Loading