Skip to content

Commit

Permalink
fix: error recovery after retry fail (#795)
Browse files Browse the repository at this point in the history
# Motivation

Follow up of #762. Pending tasks should also be finalized (and inserted
into the queue).


# Description

During retry process when a task produces an exception, the retry may
fail during insertion into the queue system or setting the task to
Submitted. When this happen, the task in retried may stay into the queue
and re-acquired. We check if the retried task (the new instance) exists
and if it exists whith status Creating or Submitted, we finalize its
creation. This PR adds also finalize creation of tasks with status
Pending.

# Testing

Unit tests were added to validate were resubmit properly when the new
instance of the task in error is Pending.

# Impact

Should fix part of the issue where tasks stay in Pending or Submitted
whitout being processed.

# Additional Information

We need to try other queue systems to see if the same issue occur with
them.

# Checklist

- [x] My code adheres to the coding and style guidelines of the project.
- [x] I have performed a self-review of my code.
- [x] I have commented my code, particularly in hard-to-understand
areas.
- [ ] I have made corresponding changes to the documentation.
- [x] I have thoroughly tested my modifications and added tests when
necessary.
- [x] Tests pass locally and in the CI.
- [x] I have assessed the performance impact of my modifications.
  • Loading branch information
aneojgurhem authored Nov 14, 2024
2 parents 3a100f2 + 942293e commit 4e10a6f
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 1 deletion.
9 changes: 9 additions & 0 deletions Common/src/Pollster/AcquisitionStatus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ public enum AcquisitionStatus
/// </summary>
TaskIsRetriedAndRetryIsSubmitted,


/// <summary>
/// Task not acquired because its status is <see cref="TaskStatus.Retried" />. Moreover, the retried task is
/// <see cref="TaskStatus.Pending" />
/// Reinsertion in the queue may be required.
/// </summary>
TaskIsRetriedAndRetryIsPending,


/// <summary>
/// Task not acquired because its status is <see cref="TaskStatus.Processing" /> but the other pod does not seem to be
/// processing it
Expand Down
9 changes: 8 additions & 1 deletion Common/src/Pollster/TaskHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ await taskTable_.RetryTask(taskData_,
.ConfigureAwait(false);
}

if (retryData.Status is TaskStatus.Creating or TaskStatus.Submitted)
if (retryData.Status is TaskStatus.Creating or TaskStatus.Pending or TaskStatus.Submitted)
{
logger_.LogWarning("Retried task {task} is in {status}; trying to finalize task creation",
retryId,
Expand All @@ -498,11 +498,18 @@ await submitter_.FinalizeTaskCreation(new List<TaskCreationRequest>
CancellationToken.None)
.ConfigureAwait(false);
}
else
{
logger_.LogInformation("Retried task {task} is in {status}; nothing done",
retryId,
retryData.Status);
}

return (taskNotFound, taskExists, retryData.Status) switch
{
(false, false, TaskStatus.Submitted) => AcquisitionStatus.TaskIsRetriedAndRetryIsSubmitted,
(false, false, TaskStatus.Creating) => AcquisitionStatus.TaskIsRetriedAndRetryIsCreating,
(false, false, TaskStatus.Pending) => AcquisitionStatus.TaskIsRetriedAndRetryIsPending,
(true, false, TaskStatus.Submitted or TaskStatus.Creating) => AcquisitionStatus.TaskIsRetriedAndRetryIsNotFound,
_ => AcquisitionStatus.TaskIsRetried,
};
Expand Down
30 changes: 30 additions & 0 deletions Common/tests/Pollster/TaskHandlerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,16 @@ await testServiceProvider.ResultTable.Create(results)
.ResultId,
},
new List<string>()),
new("TaskRetry2+Pending",
results[0]
.ResultId,
options,
new List<string>
{
results[1]
.ResultId,
},
new List<string>()),
};

await TaskLifeCycleHelper.CreateTasks(testServiceProvider.TaskTable,
Expand Down Expand Up @@ -509,6 +519,23 @@ await testServiceProvider.TaskTable.SetTaskRetryAsync(taskData,
await testServiceProvider.TaskTable.SetTaskRetryAsync(taskData,
"Error for test : not found")
.ConfigureAwait(false);


taskData = await testServiceProvider.TaskTable.ReadTaskAsync("TaskRetry2+Pending")
.ConfigureAwait(false);

await testServiceProvider.TaskTable.SetTaskRetryAsync(taskData,
"Error for test : pending")
.ConfigureAwait(false);

newTaskId = await testServiceProvider.TaskTable.RetryTask(taskData)
.ConfigureAwait(false);

await testServiceProvider.TaskTable.UpdateOneTask(newTaskId,
null,
new UpdateDefinition<TaskData>().Set(data => data.Status,
TaskStatus.Pending))
.ConfigureAwait(false);
}

[Test]
Expand Down Expand Up @@ -950,6 +977,9 @@ public static IEnumerable TestCaseAcquireRetriedTask
yield return new TestCaseData("TaskRetry2+NotFound").Returns(new AcquireTaskReturn(AcquisitionStatus.TaskIsRetriedAndRetryIsNotFound,
TaskStatus.Retried,
QueueMessageStatus.Poisonous));
yield return new TestCaseData("TaskRetry2+Pending").Returns(new AcquireTaskReturn(AcquisitionStatus.TaskIsRetriedAndRetryIsPending,
TaskStatus.Retried,
QueueMessageStatus.Poisonous));
}
}

Expand Down

0 comments on commit 4e10a6f

Please sign in to comment.