-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix resuming subtasks #90
base: v0.10.5_patched
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ | |
import java.util.Set; | ||
import java.util.HashSet; | ||
import java.util.ArrayList; | ||
import java.util.Comparator; | ||
import java.util.stream.Collectors; | ||
import com.google.common.base.Optional; | ||
import com.google.common.base.Strings; | ||
|
@@ -15,11 +16,13 @@ | |
import io.digdag.core.session.StoredTask; | ||
import io.digdag.core.session.SessionStore; | ||
import io.digdag.core.session.ArchivedTask; | ||
import io.digdag.core.session.ImmutableResumingTask; | ||
import io.digdag.core.session.ResumingTask; | ||
import io.digdag.core.session.Task; | ||
import io.digdag.core.session.TaskControlStore; | ||
import io.digdag.core.session.TaskStateCode; | ||
import io.digdag.core.session.TaskStateFlags; | ||
import io.digdag.spi.TaskReport; | ||
import io.digdag.spi.TaskResult; | ||
import io.digdag.client.config.Config; | ||
|
||
|
@@ -185,6 +188,133 @@ private static long addTasks(TaskControlStore store, | |
return rootTaskId; | ||
} | ||
|
||
private static long addInitialTasks(TaskControlStore store, long attemptId, long rootTaskId, | ||
WorkflowTaskList workflowTasks, List<ArchivedTask> archivedTasks) | ||
{ | ||
List<Long> indexToId = new ArrayList<>(); | ||
indexToId.add(rootTaskId); | ||
|
||
workflowTasks.stream() | ||
.skip(1) // skip the root task | ||
.forEach(workflowTask -> { | ||
long id; | ||
long parentId = workflowTask.getParentIndex() | ||
.transform(index -> indexToId.get(index)) | ||
.or(rootTaskId); | ||
|
||
ArchivedTask archivedTask = archivedTasks.stream() | ||
.filter(t -> t.getFullName().equals(workflowTask.getFullName())) | ||
.findFirst() | ||
.orElse(null); | ||
|
||
if (archivedTask == null) { | ||
Task task = Task.taskBuilder() | ||
.parentId(Optional.of(parentId)) | ||
.fullName(workflowTask.getFullName()) | ||
.config(TaskConfig.validate(workflowTask.getConfig())) | ||
.taskType(workflowTask.getTaskType()) | ||
.state(TaskStateCode.BLOCKED) | ||
.stateFlags(TaskStateFlags.empty().withInitialTask()) | ||
.build(); | ||
|
||
id = store.addSubtask(attemptId, task); | ||
} else { | ||
TaskStateCode state; | ||
switch(archivedTask.getState()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 元々のTaskControl#addTasksでは、resumingTasksの有無によってBLOCKED or SUCCESSで分岐していたが、6aa7cb9 によって「resumeで取得するtaskがSUCCESSのみ」という前提が変わったため、stateによる分岐が必要になった。 |
||
case SUCCESS: | ||
state = TaskStateCode.SUCCESS; | ||
break; | ||
case ERROR: | ||
case CANCELED: | ||
state = TaskStateCode.BLOCKED; | ||
break; | ||
default: | ||
state = TaskStateCode.PLANNED; | ||
break; | ||
} | ||
|
||
ResumingTask resumingTask = ImmutableResumingTask.builder() | ||
.sourceTaskId(archivedTask.getId()) | ||
.fullName(archivedTask.getFullName()) | ||
.config(TaskConfig.validate(workflowTask.getConfig())) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ここのconfigのみarchivedTaskではなく、workflowTaskのconfigをINSERTしている。 具体的には、この修正がないと以下のIntegration Testでのretryで落ちる。

+step1:
  sh>: touch ${outdir}/1-1.out
+step2:
  +a:
    sh>: touch ${outdir}/1-2a.out
  +b:
    fail>: step2b fail

+step1:
  sh>: touch ${outdir}/2-1.out
+step2:
  +a:
    sh>: touch ${outdir}/2-2a.out
  +b:
    sh>: touch ${outdir}/2-2b.out |
||
.updatedAt(archivedTask.getUpdatedAt()) | ||
.subtaskConfig(archivedTask.getSubtaskConfig()) | ||
.exportParams(archivedTask.getExportParams()) | ||
.resetStoreParams(archivedTask.getResetStoreParams()) | ||
.storeParams(archivedTask.getStoreParams()) | ||
.report(archivedTask.getReport().or(TaskReport.empty())) | ||
.error(archivedTask.getError()) | ||
.build(); | ||
|
||
id = store.addResumedSubtask(attemptId, | ||
parentId, | ||
workflowTask.getTaskType(), | ||
state, | ||
TaskStateFlags.empty().withInitialTask(), | ||
resumingTask); | ||
} | ||
|
||
indexToId.add(id); | ||
|
||
if (!workflowTask.getUpstreamIndexes().isEmpty()) { | ||
store.addDependencies( | ||
id, | ||
workflowTask.getUpstreamIndexes() | ||
.stream() | ||
.map(index -> indexToId.get(index)) | ||
.collect(Collectors.toList()) | ||
); | ||
} | ||
}); | ||
|
||
Map<String, Long> taskNameAndIds = workflowTasks.stream() | ||
.collect(Collectors.toMap(WorkflowTask::getFullName, task -> indexToId.get(workflowTasks.indexOf(task)))); | ||
|
||
archivedTasks.stream() | ||
.filter(archivedTask -> !taskNameAndIds.keySet().contains(archivedTask.getFullName()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. すでにINSERT済みのtaskは除外する。 |
||
&& archivedTask.getFullName().contains("^sub")) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 単にtask名に「^」を含む場合だと、「^failure-alert」や「^error」などの除外すべきsubtaskも含まれるため、狭義のsubtaskに限定する必要がある。 |
||
.sorted(Comparator.comparingInt((t) -> (int) t.getId())) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 後続のtask_dependenciesテーブルへのINSERTのために、upstreamのtaskを先にtasksテーブルへINSERTしてIDを採番する必要があるため、あらかじめretry failed前のattemptのTask IDでソートする。 |
||
.forEach(archivedSubtask -> { | ||
String parentTaskName = archivedSubtask.getFullName().replaceAll("(\\^sub|\\+)[^\\^+]*$", ""); | ||
Long parentId = taskNameAndIds.get(parentTaskName); | ||
|
||
TaskStateCode state; | ||
switch(archivedSubtask.getState()) { | ||
case SUCCESS: | ||
state = TaskStateCode.SUCCESS; | ||
break; | ||
case ERROR: | ||
case CANCELED: | ||
state = TaskStateCode.BLOCKED; | ||
break; | ||
default: | ||
state = TaskStateCode.PLANNED; | ||
break; | ||
} | ||
|
||
Long id = store.addResumedSubtask(attemptId, | ||
parentId, | ||
archivedSubtask.getTaskType(), | ||
state, | ||
TaskStateFlags.empty().withInitialTask(), | ||
ResumingTask.of(archivedSubtask)); | ||
|
||
taskNameAndIds.put(archivedSubtask.getFullName(), id); | ||
|
||
archivedTasks.stream() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TaskControl#addTasksと同じようにtask_dependenciesテーブルへのINSERTが必要なため、task名からupstreamのIDを取得しINSERTする。 |
||
.filter(t -> t.getId() == archivedSubtask.getId() - 1) | ||
.findFirst() | ||
.ifPresent((upstreamArchivedTask) -> { | ||
List<Long> upstreamIds = new ArrayList<>(); | ||
Long upstreamId = taskNameAndIds.get(upstreamArchivedTask.getFullName()); | ||
upstreamIds.add(upstreamId); | ||
store.addDependencies(id, upstreamIds); | ||
});; | ||
}); | ||
|
||
return rootTaskId; | ||
} | ||
|
||
private static void addResumingTasks(TaskControlStore store, long attemptId, List<ResumingTask> resumingTasks) | ||
{ | ||
// store only dynamically-generated tasks | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
既存のTaskControl#addTasks は別の箇所からも参照されているため、ただ単に引数のresumingTasksをArchivedTasksに変更することはできなかった。
そのため、新たにaddInitialTasksを実装した。
参照: INSERT時のparameterの不足