From 98c2034f8f017ed34a3c380061c35e8e204e749c Mon Sep 17 00:00:00 2001 From: Todd Kazakov Date: Thu, 17 Aug 2023 13:35:43 +0300 Subject: [PATCH] Allow retrying failed sync tasks --- ehr/reconcile/runner.go | 25 ++++++++++++++----------- ehr/sync/runner.go | 40 ++++++++++++++++++++++++++-------------- 2 files changed, 40 insertions(+), 25 deletions(-) diff --git a/ehr/reconcile/runner.go b/ehr/reconcile/runner.go index 76efbe387..0a70fa3d0 100644 --- a/ehr/reconcile/runner.go +++ b/ehr/reconcile/runner.go @@ -55,10 +55,21 @@ func (r *Runner) Run(ctx context.Context, tsk *task.Task) bool { now := time.Now() tsk.ClearError() + r.doRun(ctx, tsk) + tsk.RepeatAvailableAfter(AvailableAfterDurationMinimum + time.Duration(rand.Int63n(int64(AvailableAfterDurationMaximum-AvailableAfterDurationMinimum+1)))) + + if taskDuration := time.Since(now); taskDuration > TaskDurationMaximum { + r.logger.WithField("taskDuration", taskDuration.Truncate(time.Millisecond).Seconds()).Warn("Task duration exceeds maximum") + } + + return true +} + +func (r *Runner) doRun(ctx context.Context, tsk *task.Task) { serverSessionToken, err := r.authClient.ServerSessionToken() if err != nil { tsk.AppendError(errors.Wrap(err, "unable to get server session token")) - return true + return } ctx = auth.NewContextWithServerSessionToken(ctx, serverSessionToken) @@ -67,26 +78,18 @@ func (r *Runner) Run(ctx context.Context, tsk *task.Task) bool { syncTasks, err := r.getSyncTasks(ctx) if err != nil { tsk.AppendError(errors.Wrap(err, "unable to get sync tasks")) + return } // Get the list of all EHR enabled clinics clinicsList, err := r.clinicsClient.ListEHREnabledClinics(ctx) if err != nil { tsk.AppendError(errors.Wrap(err, "unable to list clinics")) + return } plan := GetReconciliationPlan(syncTasks, clinicsList) r.reconcileTasks(ctx, tsk, plan) - - if !tsk.IsFailed() { - tsk.RepeatAvailableAfter(AvailableAfterDurationMinimum + time.Duration(rand.Int63n(int64(AvailableAfterDurationMaximum-AvailableAfterDurationMinimum+1)))) - } - - if taskDuration := time.Since(now); taskDuration > TaskDurationMaximum { - r.logger.WithField("taskDuration", taskDuration.Truncate(time.Millisecond).Seconds()).Warn("Task duration exceeds maximum") - } - - return true } func (r *Runner) getSyncTasks(ctx context.Context) (map[string]task.Task, error) { diff --git a/ehr/sync/runner.go b/ehr/sync/runner.go index 2c2efc0e8..8c3ec63ab 100644 --- a/ehr/sync/runner.go +++ b/ehr/sync/runner.go @@ -12,9 +12,11 @@ import ( ) const ( - AvailableAfterDurationMaximum = AvailableAfterDurationMinimum + 1*time.Hour - AvailableAfterDurationMinimum = 14*24*time.Hour - 30*time.Minute - TaskDurationMaximum = 5 * time.Minute + OnSuccessAvailableAfterDurationMaximum = OnSuccessAvailableAfterDurationMinimum + 1*time.Hour + OnSuccessAvailableAfterDurationMinimum = 14*24*time.Hour - 30*time.Minute + OnErrorAvailableAfterDurationMaximum = OnErrorAvailableAfterDurationMinimum + 5*time.Minute + OnErrorAvailableAfterDurationMinimum = 1*time.Hour - 5*time.Minute + TaskDurationMaximum = 5 * time.Minute ) type Runner struct { @@ -45,19 +47,14 @@ func (r *Runner) Run(ctx context.Context, tsk *task.Task) bool { now := time.Now() tsk.ClearError() - clinicId, err := GetClinicId(tsk.Data) - if err != nil { - tsk.AppendError(errors.Wrap(err, "unable to get clinicId from task data")) - return true - } - - err = r.clinicsClient.SyncEHRData(ctx, clinicId) - if err != nil { - tsk.AppendError(errors.Wrap(err, "unable to sync ehr data")) - } + r.doRun(ctx, tsk) if !tsk.IsFailed() { - tsk.RepeatAvailableAfter(AvailableAfterDurationMinimum + time.Duration(rand.Int63n(int64(AvailableAfterDurationMaximum-AvailableAfterDurationMinimum+1)))) + if tsk.HasError() { + tsk.RepeatAvailableAfter(OnErrorAvailableAfterDurationMinimum + time.Duration(rand.Int63n(int64(OnErrorAvailableAfterDurationMaximum-OnErrorAvailableAfterDurationMinimum+1)))) + } else { + tsk.RepeatAvailableAfter(OnSuccessAvailableAfterDurationMinimum + time.Duration(rand.Int63n(int64(OnSuccessAvailableAfterDurationMaximum-OnSuccessAvailableAfterDurationMinimum+1)))) + } } if taskDuration := time.Since(now); taskDuration > TaskDurationMaximum { @@ -66,3 +63,18 @@ func (r *Runner) Run(ctx context.Context, tsk *task.Task) bool { return true } + +func (r *Runner) doRun(ctx context.Context, tsk *task.Task) { + clinicId, err := GetClinicId(tsk.Data) + if err != nil { + tsk.AppendError(errors.Wrap(err, "unable to get clinicId from task data")) + // Unrecoverable condition, move the task to failed state so it won't be retried + tsk.SetFailed() + return + } + + err = r.clinicsClient.SyncEHRData(ctx, clinicId) + if err != nil { + tsk.AppendError(errors.Wrap(err, "unable to sync ehr data")) + } +}