Skip to content

Commit

Permalink
log dexcom task failure
Browse files Browse the repository at this point in the history
only error / retry when running the task
  • Loading branch information
jh-bate committed Aug 28, 2023
1 parent 60e40d8 commit dfa7962
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 36 deletions.
39 changes: 18 additions & 21 deletions dexcom/fetch/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,11 @@ func (r *Runner) Run(ctx context.Context, tsk *task.Task) bool {
ResetTask(tsk)

if serverSessionToken, sErr := r.AuthClient().ServerSessionToken(); sErr != nil {
ErrorOrRetryTask(tsk, errors.Wrap(sErr, "unable to get server session token"))
r.logTaskError(tsk, errors.Wrap(sErr, "unable to get server session token"))
} else {
ctx = auth.NewContextWithServerSessionToken(ctx, serverSessionToken)
if taskRunner, tErr := NewTaskRunner(r, tsk); tErr != nil {
ErrorOrRetryTask(tsk, errors.Wrap(tErr, "unable to create task runner"))
r.logTaskError(tsk, errors.Wrap(sErr, "unable to create task runner"))
} else if tErr = taskRunner.Run(ctx); tErr != nil {
ErrorOrRetryTask(tsk, errors.Wrap(tErr, "unable to run task runner"))
}
Expand All @@ -145,10 +145,13 @@ func (r *Runner) Run(ctx context.Context, tsk *task.Task) bool {
if taskDuration := time.Since(now); taskDuration > TaskDurationMaximum {
r.Logger().WithField("taskDuration", taskDuration.Truncate(time.Millisecond).Seconds()).Warn("Task duration exceeds maximum")
}

return true
}

func (r *Runner) logTaskError(tsk *task.Task, err error) {
r.logger.Warnf("dexcom task %s error, task will be retried: %s", tsk.ID, err)
}

type TaskRunner struct {
*Runner
task *task.Task
Expand Down Expand Up @@ -182,7 +185,7 @@ func (t *TaskRunner) Run(ctx context.Context) error {
}

if len(t.task.Data) == 0 {
return t.failTask(errors.New("data is missing"))
return FailTask(t.logger, t.task, errors.New("data is missing"))
}

t.context = ctx
Expand All @@ -202,7 +205,7 @@ func (t *TaskRunner) Run(ctx context.Context) error {
}
if err := t.fetchSinceLatestDataTime(); err != nil {
if request.IsErrorUnauthenticated(errors.Cause(err)) {
t.failTask(err)
FailTask(t.logger, t.task, err)
if updateErr := t.updateDataSourceWithError(err); updateErr != nil {
t.Logger().WithError(updateErr).Error("unable to update data source with error")
}
Expand All @@ -215,14 +218,14 @@ func (t *TaskRunner) Run(ctx context.Context) error {
func (t *TaskRunner) getProviderSession() error {
providerSessionID, ok := t.task.Data["providerSessionId"].(string)
if !ok || providerSessionID == "" {
return t.failTask(errors.New("provider session id is missing"))
return FailTask(t.logger, t.task, errors.New("provider session id is missing"))
}

providerSession, err := t.AuthClient().GetProviderSession(t.context, providerSessionID)
if err != nil {
return errors.Wrap(err, "unable to get provider session")
} else if providerSession == nil {
return t.failTask(errors.Wrap(err, "provider session is missing"))
return FailTask(t.logger, t.task, errors.Wrap(err, "provider session is missing"))
}
t.providerSession = providerSession

Expand All @@ -243,30 +246,24 @@ func (t *TaskRunner) updateProviderSession() error {
if err != nil {
return errors.Wrap(err, "unable to update provider session")
} else if providerSession == nil {
return t.failTask(errors.Wrap(err, "provider session is missing"))
return FailTask(t.logger, t.task, errors.Wrap(err, "provider session is missing"))
}
t.providerSession = providerSession

return nil
}

func (t *TaskRunner) failTask(err error) error {
t.task.SetFailed()
t.logger.Warnf("dexcom task %s failed: %s", t.task.ID, err)
return err
}

func (t *TaskRunner) getDataSource() error {
dataSourceID, ok := t.task.Data["dataSourceId"].(string)
if !ok || dataSourceID == "" {
return t.failTask(errors.New("data source id is missing"))
return FailTask(t.logger, t.task, errors.New("data source id is missing"))
}

source, err := t.DataSourceClient().Get(t.context, dataSourceID)
if err != nil {
return errors.Wrap(err, "unable to get data source")
} else if source == nil {
return t.failTask(errors.Wrap(err, "data source is missing"))
return FailTask(t.logger, t.task, errors.Wrap(err, "data source is missing"))
}
t.dataSource = source

Expand Down Expand Up @@ -319,7 +316,7 @@ func (t *TaskRunner) updateDataSource(update *dataSource.Update) error {
if err != nil {
return errors.Wrap(err, "unable to update data source")
} else if source == nil {
return t.failTask(errors.Wrap(err, "data source is missing"))
return FailTask(t.logger, t.task, errors.Wrap(err, "data source is missing"))
}

t.dataSource = source
Expand All @@ -329,7 +326,7 @@ func (t *TaskRunner) updateDataSource(update *dataSource.Update) error {
func (t *TaskRunner) createTokenSource() error {
tokenSource, err := oauthToken.NewSourceWithToken(t.providerSession.OAuthToken)
if err != nil {
return t.failTask(errors.Wrap(err, "unable to create token source"))
return FailTask(t.logger, t.task, errors.Wrap(err, "unable to create token source"))
}

t.tokenSource = tokenSource
Expand All @@ -343,14 +340,14 @@ func (t *TaskRunner) getDeviceHashes() error {
}
rawMap, rawMapOK := raw.(map[string]interface{})
if !rawMapOK || rawMap == nil {
return t.failTask(errors.New("device hashes is invalid"))
return FailTask(t.logger, t.task, errors.New("device hashes is invalid"))
}
deviceHashes := map[string]string{}
for key, value := range rawMap {
if valueString, valueStringOK := value.(string); valueStringOK {
deviceHashes[key] = valueString
} else {
return t.failTask(errors.New("device hash is invalid"))
return FailTask(t.logger, t.task, errors.New("device hash is invalid"))
}
}

Expand Down Expand Up @@ -385,7 +382,7 @@ func (t *TaskRunner) updateDataSet(dataSetUpdate *data.DataSetUpdate) error {
if err != nil {
return errors.Wrap(err, "unable to update data set")
} else if dataSet == nil {
return t.failTask(errors.Wrap(err, "data set is missing"))
return FailTask(t.logger, t.task, errors.Wrap(err, "data set is missing"))
}

t.dataSet = dataSet
Expand Down
18 changes: 13 additions & 5 deletions dexcom/fetch/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/tidepool-org/platform/errors"
"github.com/tidepool-org/platform/log"
"github.com/tidepool-org/platform/pointer"
"github.com/tidepool-org/platform/task"
)
Expand Down Expand Up @@ -34,12 +35,19 @@ func NewTaskCreate(providerSessionID string, dataSourceID string) (*task.TaskCre
}

func ErrorOrRetryTask(t *task.Task, err error) {
if shouldTaskError(t) {
t.AppendError(err)
t.SetFailed()
return
if t.IsFailed() {
if shouldTaskError(t) {
t.AppendError(err)
return
}
incrementTaskRetryCount(t)
}
incrementTaskRetryCount(t)
}

func FailTask(l log.Logger, t *task.Task, err error) error {
l.Warnf("dexcom task %s failed: %s", t.ID, err)
t.SetFailed()
return err
}

func ResetTask(t *task.Task) {
Expand Down
68 changes: 58 additions & 10 deletions dexcom/fetch/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,47 @@ import (
"errors"

"github.com/tidepool-org/platform/dexcom/fetch"
"github.com/tidepool-org/platform/log"

logNull "github.com/tidepool-org/platform/log/null"
"github.com/tidepool-org/platform/task"
)

var _ = Describe("Task", func() {

var getTask = func(retryCount int) *task.Task {
return &task.Task{
var getTask = func(retryCount int, hasFailed bool) *task.Task {
tsk := &task.Task{
Data: map[string]interface{}{
"retryCount": retryCount,
},
}
if hasFailed {
tsk.SetFailed()
}
return tsk
}

Context("ErrorOrRetryTask", func() {
DescribeTable("will not append error",
func(setupFunc func() (*task.Task, int)) {
tsk, startCount := setupFunc()
Expect(tsk.IsFailed()).To(Equal(true))
Expect(tsk.Data["retryCount"]).To(Equal(startCount))
fetch.ErrorOrRetryTask(tsk, errors.New("some error"))
Expect(tsk.HasError()).To(Equal(false))
Expect(tsk.IsFailed()).To(Equal(false))
Expect(tsk.Data["retryCount"]).To(Equal(startCount + 1))
},
Entry("if zero retries", func() (*task.Task, int) {
return getTask(0), 0
retryCount := 0
return getTask(retryCount, true), retryCount
}),
Entry("if one retry", func() (*task.Task, int) {
return getTask(1), 1
retryCount := 1
return getTask(retryCount, true), retryCount
}),
Entry("if two retries", func() (*task.Task, int) {
return getTask(2), 2
retryCount := 2
return getTask(retryCount, true), retryCount
}),
)
DescribeTable("will append error",
Expand All @@ -47,20 +57,58 @@ var _ = Describe("Task", func() {
Expect(tsk.Data["retryCount"]).To(Equal(startCount))
fetch.ErrorOrRetryTask(tsk, errors.New("some error"))
Expect(tsk.HasError()).To(Equal(true))
Expect(tsk.IsFailed()).To(Equal(true))
},
Entry("when 3rd retry", func() (*task.Task, int) {
return getTask(3), 3
retryCount := 3
return getTask(retryCount, true), retryCount
}),
Entry("more than 3 retries", func() (*task.Task, int) {
return getTask(10), 10
retryCount := 10
return getTask(retryCount, true), retryCount
}),
)
DescribeTable("will ignore if the task is not been failed",
func(setupFunc func() (*task.Task, int)) {
tsk, startCount := setupFunc()
Expect(tsk.Data["retryCount"]).To(Equal(startCount))
fetch.ErrorOrRetryTask(tsk, errors.New("some error"))
Expect(tsk.HasError()).To(Equal(false))
Expect(tsk.Data["retryCount"]).To(Equal(startCount))
},
Entry("when 3rd retry", func() (*task.Task, int) {
retryCount := 3
return getTask(retryCount, false), retryCount
}),
Entry("more 1st retry", func() (*task.Task, int) {
retryCount := 1
return getTask(retryCount, false), retryCount
}),
)
})

Context("FailTask", func() {
var logger log.Logger
BeforeEach(func() {
logger = logNull.NewLogger()
})
It("will set the task to have failed", func() {
tsk := getTask(0, false)
Expect(tsk.IsFailed()).To(Equal(false))
fetch.FailTask(logger, tsk, errors.New("some error"))
Expect(tsk.IsFailed()).To(Equal(true))
})
It("will not change the failure status if already set", func() {
tsk := getTask(0, false)
tsk.SetFailed()
Expect(tsk.IsFailed()).To(Equal(true))
fetch.FailTask(logger, tsk, errors.New("some error"))
Expect(tsk.IsFailed()).To(Equal(true))
})
})

Context("ResetTask", func() {
It("returns the unit value when set", func() {
tsk := getTask(3)
tsk := getTask(3, true)
tsk.AppendError(errors.New("some error"))
Expect(tsk.HasError()).To(Equal(true))
Expect(tsk.Data["retryCount"]).To(Equal(3))
Expand Down

0 comments on commit dfa7962

Please sign in to comment.