Skip to content

Commit

Permalink
refactor: factorize some code
Browse files Browse the repository at this point in the history
  • Loading branch information
aneojgurhem committed Sep 25, 2024
1 parent 8daca5c commit 31e0a8e
Showing 1 changed file with 44 additions and 51 deletions.
95 changes: 44 additions & 51 deletions Common/src/Pollster/TaskHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -448,38 +448,20 @@ await submitter_.CompleteTaskAsync(taskData_,
messageHandler_.Status = QueueMessageStatus.Poisonous;
var retryId = taskData_.RetryId();

TaskData retryData;
var taskNotFound = false;
var taskExists = false;
try
{
var retryData = await taskTable_.ReadTaskAsync(retryId,
lateCts_.Token)
.ConfigureAwait(false);
if (retryData.Status is TaskStatus.Creating or TaskStatus.Submitted)
{
logger_.LogWarning("Retried task {task} is in {status}; trying to finalize task creation",
retryId,
retryData.Status);
await submitter_.FinalizeTaskCreation(new List<TaskCreationRequest>
{
new(retryId,
retryData.PayloadId,
retryData.Options,
retryData.ExpectedOutputIds,
retryData.DataDependencies),
},
sessionData_,
taskData_.TaskId,
CancellationToken.None)
.ConfigureAwait(false);
return retryData.Status == TaskStatus.Submitted
? AcquisitionStatus.TaskIsRetriedAndRetryIsSubmitted
: AcquisitionStatus.TaskIsRetriedAndRetryIsCreating;
}
retryData = await taskTable_.ReadTaskAsync(retryId,
lateCts_.Token)
.ConfigureAwait(false);
}
catch (TaskNotFoundException)
{
logger_.LogWarning("Retried task {task} was not found in the database; resubmit it",
retryId);

taskNotFound = true;
try
{
await taskTable_.RetryTask(taskData_,
Expand All @@ -490,36 +472,47 @@ await taskTable_.RetryTask(taskData_,
{
logger_.LogWarning("Retried task {task} already exists; finalize creation if needed",
retryId);
taskExists = true;
}
finally
{
var retryData = await taskTable_.ReadTaskAsync(retryId,
lateCts_.Token)
.ConfigureAwait(false);
if (retryData.Status is TaskStatus.Creating or TaskStatus.Submitted)
{
logger_.LogWarning("Retried task {task} is in {status}; trying to finalize task creation",
retryId,
retryData.Status);
await submitter_.FinalizeTaskCreation(new List<TaskCreationRequest>
{
new(retryId,
retryData.PayloadId,
retryData.Options,
retryData.ExpectedOutputIds,
retryData.DataDependencies),
},
sessionData_,
taskData_.TaskId,
CancellationToken.None)
.ConfigureAwait(false);
}
}

return AcquisitionStatus.TaskIsRetriedAndRetryIsNotFound;
retryData = await taskTable_.ReadTaskAsync(retryId,
CancellationToken.None)
.ConfigureAwait(false);
}

if (retryData.Status is TaskStatus.Creating or TaskStatus.Submitted)
{
logger_.LogWarning("Retried task {task} is in {status}; trying to finalize task creation",
retryId,
retryData.Status);
await submitter_.FinalizeTaskCreation(new List<TaskCreationRequest>
{
new(retryId,
retryData.PayloadId,
retryData.Options,
retryData.ExpectedOutputIds,
retryData.DataDependencies),
},
sessionData_,
taskData_.TaskId,
CancellationToken.None)
.ConfigureAwait(false);
}

switch ((taskNotFound, taskExists, retryData.Status))
{
case (false, false, TaskStatus.Submitted):
return AcquisitionStatus.TaskIsRetriedAndRetryIsSubmitted;
case (false, false, TaskStatus.Creating):
return AcquisitionStatus.TaskIsRetriedAndRetryIsCreating;
case (true, false, TaskStatus.Submitted):
return AcquisitionStatus.TaskIsRetriedAndRetryIsNotFound;
case (true, false, TaskStatus.Creating):
return AcquisitionStatus.TaskIsRetriedAndRetryIsNotFound;
default:
return AcquisitionStatus.TaskIsRetried;
}

return AcquisitionStatus.TaskIsRetried;
case TaskStatus.Unspecified:
default:
logger_.LogCritical("Task was in an unknown state {state}",
Expand Down

0 comments on commit 31e0a8e

Please sign in to comment.