Skip to content

Commit

Permalink
[BACK-2633] soft failure when running dexcom tasks (#656)
Browse files Browse the repository at this point in the history
* soft failure when running dexcom tasks

* if allowing retry the unset the failed state
  • Loading branch information
jh-bate authored Aug 30, 2023
1 parent 0949fed commit a7f9eba
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 32 deletions.
47 changes: 19 additions & 28 deletions dexcom/fetch/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,13 @@ func (r *Runner) Run(ctx context.Context, tsk *task.Task) bool {
tsk.ClearError()

if serverSessionToken, sErr := r.AuthClient().ServerSessionToken(); sErr != nil {
tsk.AppendError(errors.Wrap(sErr, "unable to get server session token"))
r.ignoreAndLogTaskError(tsk, errors.Wrap(sErr, "unable to get server session token"))
} else {
ctx = auth.NewContextWithServerSessionToken(ctx, serverSessionToken)

if taskRunner, tErr := NewTaskRunner(r, tsk); tErr != nil {
tsk.AppendError(errors.Wrap(tErr, "unable to create task runner"))
r.ignoreAndLogTaskError(tsk, errors.Wrap(sErr, "unable to create task runner"))
} else if tErr = taskRunner.Run(ctx); tErr != nil {
tsk.AppendError(errors.Wrap(tErr, "unable to run task runner"))
ErrorOrRetryTask(tsk, errors.Wrap(tErr, "unable to run task runner"))
}
}
}
Expand All @@ -146,10 +145,13 @@ func (r *Runner) Run(ctx context.Context, tsk *task.Task) bool {
if taskDuration := time.Since(now); taskDuration > TaskDurationMaximum {
r.Logger().WithField("taskDuration", taskDuration.Truncate(time.Millisecond).Seconds()).Warn("Task duration exceeds maximum")
}

return true
}

func (r *Runner) ignoreAndLogTaskError(tsk *task.Task, err error) {
r.logger.Warnf("dexcom task %s error, task will be retried: %s", tsk.ID, err)
}

type TaskRunner struct {
*Runner
task *task.Task
Expand Down Expand Up @@ -183,8 +185,7 @@ func (t *TaskRunner) Run(ctx context.Context) error {
}

if len(t.task.Data) == 0 {
t.task.SetFailed()
return errors.New("data is missing")
return FailTask(t.logger, t.task, errors.New("data is missing"))
}

t.context = ctx
Expand All @@ -204,7 +205,7 @@ func (t *TaskRunner) Run(ctx context.Context) error {
}
if err := t.fetchSinceLatestDataTime(); err != nil {
if request.IsErrorUnauthenticated(errors.Cause(err)) {
t.task.SetFailed()
FailTask(t.logger, t.task, err)
if updateErr := t.updateDataSourceWithError(err); updateErr != nil {
t.Logger().WithError(updateErr).Error("unable to update data source with error")
}
Expand All @@ -217,16 +218,14 @@ func (t *TaskRunner) Run(ctx context.Context) error {
func (t *TaskRunner) getProviderSession() error {
providerSessionID, ok := t.task.Data["providerSessionId"].(string)
if !ok || providerSessionID == "" {
t.task.SetFailed()
return errors.New("provider session id is missing")
return FailTask(t.logger, t.task, errors.New("provider session id is missing"))
}

providerSession, err := t.AuthClient().GetProviderSession(t.context, providerSessionID)
if err != nil {
return errors.Wrap(err, "unable to get provider session")
} else if providerSession == nil {
t.task.SetFailed()
return errors.Wrap(err, "provider session is missing")
return FailTask(t.logger, t.task, errors.Wrap(err, "provider session is missing"))
}
t.providerSession = providerSession

Expand All @@ -247,8 +246,7 @@ func (t *TaskRunner) updateProviderSession() error {
if err != nil {
return errors.Wrap(err, "unable to update provider session")
} else if providerSession == nil {
t.task.SetFailed()
return errors.Wrap(err, "provider session is missing")
return FailTask(t.logger, t.task, errors.Wrap(err, "provider session is missing"))
}
t.providerSession = providerSession

Expand All @@ -258,16 +256,14 @@ func (t *TaskRunner) updateProviderSession() error {
func (t *TaskRunner) getDataSource() error {
dataSourceID, ok := t.task.Data["dataSourceId"].(string)
if !ok || dataSourceID == "" {
t.task.SetFailed()
return errors.New("data source id is missing")
return FailTask(t.logger, t.task, errors.New("data source id is missing"))
}

source, err := t.DataSourceClient().Get(t.context, dataSourceID)
if err != nil {
return errors.Wrap(err, "unable to get data source")
} else if source == nil {
t.task.SetFailed()
return errors.Wrap(err, "data source is missing")
return FailTask(t.logger, t.task, errors.Wrap(err, "data source is missing"))
}
t.dataSource = source

Expand Down Expand Up @@ -320,8 +316,7 @@ func (t *TaskRunner) updateDataSource(update *dataSource.Update) error {
if err != nil {
return errors.Wrap(err, "unable to update data source")
} else if source == nil {
t.task.SetFailed()
return errors.Wrap(err, "data source is missing")
return FailTask(t.logger, t.task, errors.Wrap(err, "data source is missing"))
}

t.dataSource = source
Expand All @@ -331,8 +326,7 @@ func (t *TaskRunner) updateDataSource(update *dataSource.Update) error {
func (t *TaskRunner) createTokenSource() error {
tokenSource, err := oauthToken.NewSourceWithToken(t.providerSession.OAuthToken)
if err != nil {
t.task.SetFailed()
return errors.Wrap(err, "unable to create token source")
return FailTask(t.logger, t.task, errors.Wrap(err, "unable to create token source"))
}

t.tokenSource = tokenSource
Expand All @@ -346,16 +340,14 @@ func (t *TaskRunner) getDeviceHashes() error {
}
rawMap, rawMapOK := raw.(map[string]interface{})
if !rawMapOK || rawMap == nil {
t.task.SetFailed()
return errors.New("device hashes is invalid")
return FailTask(t.logger, t.task, errors.New("device hashes is invalid"))
}
deviceHashes := map[string]string{}
for key, value := range rawMap {
if valueString, valueStringOK := value.(string); valueStringOK {
deviceHashes[key] = valueString
} else {
t.task.SetFailed()
return errors.New("device hash is invalid")
return FailTask(t.logger, t.task, errors.New("device hash is invalid"))
}
}

Expand Down Expand Up @@ -390,8 +382,7 @@ func (t *TaskRunner) updateDataSet(dataSetUpdate *data.DataSetUpdate) error {
if err != nil {
return errors.Wrap(err, "unable to update data set")
} else if dataSet == nil {
t.task.SetFailed()
return errors.Wrap(err, "data set is missing")
return FailTask(t.logger, t.task, errors.Wrap(err, "data set is missing"))
}

t.dataSet = dataSet
Expand Down
3 changes: 1 addition & 2 deletions dexcom/fetch/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,4 @@ import (
. "github.com/onsi/ginkgo"
)

var _ = Describe("Runner", func() {
})
var _ = Describe("Runner", func() {})
47 changes: 45 additions & 2 deletions dexcom/fetch/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/tidepool-org/platform/errors"
"github.com/tidepool-org/platform/log"
"github.com/tidepool-org/platform/pointer"
"github.com/tidepool-org/platform/task"
)
Expand All @@ -12,6 +13,8 @@ func TaskName(providerSessionID string) string {
return fmt.Sprintf("%s:%s", Type, providerSessionID)
}

const dexcomTaskRetryField = "retryCount"

func NewTaskCreate(providerSessionID string, dataSourceID string) (*task.TaskCreate, error) {
if providerSessionID == "" {
return nil, errors.New("provider session id is missing")
Expand All @@ -24,8 +27,48 @@ func NewTaskCreate(providerSessionID string, dataSourceID string) (*task.TaskCre
Name: pointer.FromString(TaskName(providerSessionID)),
Type: Type,
Data: map[string]interface{}{
"providerSessionId": providerSessionID,
"dataSourceId": dataSourceID,
"providerSessionId": providerSessionID,
"dataSourceId": dataSourceID,
dexcomTaskRetryField: 0,
},
}, nil
}

func ErrorOrRetryTask(t *task.Task, err error) {
if t.IsFailed() {
if shouldTaskError(t) {
t.AppendError(err)
return
}
incrementTaskRetryCount(t)
t.State = task.TaskStateCompleted
}
}

func FailTask(l log.Logger, t *task.Task, err error) error {
l.Warnf("dexcom task %s failed: %s", t.ID, err)
t.SetFailed()
return err
}

func shouldTaskError(t *task.Task) bool {
if t.Data[dexcomTaskRetryField] != nil {
count, ok := t.Data[dexcomTaskRetryField].(int)
if ok {
return count >= 3
}
}
return true
}

func incrementTaskRetryCount(t *task.Task) {
if t.Data[dexcomTaskRetryField] != nil {
count, ok := t.Data[dexcomTaskRetryField].(int)
if ok {
count++
t.Data[dexcomTaskRetryField] = count
}
} else {
t.Data[dexcomTaskRetryField] = 1
}
}
135 changes: 135 additions & 0 deletions dexcom/fetch/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,142 @@ package fetch_test

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/ginkgo/extensions/table"
. "github.com/onsi/gomega"

"errors"

"github.com/tidepool-org/platform/dexcom/fetch"
"github.com/tidepool-org/platform/log"

logNull "github.com/tidepool-org/platform/log/null"
"github.com/tidepool-org/platform/task"
)

var _ = Describe("Task", func() {

var getTask = func(retryCount int, hasFailed bool) *task.Task {
tsk := &task.Task{
Data: map[string]interface{}{
"retryCount": retryCount,
},
}
if hasFailed {
tsk.SetFailed()
}
return tsk
}

Context("ErrorOrRetryTask", func() {
DescribeTable("will not append error",
func(setupFunc func() (*task.Task, int)) {
tsk, startCount := setupFunc()
Expect(tsk.IsFailed()).To(Equal(true))
Expect(tsk.Data["retryCount"]).To(Equal(startCount))
fetch.ErrorOrRetryTask(tsk, errors.New("some error"))
Expect(tsk.HasError()).To(Equal(false))
Expect(tsk.Data["retryCount"]).To(Equal(startCount + 1))
Expect(tsk.IsFailed()).To(Equal(false))
},
Entry("if zero retries", func() (*task.Task, int) {
retryCount := 0
return getTask(retryCount, true), retryCount
}),
Entry("if one retry", func() (*task.Task, int) {
retryCount := 1
return getTask(retryCount, true), retryCount
}),
Entry("if two retries", func() (*task.Task, int) {
retryCount := 2
return getTask(retryCount, true), retryCount
}),
)
DescribeTable("will append error",
func(setupFunc func() (*task.Task, int)) {
tsk, startCount := setupFunc()
Expect(tsk.Data["retryCount"]).To(Equal(startCount))
fetch.ErrorOrRetryTask(tsk, errors.New("some error"))
Expect(tsk.HasError()).To(Equal(true))
Expect(tsk.IsFailed()).To(Equal(true))
},
Entry("when 3rd retry", func() (*task.Task, int) {
retryCount := 3
return getTask(retryCount, true), retryCount
}),
Entry("more than 3 retries", func() (*task.Task, int) {
retryCount := 10
return getTask(retryCount, true), retryCount
}),
)
DescribeTable("will ignore if the task is not been failed",
func(setupFunc func() (*task.Task, int)) {
tsk, startCount := setupFunc()
Expect(tsk.Data["retryCount"]).To(Equal(startCount))
fetch.ErrorOrRetryTask(tsk, errors.New("some error"))
Expect(tsk.HasError()).To(Equal(false))
Expect(tsk.Data["retryCount"]).To(Equal(startCount))
},
Entry("when 3rd retry", func() (*task.Task, int) {
retryCount := 3
return getTask(retryCount, false), retryCount
}),
Entry("more 1st retry", func() (*task.Task, int) {
retryCount := 1
return getTask(retryCount, false), retryCount
}),
)
})

Context("FailTask", func() {
var logger log.Logger
BeforeEach(func() {
logger = logNull.NewLogger()
})
It("will set the task to have failed", func() {
tsk := getTask(0, false)
Expect(tsk.IsFailed()).To(Equal(false))
fetch.FailTask(logger, tsk, errors.New("some error"))
Expect(tsk.IsFailed()).To(Equal(true))
})
It("will not change the failure status if already set", func() {
tsk := getTask(0, false)
tsk.SetFailed()
Expect(tsk.IsFailed()).To(Equal(true))
fetch.FailTask(logger, tsk, errors.New("some error"))
Expect(tsk.IsFailed()).To(Equal(true))
})
})

Context("NewTaskCreate", func() {
const providerID = "some-provider-id"
const sourceID = "some-source-id"

It("returns an error when provider session id not set", func() {
tc, err := fetch.NewTaskCreate("", sourceID)
Expect(err).ToNot(BeNil())
Expect(tc).To(BeNil())
Expect(err.Error()).To(ContainSubstring("provider session id is missing"))
})
It("returns an error when data source id not set", func() {
tc, err := fetch.NewTaskCreate(providerID, "")
Expect(err).ToNot(BeNil())
Expect(tc).To(BeNil())
Expect(err.Error()).To(ContainSubstring("data source id is missing"))
})
It("returns an initialised task create", func() {
tc, err := fetch.NewTaskCreate(providerID, sourceID)
Expect(err).To(BeNil())
Expect(tc).ToNot(BeNil())
})

It("task has data initialised", func() {
tc, _ := fetch.NewTaskCreate(providerID, sourceID)
Expect(tc).ToNot(BeNil())
Expect(tc.Type).To(Equal(fetch.Type))
Expect(tc.Data["providerSessionId"]).To(Equal(providerID))
Expect(tc.Data["dataSourceId"]).To(Equal(sourceID))
Expect(tc.Data["retryCount"]).To(Equal(0))
})

})
})

0 comments on commit a7f9eba

Please sign in to comment.