-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix resuming subtasks #90
base: v0.10.5_patched
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,9 @@ | |
import java.util.Set; | ||
import java.util.HashSet; | ||
import java.util.ArrayList; | ||
import java.util.Comparator; | ||
import java.util.stream.Collectors; | ||
|
||
import com.google.common.base.Optional; | ||
import com.google.common.base.Strings; | ||
import com.google.common.collect.*; | ||
|
@@ -15,11 +17,13 @@ | |
import io.digdag.core.session.StoredTask; | ||
import io.digdag.core.session.SessionStore; | ||
import io.digdag.core.session.ArchivedTask; | ||
import io.digdag.core.session.ImmutableResumingTask; | ||
import io.digdag.core.session.ResumingTask; | ||
import io.digdag.core.session.Task; | ||
import io.digdag.core.session.TaskControlStore; | ||
import io.digdag.core.session.TaskStateCode; | ||
import io.digdag.core.session.TaskStateFlags; | ||
import io.digdag.spi.TaskReport; | ||
import io.digdag.spi.TaskResult; | ||
import io.digdag.client.config.Config; | ||
|
||
|
@@ -55,14 +59,12 @@ public TaskStateCode getState() | |
|
||
public static long addInitialTasksExceptingRootTask( | ||
TaskControlStore store, long attemptId, long rootTaskId, | ||
WorkflowTaskList tasks, List<ResumingTask> resumingTasks, Limits limits) | ||
WorkflowTaskList tasks, List<ArchivedTask> archivedTasks, Limits limits) | ||
throws TaskLimitExceededException | ||
{ | ||
checkTaskLimit(store, attemptId, tasks, limits); | ||
long taskId = addTasks(store, attemptId, rootTaskId, | ||
tasks, ImmutableList.of(), | ||
false, true, true, | ||
resumingTasks); | ||
long taskId = addInitialTasks(store, attemptId, rootTaskId, tasks, archivedTasks); | ||
List<ResumingTask> resumingTasks = archivedTasks.stream().map(ResumingTask::of).collect(Collectors.toList()); | ||
addResumingTasks(store, attemptId, resumingTasks); | ||
return taskId; | ||
} | ||
|
@@ -185,6 +187,133 @@ private static long addTasks(TaskControlStore store, | |
return rootTaskId; | ||
} | ||
|
||
private static long addInitialTasks(TaskControlStore store, long attemptId, long rootTaskId, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 既存のTaskControl#addTasks は別の箇所からも参照されているため、ただ単に引数のresumingTasksをArchivedTasksに変更することはできなかった。 |
||
WorkflowTaskList workflowTasks, List<ArchivedTask> archivedTasks) | ||
{ | ||
List<Long> indexToId = new ArrayList<>(); | ||
indexToId.add(rootTaskId); | ||
|
||
workflowTasks.stream() | ||
.skip(1) // skip the root task | ||
.forEach(workflowTask -> { | ||
long id; | ||
long parentId = workflowTask.getParentIndex() | ||
.transform(index -> indexToId.get(index)) | ||
.or(rootTaskId); | ||
|
||
ArchivedTask archivedTask = archivedTasks.stream() | ||
.filter(t -> t.getFullName().equals(workflowTask.getFullName())) | ||
.findFirst() | ||
.orElse(null); | ||
|
||
if (archivedTask == null) { | ||
Task task = Task.taskBuilder() | ||
.parentId(Optional.of(parentId)) | ||
.fullName(workflowTask.getFullName()) | ||
.config(TaskConfig.validate(workflowTask.getConfig())) | ||
.taskType(workflowTask.getTaskType()) | ||
.state(TaskStateCode.BLOCKED) | ||
.stateFlags(TaskStateFlags.empty().withInitialTask()) | ||
.build(); | ||
|
||
id = store.addSubtask(attemptId, task); | ||
} else { | ||
TaskStateCode state; | ||
switch(archivedTask.getState()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 元々のTaskControl#addTasksでは、resumingTasksの有無によってBLOCKED or SUCCESSで分岐していたが、6aa7cb9 によって「resumeで取得するtaskがSUCCESSのみ」という前提が変わったため、stateによる分岐が必要になった。 |
||
case SUCCESS: | ||
state = TaskStateCode.SUCCESS; | ||
break; | ||
case ERROR: | ||
case CANCELED: | ||
state = TaskStateCode.BLOCKED; | ||
break; | ||
default: | ||
state = TaskStateCode.PLANNED; | ||
break; | ||
} | ||
|
||
ResumingTask resumingTask = ImmutableResumingTask.builder() | ||
.sourceTaskId(archivedTask.getId()) | ||
.fullName(archivedTask.getFullName()) | ||
.config(TaskConfig.validate(workflowTask.getConfig())) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ここのconfigのみarchivedTaskではなく、workflowTaskのconfigをINSERTしている。 具体的には、この修正がないと以下のIntegration Testでのretryで落ちる。 +step1:
sh>: touch ${outdir}/1-1.out
+step2:
+a:
sh>: touch ${outdir}/1-2a.out
+b:
fail>: step2b fail +step1:
sh>: touch ${outdir}/2-1.out
+step2:
+a:
sh>: touch ${outdir}/2-2a.out
+b:
sh>: touch ${outdir}/2-2b.out |
||
.updatedAt(archivedTask.getUpdatedAt()) | ||
.subtaskConfig(archivedTask.getSubtaskConfig()) | ||
.exportParams(archivedTask.getExportParams()) | ||
.resetStoreParams(archivedTask.getResetStoreParams()) | ||
.storeParams(archivedTask.getStoreParams()) | ||
.report(archivedTask.getReport().or(TaskReport.empty())) | ||
.error(archivedTask.getError()) | ||
.build(); | ||
|
||
id = store.addResumedSubtask(attemptId, | ||
parentId, | ||
workflowTask.getTaskType(), | ||
state, | ||
TaskStateFlags.empty().withInitialTask(), | ||
resumingTask); | ||
} | ||
|
||
indexToId.add(id); | ||
|
||
if (!workflowTask.getUpstreamIndexes().isEmpty()) { | ||
store.addDependencies( | ||
id, | ||
workflowTask.getUpstreamIndexes() | ||
.stream() | ||
.map(index -> indexToId.get(index)) | ||
.collect(Collectors.toList()) | ||
); | ||
} | ||
}); | ||
|
||
Map<String, Long> taskNameAndIds = workflowTasks.stream() | ||
.collect(Collectors.toMap(WorkflowTask::getFullName, task -> indexToId.get(workflowTasks.indexOf(task)))); | ||
|
||
archivedTasks.stream() | ||
.filter(archivedTask -> !taskNameAndIds.keySet().contains(archivedTask.getFullName()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. すでにINSERT済みのtaskは除外する。 |
||
&& archivedTask.getFullName().contains("^sub")) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 単にtask名に「^」を含む場合だと、「^failure-alert」や「^error」などの除外すべきsubtaskも含まれるため、狭義のsubtaskに限定する必要がある。 |
||
.sorted(Comparator.comparingInt((t) -> (int) t.getId())) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 後続のtask_dependenciesテーブルへのINESRTのために、upstreamのtaskを先にtasksテーブルへのINSERTしてIDを採番する必要があるため、あらかじめretry failed前のattemptのTask IDでソートする。 |
||
.forEach(archivedSubtask -> { | ||
String parentTaskName = archivedSubtask.getFullName().replaceAll("(\\^sub|\\+)[^\\^+]*$", ""); | ||
Long parentId = taskNameAndIds.get(parentTaskName); | ||
|
||
TaskStateCode state; | ||
switch(archivedSubtask.getState()) { | ||
case SUCCESS: | ||
state = TaskStateCode.SUCCESS; | ||
break; | ||
case ERROR: | ||
case CANCELED: | ||
state = TaskStateCode.BLOCKED; | ||
break; | ||
default: | ||
state = TaskStateCode.PLANNED; | ||
break; | ||
} | ||
|
||
Long id = store.addResumedSubtask(attemptId, | ||
parentId, | ||
archivedSubtask.getTaskType(), | ||
state, | ||
TaskStateFlags.empty().withInitialTask(), | ||
ResumingTask.of(archivedSubtask)); | ||
|
||
taskNameAndIds.put(archivedSubtask.getFullName(), id); | ||
|
||
archivedTasks.stream() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TaskControl#addTasksと同じようにtask_dependenciesテーブルへのINSERTが必要なため、task名からupstreamのIDを取得しINSERTする。 |
||
.filter(t -> t.getId() == archivedSubtask.getId() - 1) | ||
.findFirst() | ||
.ifPresent((upstreamArchivedTask) -> { | ||
List<Long> upstreamIds = new ArrayList<>(); | ||
Long upstreamId = taskNameAndIds.get(upstreamArchivedTask.getFullName()); | ||
upstreamIds.add(upstreamId); | ||
store.addDependencies(id, upstreamIds); | ||
});; | ||
}); | ||
|
||
return rootTaskId; | ||
} | ||
|
||
private static void addResumingTasks(TaskControlStore store, long attemptId, List<ResumingTask> resumingTasks) | ||
{ | ||
// store only dynamically-generated tasks | ||
|
@@ -207,28 +336,24 @@ private List<ResumingTask> collectResumingTasks(long attemptId, WorkflowTaskList | |
return store.getResumingTasksByNamePrefix(attemptId, commonPrefix); | ||
} | ||
|
||
static List<ResumingTask> buildResumingTaskMap(SessionStore store, long attemptId, List<Long> resumingTaskIds) | ||
static List<ArchivedTask> buildResumingTaskMap(SessionStore store, long attemptId, List<Long> resumingTaskIds) | ||
throws ResourceNotFoundException | ||
{ | ||
Set<Long> idSet = new HashSet<>(resumingTaskIds); | ||
List<ResumingTask> resumingTasks = store | ||
List<ArchivedTask> archivedTasks = store | ||
.getTasksOfAttempt(attemptId) | ||
.stream() | ||
.filter(archived -> { | ||
if (idSet.remove(archived.getId())) { | ||
if (archived.getState() != TaskStateCode.SUCCESS) { | ||
throw new IllegalResumeException("Resuming non-successful tasks is not allowed: task_id=" + archived.getId()); | ||
} | ||
return true; | ||
} | ||
return false; | ||
}) | ||
.map(archived -> ResumingTask.of(archived)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ResumingTaskに変換せず、ArchivedTaskのまま返すように。 |
||
.collect(Collectors.toList()); | ||
if (!idSet.isEmpty()) { | ||
throw new ResourceNotFoundException("Resuming tasks are not the members of resuming attempt: id list=" + idSet); | ||
} | ||
return resumingTasks; | ||
return archivedTasks; | ||
} | ||
|
||
//// | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ | |
|
||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.Arrays; | ||
import java.util.HashSet; | ||
import java.util.stream.Collectors; | ||
import javax.ws.rs.Consumes; | ||
|
@@ -298,21 +299,24 @@ private List<Long> collectResumingTasks(RestSessionAttemptRequest.Resume resume) | |
|
||
private List<Long> collectResumingTasksForResumeFailedMode(long attemptId) | ||
{ | ||
TaskStateCode[] statusArr = {TaskStateCode.SUCCESS, TaskStateCode.GROUP_ERROR, TaskStateCode.ERROR, TaskStateCode.CANCELED}; | ||
List<TaskStateCode> statuses = Arrays.asList(statusArr); | ||
|
||
List<ArchivedTask> tasks = sm | ||
.getSessionStore(getSiteId()) | ||
.getTasksOfAttempt(attemptId); | ||
|
||
List<Long> successTasks = tasks.stream() | ||
.filter(task -> task.getState() == TaskStateCode.SUCCESS) | ||
List<Long> ids = tasks.stream() | ||
.filter(t-> statuses.contains(t.getState())) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. resumeするtaskをSUCCESSのみから変更。 |
||
.map(task -> { | ||
if (!task.getParentId().isPresent()) { | ||
if (!task.getParentId().isPresent() && task.getState() == TaskStateCode.SUCCESS) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. すでに成功したattemptのresumeは禁止されているため、L310のfilterの変更に合わせて修正。 |
||
throw new IllegalArgumentException("Resuming successfully completed attempts is not supported"); | ||
} | ||
return task.getId(); | ||
}) | ||
.collect(Collectors.toList()); | ||
|
||
return ImmutableList.copyOf(successTasks); | ||
return ImmutableList.copyOf(ids); | ||
} | ||
|
||
private List<Long> collectResumingTasksForResumeFromMode(long attemptId, String fromTaskPattern) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a463168
(#90) のaddInitialTasksに変更。