Skip to content

Commit

Permalink
resume subtasks
Browse files Browse the repository at this point in the history
  • Loading branch information
snagasawa committed Feb 13, 2024
1 parent 691d5ee commit 2fa69d3
Showing 1 changed file with 22 additions and 0 deletions.
22 changes: 22 additions & 0 deletions digdag-core/src/main/java/io/digdag/core/workflow/TaskControl.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.HashSet;
import java.util.ArrayList;
import java.util.stream.Collectors;

import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.collect.*;
Expand Down Expand Up @@ -182,6 +183,27 @@ private static long addTasks(TaskControlStore store,
firstTask = false;
}

Map<String, Long> taskNameAndIds = tasks.stream()
.collect(Collectors.toMap(
WorkflowTask::getFullName,
task -> indexToId.get(tasks.indexOf(task))
));

resumingTasks
.stream()
.filter(resumingTask -> !taskNameAndIds.keySet().contains(resumingTask.getFullName())
&& resumingTask.getFullName().endsWith("^sub"))
.forEach(resumingSubtask -> {
String parentTaskName = resumingSubtask.getFullName().replaceAll("\\^sub$", "");

store.addResumedSubtask(attemptId,
taskNameAndIds.get(parentTaskName),
resumingSubtask.getTaskType(),
TaskStateCode.SUCCESS,
(isInitialTask ? TaskStateFlags.empty().withInitialTask() : TaskStateFlags.empty()),
resumingSubtask);
});

return rootTaskId;
}

Expand Down

0 comments on commit 2fa69d3

Please sign in to comment.