diff --git a/dexcom/fetch/runner.go b/dexcom/fetch/runner.go index 7f769cfb4..b5fa25ab6 100644 --- a/dexcom/fetch/runner.go +++ b/dexcom/fetch/runner.go @@ -127,14 +127,13 @@ func (r *Runner) Run(ctx context.Context, tsk *task.Task) bool { tsk.ClearError() if serverSessionToken, sErr := r.AuthClient().ServerSessionToken(); sErr != nil { - tsk.AppendError(errors.Wrap(sErr, "unable to get server session token")) + r.ignoreAndLogTaskError(tsk, errors.Wrap(sErr, "unable to get server session token")) } else { ctx = auth.NewContextWithServerSessionToken(ctx, serverSessionToken) - if taskRunner, tErr := NewTaskRunner(r, tsk); tErr != nil { - tsk.AppendError(errors.Wrap(tErr, "unable to create task runner")) + r.ignoreAndLogTaskError(tsk, errors.Wrap(sErr, "unable to create task runner")) } else if tErr = taskRunner.Run(ctx); tErr != nil { - tsk.AppendError(errors.Wrap(tErr, "unable to run task runner")) + ErrorOrRetryTask(tsk, errors.Wrap(tErr, "unable to run task runner")) } } } @@ -146,10 +145,13 @@ func (r *Runner) Run(ctx context.Context, tsk *task.Task) bool { if taskDuration := time.Since(now); taskDuration > TaskDurationMaximum { r.Logger().WithField("taskDuration", taskDuration.Truncate(time.Millisecond).Seconds()).Warn("Task duration exceeds maximum") } - return true } +func (r *Runner) ignoreAndLogTaskError(tsk *task.Task, err error) { + r.logger.Warnf("dexcom task %s error, task will be retried: %s", tsk.ID, err) +} + type TaskRunner struct { *Runner task *task.Task @@ -183,8 +185,7 @@ func (t *TaskRunner) Run(ctx context.Context) error { } if len(t.task.Data) == 0 { - t.task.SetFailed() - return errors.New("data is missing") + return FailTask(t.logger, t.task, errors.New("data is missing")) } t.context = ctx @@ -204,7 +205,7 @@ func (t *TaskRunner) Run(ctx context.Context) error { } if err := t.fetchSinceLatestDataTime(); err != nil { if request.IsErrorUnauthenticated(errors.Cause(err)) { - t.task.SetFailed() + FailTask(t.logger, t.task, err) if updateErr := t.updateDataSourceWithError(err); updateErr != nil { t.Logger().WithError(updateErr).Error("unable to update data source with error") } @@ -217,16 +218,14 @@ func (t *TaskRunner) Run(ctx context.Context) error { func (t *TaskRunner) getProviderSession() error { providerSessionID, ok := t.task.Data["providerSessionId"].(string) if !ok || providerSessionID == "" { - t.task.SetFailed() - return errors.New("provider session id is missing") + return FailTask(t.logger, t.task, errors.New("provider session id is missing")) } providerSession, err := t.AuthClient().GetProviderSession(t.context, providerSessionID) if err != nil { return errors.Wrap(err, "unable to get provider session") } else if providerSession == nil { - t.task.SetFailed() - return errors.Wrap(err, "provider session is missing") + return FailTask(t.logger, t.task, errors.Wrap(err, "provider session is missing")) } t.providerSession = providerSession @@ -247,8 +246,7 @@ func (t *TaskRunner) updateProviderSession() error { if err != nil { return errors.Wrap(err, "unable to update provider session") } else if providerSession == nil { - t.task.SetFailed() - return errors.Wrap(err, "provider session is missing") + return FailTask(t.logger, t.task, errors.Wrap(err, "provider session is missing")) } t.providerSession = providerSession @@ -258,16 +256,14 @@ func (t *TaskRunner) updateProviderSession() error { func (t *TaskRunner) getDataSource() error { dataSourceID, ok := t.task.Data["dataSourceId"].(string) if !ok || dataSourceID == "" { - t.task.SetFailed() - return errors.New("data source id is missing") + return FailTask(t.logger, t.task, errors.New("data source id is missing")) } source, err := t.DataSourceClient().Get(t.context, dataSourceID) if err != nil { return errors.Wrap(err, "unable to get data source") } else if source == nil { - t.task.SetFailed() - return errors.Wrap(err, "data source is missing") + return FailTask(t.logger, t.task, errors.Wrap(err, "data source is missing")) } t.dataSource = source @@ -320,8 +316,7 @@ func (t *TaskRunner) updateDataSource(update *dataSource.Update) error { if err != nil { return errors.Wrap(err, "unable to update data source") } else if source == nil { - t.task.SetFailed() - return errors.Wrap(err, "data source is missing") + return FailTask(t.logger, t.task, errors.Wrap(err, "data source is missing")) } t.dataSource = source @@ -331,8 +326,7 @@ func (t *TaskRunner) updateDataSource(update *dataSource.Update) error { func (t *TaskRunner) createTokenSource() error { tokenSource, err := oauthToken.NewSourceWithToken(t.providerSession.OAuthToken) if err != nil { - t.task.SetFailed() - return errors.Wrap(err, "unable to create token source") + return FailTask(t.logger, t.task, errors.Wrap(err, "unable to create token source")) } t.tokenSource = tokenSource @@ -346,16 +340,14 @@ func (t *TaskRunner) getDeviceHashes() error { } rawMap, rawMapOK := raw.(map[string]interface{}) if !rawMapOK || rawMap == nil { - t.task.SetFailed() - return errors.New("device hashes is invalid") + return FailTask(t.logger, t.task, errors.New("device hashes is invalid")) } deviceHashes := map[string]string{} for key, value := range rawMap { if valueString, valueStringOK := value.(string); valueStringOK { deviceHashes[key] = valueString } else { - t.task.SetFailed() - return errors.New("device hash is invalid") + return FailTask(t.logger, t.task, errors.New("device hash is invalid")) } } @@ -390,8 +382,7 @@ func (t *TaskRunner) updateDataSet(dataSetUpdate *data.DataSetUpdate) error { if err != nil { return errors.Wrap(err, "unable to update data set") } else if dataSet == nil { - t.task.SetFailed() - return errors.Wrap(err, "data set is missing") + return FailTask(t.logger, t.task, errors.Wrap(err, "data set is missing")) } t.dataSet = dataSet diff --git a/dexcom/fetch/runner_test.go b/dexcom/fetch/runner_test.go index 2ab82104c..65a06ae82 100644 --- a/dexcom/fetch/runner_test.go +++ b/dexcom/fetch/runner_test.go @@ -4,5 +4,4 @@ import ( . "github.com/onsi/ginkgo" ) -var _ = Describe("Runner", func() { -}) +var _ = Describe("Runner", func() {}) diff --git a/dexcom/fetch/task.go b/dexcom/fetch/task.go index 76f4385cb..9cd039726 100644 --- a/dexcom/fetch/task.go +++ b/dexcom/fetch/task.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/tidepool-org/platform/errors" + "github.com/tidepool-org/platform/log" "github.com/tidepool-org/platform/pointer" "github.com/tidepool-org/platform/task" ) @@ -12,6 +13,8 @@ func TaskName(providerSessionID string) string { return fmt.Sprintf("%s:%s", Type, providerSessionID) } +const dexcomTaskRetryField = "retryCount" + func NewTaskCreate(providerSessionID string, dataSourceID string) (*task.TaskCreate, error) { if providerSessionID == "" { return nil, errors.New("provider session id is missing") @@ -24,8 +27,48 @@ func NewTaskCreate(providerSessionID string, dataSourceID string) (*task.TaskCre Name: pointer.FromString(TaskName(providerSessionID)), Type: Type, Data: map[string]interface{}{ - "providerSessionId": providerSessionID, - "dataSourceId": dataSourceID, + "providerSessionId": providerSessionID, + "dataSourceId": dataSourceID, + dexcomTaskRetryField: 0, }, }, nil } + +func ErrorOrRetryTask(t *task.Task, err error) { + if t.IsFailed() { + if shouldTaskError(t) { + t.AppendError(err) + return + } + incrementTaskRetryCount(t) + t.State = task.TaskStateCompleted + } +} + +func FailTask(l log.Logger, t *task.Task, err error) error { + l.Warnf("dexcom task %s failed: %s", t.ID, err) + t.SetFailed() + return err +} + +func shouldTaskError(t *task.Task) bool { + if t.Data[dexcomTaskRetryField] != nil { + count, ok := t.Data[dexcomTaskRetryField].(int) + if ok { + return count >= 3 + } + } + return true +} + +func incrementTaskRetryCount(t *task.Task) { + if t.Data[dexcomTaskRetryField] != nil { + count, ok := t.Data[dexcomTaskRetryField].(int) + if ok { + count++ + t.Data[dexcomTaskRetryField] = count + } + } else { + t.Data[dexcomTaskRetryField] = 1 + } +} diff --git a/dexcom/fetch/task_test.go b/dexcom/fetch/task_test.go index 603a8e814..6c264daf8 100644 --- a/dexcom/fetch/task_test.go +++ b/dexcom/fetch/task_test.go @@ -2,7 +2,142 @@ package fetch_test import ( . "github.com/onsi/ginkgo" + . "github.com/onsi/ginkgo/extensions/table" + . "github.com/onsi/gomega" + + "errors" + + "github.com/tidepool-org/platform/dexcom/fetch" + "github.com/tidepool-org/platform/log" + + logNull "github.com/tidepool-org/platform/log/null" + "github.com/tidepool-org/platform/task" ) var _ = Describe("Task", func() { + + var getTask = func(retryCount int, hasFailed bool) *task.Task { + tsk := &task.Task{ + Data: map[string]interface{}{ + "retryCount": retryCount, + }, + } + if hasFailed { + tsk.SetFailed() + } + return tsk + } + + Context("ErrorOrRetryTask", func() { + DescribeTable("will not append error", + func(setupFunc func() (*task.Task, int)) { + tsk, startCount := setupFunc() + Expect(tsk.IsFailed()).To(Equal(true)) + Expect(tsk.Data["retryCount"]).To(Equal(startCount)) + fetch.ErrorOrRetryTask(tsk, errors.New("some error")) + Expect(tsk.HasError()).To(Equal(false)) + Expect(tsk.Data["retryCount"]).To(Equal(startCount + 1)) + Expect(tsk.IsFailed()).To(Equal(false)) + }, + Entry("if zero retries", func() (*task.Task, int) { + retryCount := 0 + return getTask(retryCount, true), retryCount + }), + Entry("if one retry", func() (*task.Task, int) { + retryCount := 1 + return getTask(retryCount, true), retryCount + }), + Entry("if two retries", func() (*task.Task, int) { + retryCount := 2 + return getTask(retryCount, true), retryCount + }), + ) + DescribeTable("will append error", + func(setupFunc func() (*task.Task, int)) { + tsk, startCount := setupFunc() + Expect(tsk.Data["retryCount"]).To(Equal(startCount)) + fetch.ErrorOrRetryTask(tsk, errors.New("some error")) + Expect(tsk.HasError()).To(Equal(true)) + Expect(tsk.IsFailed()).To(Equal(true)) + }, + Entry("when 3rd retry", func() (*task.Task, int) { + retryCount := 3 + return getTask(retryCount, true), retryCount + }), + Entry("more than 3 retries", func() (*task.Task, int) { + retryCount := 10 + return getTask(retryCount, true), retryCount + }), + ) + DescribeTable("will ignore if the task is not been failed", + func(setupFunc func() (*task.Task, int)) { + tsk, startCount := setupFunc() + Expect(tsk.Data["retryCount"]).To(Equal(startCount)) + fetch.ErrorOrRetryTask(tsk, errors.New("some error")) + Expect(tsk.HasError()).To(Equal(false)) + Expect(tsk.Data["retryCount"]).To(Equal(startCount)) + }, + Entry("when 3rd retry", func() (*task.Task, int) { + retryCount := 3 + return getTask(retryCount, false), retryCount + }), + Entry("more 1st retry", func() (*task.Task, int) { + retryCount := 1 + return getTask(retryCount, false), retryCount + }), + ) + }) + + Context("FailTask", func() { + var logger log.Logger + BeforeEach(func() { + logger = logNull.NewLogger() + }) + It("will set the task to have failed", func() { + tsk := getTask(0, false) + Expect(tsk.IsFailed()).To(Equal(false)) + fetch.FailTask(logger, tsk, errors.New("some error")) + Expect(tsk.IsFailed()).To(Equal(true)) + }) + It("will not change the failure status if already set", func() { + tsk := getTask(0, false) + tsk.SetFailed() + Expect(tsk.IsFailed()).To(Equal(true)) + fetch.FailTask(logger, tsk, errors.New("some error")) + Expect(tsk.IsFailed()).To(Equal(true)) + }) + }) + + Context("NewTaskCreate", func() { + const providerID = "some-provider-id" + const sourceID = "some-source-id" + + It("returns an error when provider session id not set", func() { + tc, err := fetch.NewTaskCreate("", sourceID) + Expect(err).ToNot(BeNil()) + Expect(tc).To(BeNil()) + Expect(err.Error()).To(ContainSubstring("provider session id is missing")) + }) + It("returns an error when data source id not set", func() { + tc, err := fetch.NewTaskCreate(providerID, "") + Expect(err).ToNot(BeNil()) + Expect(tc).To(BeNil()) + Expect(err.Error()).To(ContainSubstring("data source id is missing")) + }) + It("returns an initialised task create", func() { + tc, err := fetch.NewTaskCreate(providerID, sourceID) + Expect(err).To(BeNil()) + Expect(tc).ToNot(BeNil()) + }) + + It("task has data initialised", func() { + tc, _ := fetch.NewTaskCreate(providerID, sourceID) + Expect(tc).ToNot(BeNil()) + Expect(tc.Type).To(Equal(fetch.Type)) + Expect(tc.Data["providerSessionId"]).To(Equal(providerID)) + Expect(tc.Data["dataSourceId"]).To(Equal(sourceID)) + Expect(tc.Data["retryCount"]).To(Equal(0)) + }) + + }) })