Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
snagasawa committed Feb 14, 2024
1 parent 2fa69d3 commit 584db23
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 48 deletions.
151 changes: 121 additions & 30 deletions digdag-core/src/main/java/io/digdag/core/workflow/TaskControl.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
import java.util.Set;
import java.util.HashSet;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.stream.Collectors;

import javax.annotation.Nonnull;

import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.collect.*;
Expand All @@ -21,6 +24,7 @@
import io.digdag.core.session.TaskControlStore;
import io.digdag.core.session.TaskStateCode;
import io.digdag.core.session.TaskStateFlags;
import io.digdag.core.session.TaskType;
import io.digdag.spi.TaskResult;
import io.digdag.client.config.Config;

Expand Down Expand Up @@ -56,14 +60,12 @@ public TaskStateCode getState()

public static long addInitialTasksExceptingRootTask(
TaskControlStore store, long attemptId, long rootTaskId,
WorkflowTaskList tasks, List<ResumingTask> resumingTasks, Limits limits)
WorkflowTaskList tasks, List<ArchivedTask> archivedTasks, Limits limits)
throws TaskLimitExceededException
{
checkTaskLimit(store, attemptId, tasks, limits);
long taskId = addTasks(store, attemptId, rootTaskId,
tasks, ImmutableList.of(),
false, true, true,
resumingTasks);
long taskId = addInitialTasks(store, attemptId, rootTaskId, tasks, ImmutableList.of(), archivedTasks);
List<ResumingTask> resumingTasks = archivedTasks.stream().map(ResumingTask::of).collect(Collectors.toList());
addResumingTasks(store, attemptId, resumingTasks);
return taskId;
}
Expand Down Expand Up @@ -183,27 +185,120 @@ private static long addTasks(TaskControlStore store,
firstTask = false;
}

Map<String, Long> taskNameAndIds = tasks.stream()
.collect(Collectors.toMap(
WorkflowTask::getFullName,
task -> indexToId.get(tasks.indexOf(task))
));
return rootTaskId;
}

resumingTasks
.stream()
.filter(resumingTask -> !taskNameAndIds.keySet().contains(resumingTask.getFullName())
&& resumingTask.getFullName().endsWith("^sub"))
.forEach(resumingSubtask -> {
String parentTaskName = resumingSubtask.getFullName().replaceAll("\\^sub$", "");

store.addResumedSubtask(attemptId,
taskNameAndIds.get(parentTaskName),
resumingSubtask.getTaskType(),
TaskStateCode.SUCCESS,
(isInitialTask ? TaskStateFlags.empty().withInitialTask() : TaskStateFlags.empty()),
resumingSubtask);
private static long addInitialTasks(TaskControlStore store, long attemptId, long rootTaskId,
WorkflowTaskList workflowTasks, List<Long> rootUpstreamIds, List<ArchivedTask> archivedTasks)
{
List<Long> indexToId = new ArrayList<>();
indexToId.add(rootTaskId);

workflowTasks.stream()
.skip(1) // skip the root task
.forEach(workflowTask -> {
long id;
long parentId = workflowTask.getParentIndex()
.transform(index -> indexToId.get(index))
.or(rootTaskId);

ArchivedTask archivedTask = archivedTasks.stream()
.filter(t -> t.getFullName().equals(workflowTask.getFullName()))
.findFirst()
.orElse(null);

if (archivedTask == null) {
Task task = Task.taskBuilder()
.parentId(Optional.of(parentId))
.fullName(workflowTask.getFullName())
.config(TaskConfig.validate(workflowTask.getConfig()))
.taskType(workflowTask.getTaskType())
.state(TaskStateCode.BLOCKED)
.stateFlags(TaskStateFlags.empty().withInitialTask())
.build();

id = store.addSubtask(attemptId, task);
} else {
TaskStateCode state;
switch(archivedTask.getState()) {
case SUCCESS:
state = TaskStateCode.SUCCESS;
break;
case ERROR:
case CANCELED:
state = TaskStateCode.BLOCKED;
break;
default:
state = TaskStateCode.PLANNED;
break;
}

id = store.addResumedSubtask(attemptId,
parentId,
workflowTask.getTaskType(),
state,
TaskStateFlags.empty().withInitialTask(),
ResumingTask.of(archivedTask));
}

indexToId.add(id);

if (!workflowTask.getUpstreamIndexes().isEmpty()) {
store.addDependencies(
id,
workflowTask.getUpstreamIndexes()
.stream()
.map(index -> indexToId.get(index))
.collect(Collectors.toList())
);
}
});

Map<String, Long> taskNameAndIds = workflowTasks.stream()
.collect(Collectors.toMap(WorkflowTask::getFullName, task -> indexToId.get(workflowTasks.indexOf(task))));

archivedTasks.stream()
.filter(archivedTask -> !taskNameAndIds.keySet().contains(archivedTask.getFullName())
&& archivedTask.getFullName().contains("^sub"))
.sorted(Comparator.comparingInt((t) -> (int) t.getId()))
.forEach(archivedSubtask -> {
String parentTaskName = archivedSubtask.getFullName().replaceAll("(\\^sub|\\+)[^\\^+]*$", "");
Long parentId = taskNameAndIds.get(parentTaskName);

TaskStateCode state;
switch(archivedSubtask.getState()) {
case SUCCESS:
state = TaskStateCode.SUCCESS;
break;
case ERROR:
case CANCELED:
state = TaskStateCode.BLOCKED;
break;
default:
state = TaskStateCode.PLANNED;
break;
}

Long id = store.addResumedSubtask(attemptId,
parentId,
archivedSubtask.getTaskType(),
state,
TaskStateFlags.empty().withInitialTask(),
ResumingTask.of(archivedSubtask));

taskNameAndIds.put(archivedSubtask.getFullName(), id);

archivedTasks.stream()
.filter(t -> t.getId() == archivedSubtask.getId() - 1)
.findFirst()
.ifPresent((upstreamArchivedTask) -> {
List<Long> upstreamIds = new ArrayList<>();
Long upstreamId = taskNameAndIds.get(upstreamArchivedTask.getFullName());
upstreamIds.add(upstreamId);
store.addDependencies(id, upstreamIds);
});;
});

return rootTaskId;
}

Expand All @@ -229,28 +324,24 @@ private List<ResumingTask> collectResumingTasks(long attemptId, WorkflowTaskList
return store.getResumingTasksByNamePrefix(attemptId, commonPrefix);
}

static List<ResumingTask> buildResumingTaskMap(SessionStore store, long attemptId, List<Long> resumingTaskIds)
static List<ArchivedTask> buildResumingTaskMap(SessionStore store, long attemptId, List<Long> resumingTaskIds)
throws ResourceNotFoundException
{
Set<Long> idSet = new HashSet<>(resumingTaskIds);
List<ResumingTask> resumingTasks = store
List<ArchivedTask> archivedTasks = store
.getTasksOfAttempt(attemptId)
.stream()
.filter(archived -> {
if (idSet.remove(archived.getId())) {
if (archived.getState() != TaskStateCode.SUCCESS) {
throw new IllegalResumeException("Resuming non-successful tasks is not allowed: task_id=" + archived.getId());
}
return true;
}
return false;
})
.map(archived -> ResumingTask.of(archived))
.collect(Collectors.toList());
if (!idSet.isEmpty()) {
throw new ResourceNotFoundException("Resuming tasks are not the members of resuming attempt: id list=" + idSet);
}
return resumingTasks;
return archivedTasks;
}

////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.digdag.core.repository.StoredRevision;
import io.digdag.core.repository.WorkflowDefinition;
import io.digdag.core.session.ResumingTask;
import io.digdag.core.session.ArchivedTask;
import io.digdag.core.session.ParameterUpdate;
import io.digdag.core.session.Session;
import io.digdag.core.session.SessionAttempt;
Expand All @@ -43,6 +44,8 @@
import io.digdag.spi.TaskConflictException;
import io.digdag.spi.TaskNotFoundException;
import io.digdag.spi.metrics.DigdagMetrics;
import io.digdag.spi.metrics.DigdagMetrics.Category;

import static io.digdag.spi.metrics.DigdagMetrics.Category;
import io.digdag.util.RetryControl;
import org.slf4j.Logger;
Expand Down Expand Up @@ -269,21 +272,15 @@ public StoredSessionAttemptWithSession submitTasks(int siteId, AttemptRequest ar

TaskConfig.validateAttempt(attempt);

List<ResumingTask> resumingTasks;
List<ArchivedTask> archivedTasks;
if (ar.getResumingAttemptId().isPresent()) {
WorkflowTask root = tasks.get(0);
resumingTasks = TaskControl.buildResumingTaskMap(
archivedTasks = TaskControl.buildResumingTaskMap(
sm.getSessionStore(siteId),
ar.getResumingAttemptId().get(),
ar.getResumingTasks());
for (ResumingTask resumingTask : resumingTasks) {
if (resumingTask.getFullName().equals(root.getFullName())) {
throw new IllegalResumeException("Resuming root task is not allowed");
}
}
}
else {
resumingTasks = ImmutableList.of();
archivedTasks = ImmutableList.of();
}

StoredSessionAttemptWithSession stored;
Expand Down Expand Up @@ -313,7 +310,7 @@ public StoredSessionAttemptWithSession submitTasks(int siteId, AttemptRequest ar
StoredSessionAttemptWithSession.of(siteId, storedSession, storedAttempt);

try {
storeTasks(store, storedAttemptWithSession, tasks, resumingTasks, ar.getSessionMonitors());
storeTasks(store, storedAttemptWithSession, tasks, archivedTasks, ar.getSessionMonitors());
}
catch (TaskLimitExceededException ex) {
throw new WorkflowTaskLimitExceededException(ex);
Expand Down Expand Up @@ -348,21 +345,21 @@ public void storeTasks(
SessionControlStore store,
StoredSessionAttemptWithSession storedAttempt,
WorkflowDefinition def,
List<ResumingTask> resumingTasks,
List<ArchivedTask> archivedTasks,
List<SessionMonitor> sessionMonitors)
throws TaskLimitExceededException
{
Workflow workflow = compiler.compile(def.getName(), def.getConfig());
WorkflowTaskList tasks = workflow.getTasks();

storeTasks(store, storedAttempt, tasks, resumingTasks, sessionMonitors);
storeTasks(store, storedAttempt, tasks, archivedTasks, sessionMonitors);
}

public void storeTasks(
SessionControlStore store,
StoredSessionAttemptWithSession storedAttempt,
WorkflowTaskList tasks,
List<ResumingTask> resumingTasks,
List<ArchivedTask> archivedTasks,
List<SessionMonitor> sessionMonitors)
throws TaskLimitExceededException
{
Expand All @@ -386,7 +383,7 @@ public void storeTasks(
store.insertRootTask(storedAttempt.getId(), rootTask, (taskStore, storedTaskId) -> {
try {
TaskControl.addInitialTasksExceptingRootTask(taskStore, storedAttempt.getId(),
storedTaskId, tasks, resumingTasks, limits);
storedTaskId, tasks, archivedTasks, limits);
}
catch (TaskLimitExceededException ex) {
throw new WorkflowTaskLimitExceededException(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.List;
import java.util.Set;
import java.util.Arrays;
import java.util.HashSet;
import java.util.stream.Collectors;
import javax.ws.rs.Consumes;
Expand Down Expand Up @@ -298,21 +299,24 @@ private List<Long> collectResumingTasks(RestSessionAttemptRequest.Resume resume)

private List<Long> collectResumingTasksForResumeFailedMode(long attemptId)
{
TaskStateCode[] statusArr = {TaskStateCode.SUCCESS, TaskStateCode.GROUP_ERROR, TaskStateCode.ERROR, TaskStateCode.CANCELED};
List<TaskStateCode> statuses = Arrays.asList(statusArr);

List<ArchivedTask> tasks = sm
.getSessionStore(getSiteId())
.getTasksOfAttempt(attemptId);

List<Long> successTasks = tasks.stream()
.filter(task -> task.getState() == TaskStateCode.SUCCESS)
List<Long> ids = tasks.stream()
.filter(t-> statuses.contains(t.getState()))
.map(task -> {
if (!task.getParentId().isPresent()) {
if (!task.getParentId().isPresent() && task.getState() == TaskStateCode.SUCCESS) {
throw new IllegalArgumentException("Resuming successfully completed attempts is not supported");
}
return task.getId();
})
.collect(Collectors.toList());

return ImmutableList.copyOf(successTasks);
return ImmutableList.copyOf(ids);
}

private List<Long> collectResumingTasksForResumeFromMode(long attemptId, String fromTaskPattern)
Expand Down

0 comments on commit 584db23

Please sign in to comment.