From dfa796253efd27557728fc828a8bfd1760ffb3b0 Mon Sep 17 00:00:00 2001 From: Jamie Date: Tue, 29 Aug 2023 10:51:16 +1200 Subject: [PATCH] log dexcom task failure only error / retry when running the task --- dexcom/fetch/runner.go | 39 +++++++++++----------- dexcom/fetch/task.go | 18 ++++++++--- dexcom/fetch/task_test.go | 68 +++++++++++++++++++++++++++++++++------ 3 files changed, 89 insertions(+), 36 deletions(-) diff --git a/dexcom/fetch/runner.go b/dexcom/fetch/runner.go index 53bffdf05..ddef1f7f8 100644 --- a/dexcom/fetch/runner.go +++ b/dexcom/fetch/runner.go @@ -127,11 +127,11 @@ func (r *Runner) Run(ctx context.Context, tsk *task.Task) bool { ResetTask(tsk) if serverSessionToken, sErr := r.AuthClient().ServerSessionToken(); sErr != nil { - ErrorOrRetryTask(tsk, errors.Wrap(sErr, "unable to get server session token")) + r.logTaskError(tsk, errors.Wrap(sErr, "unable to get server session token")) } else { ctx = auth.NewContextWithServerSessionToken(ctx, serverSessionToken) if taskRunner, tErr := NewTaskRunner(r, tsk); tErr != nil { - ErrorOrRetryTask(tsk, errors.Wrap(tErr, "unable to create task runner")) + r.logTaskError(tsk, errors.Wrap(sErr, "unable to create task runner")) } else if tErr = taskRunner.Run(ctx); tErr != nil { ErrorOrRetryTask(tsk, errors.Wrap(tErr, "unable to run task runner")) } @@ -145,10 +145,13 @@ func (r *Runner) Run(ctx context.Context, tsk *task.Task) bool { if taskDuration := time.Since(now); taskDuration > TaskDurationMaximum { r.Logger().WithField("taskDuration", taskDuration.Truncate(time.Millisecond).Seconds()).Warn("Task duration exceeds maximum") } - return true } +func (r *Runner) logTaskError(tsk *task.Task, err error) { + r.logger.Warnf("dexcom task %s error, task will be retried: %s", tsk.ID, err) +} + type TaskRunner struct { *Runner task *task.Task @@ -182,7 +185,7 @@ func (t *TaskRunner) Run(ctx context.Context) error { } if len(t.task.Data) == 0 { - return t.failTask(errors.New("data is missing")) + return FailTask(t.logger, t.task, errors.New("data is missing")) } t.context = ctx @@ -202,7 +205,7 @@ func (t *TaskRunner) Run(ctx context.Context) error { } if err := t.fetchSinceLatestDataTime(); err != nil { if request.IsErrorUnauthenticated(errors.Cause(err)) { - t.failTask(err) + FailTask(t.logger, t.task, err) if updateErr := t.updateDataSourceWithError(err); updateErr != nil { t.Logger().WithError(updateErr).Error("unable to update data source with error") } @@ -215,14 +218,14 @@ func (t *TaskRunner) Run(ctx context.Context) error { func (t *TaskRunner) getProviderSession() error { providerSessionID, ok := t.task.Data["providerSessionId"].(string) if !ok || providerSessionID == "" { - return t.failTask(errors.New("provider session id is missing")) + return FailTask(t.logger, t.task, errors.New("provider session id is missing")) } providerSession, err := t.AuthClient().GetProviderSession(t.context, providerSessionID) if err != nil { return errors.Wrap(err, "unable to get provider session") } else if providerSession == nil { - return t.failTask(errors.Wrap(err, "provider session is missing")) + return FailTask(t.logger, t.task, errors.Wrap(err, "provider session is missing")) } t.providerSession = providerSession @@ -243,30 +246,24 @@ func (t *TaskRunner) updateProviderSession() error { if err != nil { return errors.Wrap(err, "unable to update provider session") } else if providerSession == nil { - return t.failTask(errors.Wrap(err, "provider session is missing")) + return FailTask(t.logger, t.task, errors.Wrap(err, "provider session is missing")) } t.providerSession = providerSession return nil } -func (t *TaskRunner) failTask(err error) error { - t.task.SetFailed() - t.logger.Warnf("dexcom task %s failed: %s", t.task.ID, err) - return err -} - func (t *TaskRunner) getDataSource() error { dataSourceID, ok := t.task.Data["dataSourceId"].(string) if !ok || dataSourceID == "" { - return t.failTask(errors.New("data source id is missing")) + return FailTask(t.logger, t.task, errors.New("data source id is missing")) } source, err := t.DataSourceClient().Get(t.context, dataSourceID) if err != nil { return errors.Wrap(err, "unable to get data source") } else if source == nil { - return t.failTask(errors.Wrap(err, "data source is missing")) + return FailTask(t.logger, t.task, errors.Wrap(err, "data source is missing")) } t.dataSource = source @@ -319,7 +316,7 @@ func (t *TaskRunner) updateDataSource(update *dataSource.Update) error { if err != nil { return errors.Wrap(err, "unable to update data source") } else if source == nil { - return t.failTask(errors.Wrap(err, "data source is missing")) + return FailTask(t.logger, t.task, errors.Wrap(err, "data source is missing")) } t.dataSource = source @@ -329,7 +326,7 @@ func (t *TaskRunner) updateDataSource(update *dataSource.Update) error { func (t *TaskRunner) createTokenSource() error { tokenSource, err := oauthToken.NewSourceWithToken(t.providerSession.OAuthToken) if err != nil { - return t.failTask(errors.Wrap(err, "unable to create token source")) + return FailTask(t.logger, t.task, errors.Wrap(err, "unable to create token source")) } t.tokenSource = tokenSource @@ -343,14 +340,14 @@ func (t *TaskRunner) getDeviceHashes() error { } rawMap, rawMapOK := raw.(map[string]interface{}) if !rawMapOK || rawMap == nil { - return t.failTask(errors.New("device hashes is invalid")) + return FailTask(t.logger, t.task, errors.New("device hashes is invalid")) } deviceHashes := map[string]string{} for key, value := range rawMap { if valueString, valueStringOK := value.(string); valueStringOK { deviceHashes[key] = valueString } else { - return t.failTask(errors.New("device hash is invalid")) + return FailTask(t.logger, t.task, errors.New("device hash is invalid")) } } @@ -385,7 +382,7 @@ func (t *TaskRunner) updateDataSet(dataSetUpdate *data.DataSetUpdate) error { if err != nil { return errors.Wrap(err, "unable to update data set") } else if dataSet == nil { - return t.failTask(errors.Wrap(err, "data set is missing")) + return FailTask(t.logger, t.task, errors.Wrap(err, "data set is missing")) } t.dataSet = dataSet diff --git a/dexcom/fetch/task.go b/dexcom/fetch/task.go index 313e8f75b..67963945a 100644 --- a/dexcom/fetch/task.go +++ b/dexcom/fetch/task.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/tidepool-org/platform/errors" + "github.com/tidepool-org/platform/log" "github.com/tidepool-org/platform/pointer" "github.com/tidepool-org/platform/task" ) @@ -34,12 +35,19 @@ func NewTaskCreate(providerSessionID string, dataSourceID string) (*task.TaskCre } func ErrorOrRetryTask(t *task.Task, err error) { - if shouldTaskError(t) { - t.AppendError(err) - t.SetFailed() - return + if t.IsFailed() { + if shouldTaskError(t) { + t.AppendError(err) + return + } + incrementTaskRetryCount(t) } - incrementTaskRetryCount(t) +} + +func FailTask(l log.Logger, t *task.Task, err error) error { + l.Warnf("dexcom task %s failed: %s", t.ID, err) + t.SetFailed() + return err } func ResetTask(t *task.Task) { diff --git a/dexcom/fetch/task_test.go b/dexcom/fetch/task_test.go index abd34c5f9..f2df097dd 100644 --- a/dexcom/fetch/task_test.go +++ b/dexcom/fetch/task_test.go @@ -8,37 +8,47 @@ import ( "errors" "github.com/tidepool-org/platform/dexcom/fetch" + "github.com/tidepool-org/platform/log" + + logNull "github.com/tidepool-org/platform/log/null" "github.com/tidepool-org/platform/task" ) var _ = Describe("Task", func() { - var getTask = func(retryCount int) *task.Task { - return &task.Task{ + var getTask = func(retryCount int, hasFailed bool) *task.Task { + tsk := &task.Task{ Data: map[string]interface{}{ "retryCount": retryCount, }, } + if hasFailed { + tsk.SetFailed() + } + return tsk } Context("ErrorOrRetryTask", func() { DescribeTable("will not append error", func(setupFunc func() (*task.Task, int)) { tsk, startCount := setupFunc() + Expect(tsk.IsFailed()).To(Equal(true)) Expect(tsk.Data["retryCount"]).To(Equal(startCount)) fetch.ErrorOrRetryTask(tsk, errors.New("some error")) Expect(tsk.HasError()).To(Equal(false)) - Expect(tsk.IsFailed()).To(Equal(false)) Expect(tsk.Data["retryCount"]).To(Equal(startCount + 1)) }, Entry("if zero retries", func() (*task.Task, int) { - return getTask(0), 0 + retryCount := 0 + return getTask(retryCount, true), retryCount }), Entry("if one retry", func() (*task.Task, int) { - return getTask(1), 1 + retryCount := 1 + return getTask(retryCount, true), retryCount }), Entry("if two retries", func() (*task.Task, int) { - return getTask(2), 2 + retryCount := 2 + return getTask(retryCount, true), retryCount }), ) DescribeTable("will append error", @@ -47,20 +57,58 @@ var _ = Describe("Task", func() { Expect(tsk.Data["retryCount"]).To(Equal(startCount)) fetch.ErrorOrRetryTask(tsk, errors.New("some error")) Expect(tsk.HasError()).To(Equal(true)) - Expect(tsk.IsFailed()).To(Equal(true)) }, Entry("when 3rd retry", func() (*task.Task, int) { - return getTask(3), 3 + retryCount := 3 + return getTask(retryCount, true), retryCount }), Entry("more than 3 retries", func() (*task.Task, int) { - return getTask(10), 10 + retryCount := 10 + return getTask(retryCount, true), retryCount + }), + ) + DescribeTable("will ignore if the task is not been failed", + func(setupFunc func() (*task.Task, int)) { + tsk, startCount := setupFunc() + Expect(tsk.Data["retryCount"]).To(Equal(startCount)) + fetch.ErrorOrRetryTask(tsk, errors.New("some error")) + Expect(tsk.HasError()).To(Equal(false)) + Expect(tsk.Data["retryCount"]).To(Equal(startCount)) + }, + Entry("when 3rd retry", func() (*task.Task, int) { + retryCount := 3 + return getTask(retryCount, false), retryCount + }), + Entry("more 1st retry", func() (*task.Task, int) { + retryCount := 1 + return getTask(retryCount, false), retryCount }), ) }) + Context("FailTask", func() { + var logger log.Logger + BeforeEach(func() { + logger = logNull.NewLogger() + }) + It("will set the task to have failed", func() { + tsk := getTask(0, false) + Expect(tsk.IsFailed()).To(Equal(false)) + fetch.FailTask(logger, tsk, errors.New("some error")) + Expect(tsk.IsFailed()).To(Equal(true)) + }) + It("will not change the failure status if already set", func() { + tsk := getTask(0, false) + tsk.SetFailed() + Expect(tsk.IsFailed()).To(Equal(true)) + fetch.FailTask(logger, tsk, errors.New("some error")) + Expect(tsk.IsFailed()).To(Equal(true)) + }) + }) + Context("ResetTask", func() { It("returns the unit value when set", func() { - tsk := getTask(3) + tsk := getTask(3, true) tsk.AppendError(errors.New("some error")) Expect(tsk.HasError()).To(Equal(true)) Expect(tsk.Data["retryCount"]).To(Equal(3))