From 584db23f5e9d165a37aa081c5f555ae393b37ada Mon Sep 17 00:00:00 2001 From: Shuhei Nagasawa Date: Mon, 25 Dec 2023 21:51:10 +0900 Subject: [PATCH] wip --- .../io/digdag/core/workflow/TaskControl.java | 151 ++++++++++++++---- .../core/workflow/WorkflowExecutor.java | 25 ++- .../io/digdag/server/rs/AttemptResource.java | 12 +- 3 files changed, 140 insertions(+), 48 deletions(-) diff --git a/digdag-core/src/main/java/io/digdag/core/workflow/TaskControl.java b/digdag-core/src/main/java/io/digdag/core/workflow/TaskControl.java index 827badf26b..6619b6dd5c 100644 --- a/digdag-core/src/main/java/io/digdag/core/workflow/TaskControl.java +++ b/digdag-core/src/main/java/io/digdag/core/workflow/TaskControl.java @@ -5,8 +5,11 @@ import java.util.Set; import java.util.HashSet; import java.util.ArrayList; +import java.util.Comparator; import java.util.stream.Collectors; +import javax.annotation.Nonnull; + import com.google.common.base.Optional; import com.google.common.base.Strings; import com.google.common.collect.*; @@ -21,6 +24,7 @@ import io.digdag.core.session.TaskControlStore; import io.digdag.core.session.TaskStateCode; import io.digdag.core.session.TaskStateFlags; +import io.digdag.core.session.TaskType; import io.digdag.spi.TaskResult; import io.digdag.client.config.Config; @@ -56,14 +60,12 @@ public TaskStateCode getState() public static long addInitialTasksExceptingRootTask( TaskControlStore store, long attemptId, long rootTaskId, - WorkflowTaskList tasks, List resumingTasks, Limits limits) + WorkflowTaskList tasks, List archivedTasks, Limits limits) throws TaskLimitExceededException { checkTaskLimit(store, attemptId, tasks, limits); - long taskId = addTasks(store, attemptId, rootTaskId, - tasks, ImmutableList.of(), - false, true, true, - resumingTasks); + long taskId = addInitialTasks(store, attemptId, rootTaskId, tasks, ImmutableList.of(), archivedTasks); + List resumingTasks = archivedTasks.stream().map(ResumingTask::of).collect(Collectors.toList()); addResumingTasks(store, attemptId, resumingTasks); return taskId; } @@ -183,27 +185,120 @@ private static long addTasks(TaskControlStore store, firstTask = false; } - Map taskNameAndIds = tasks.stream() - .collect(Collectors.toMap( - WorkflowTask::getFullName, - task -> indexToId.get(tasks.indexOf(task)) - )); + return rootTaskId; + } - resumingTasks - .stream() - .filter(resumingTask -> !taskNameAndIds.keySet().contains(resumingTask.getFullName()) - && resumingTask.getFullName().endsWith("^sub")) - .forEach(resumingSubtask -> { - String parentTaskName = resumingSubtask.getFullName().replaceAll("\\^sub$", ""); - - store.addResumedSubtask(attemptId, - taskNameAndIds.get(parentTaskName), - resumingSubtask.getTaskType(), - TaskStateCode.SUCCESS, - (isInitialTask ? TaskStateFlags.empty().withInitialTask() : TaskStateFlags.empty()), - resumingSubtask); + private static long addInitialTasks(TaskControlStore store, long attemptId, long rootTaskId, + WorkflowTaskList workflowTasks, List rootUpstreamIds, List archivedTasks) + { + List indexToId = new ArrayList<>(); + indexToId.add(rootTaskId); + + workflowTasks.stream() + .skip(1) // skip the root task + .forEach(workflowTask -> { + long id; + long parentId = workflowTask.getParentIndex() + .transform(index -> indexToId.get(index)) + .or(rootTaskId); + + ArchivedTask archivedTask = archivedTasks.stream() + .filter(t -> t.getFullName().equals(workflowTask.getFullName())) + .findFirst() + .orElse(null); + + if (archivedTask == null) { + Task task = Task.taskBuilder() + .parentId(Optional.of(parentId)) + .fullName(workflowTask.getFullName()) + .config(TaskConfig.validate(workflowTask.getConfig())) + .taskType(workflowTask.getTaskType()) + .state(TaskStateCode.BLOCKED) + .stateFlags(TaskStateFlags.empty().withInitialTask()) + .build(); + + id = store.addSubtask(attemptId, task); + } else { + TaskStateCode state; + switch(archivedTask.getState()) { + case SUCCESS: + state = TaskStateCode.SUCCESS; + break; + case ERROR: + case CANCELED: + state = TaskStateCode.BLOCKED; + break; + default: + state = TaskStateCode.PLANNED; + break; + } + + id = store.addResumedSubtask(attemptId, + parentId, + workflowTask.getTaskType(), + state, + TaskStateFlags.empty().withInitialTask(), + ResumingTask.of(archivedTask)); + } + + indexToId.add(id); + + if (!workflowTask.getUpstreamIndexes().isEmpty()) { + store.addDependencies( + id, + workflowTask.getUpstreamIndexes() + .stream() + .map(index -> indexToId.get(index)) + .collect(Collectors.toList()) + ); + } }); + Map taskNameAndIds = workflowTasks.stream() + .collect(Collectors.toMap(WorkflowTask::getFullName, task -> indexToId.get(workflowTasks.indexOf(task)))); + + archivedTasks.stream() + .filter(archivedTask -> !taskNameAndIds.keySet().contains(archivedTask.getFullName()) + && archivedTask.getFullName().contains("^sub")) + .sorted(Comparator.comparingInt((t) -> (int) t.getId())) + .forEach(archivedSubtask -> { + String parentTaskName = archivedSubtask.getFullName().replaceAll("(\\^sub|\\+)[^\\^+]*$", ""); + Long parentId = taskNameAndIds.get(parentTaskName); + + TaskStateCode state; + switch(archivedSubtask.getState()) { + case SUCCESS: + state = TaskStateCode.SUCCESS; + break; + case ERROR: + case CANCELED: + state = TaskStateCode.BLOCKED; + break; + default: + state = TaskStateCode.PLANNED; + break; + } + + Long id = store.addResumedSubtask(attemptId, + parentId, + archivedSubtask.getTaskType(), + state, + TaskStateFlags.empty().withInitialTask(), + ResumingTask.of(archivedSubtask)); + + taskNameAndIds.put(archivedSubtask.getFullName(), id); + + archivedTasks.stream() + .filter(t -> t.getId() == archivedSubtask.getId() - 1) + .findFirst() + .ifPresent((upstreamArchivedTask) -> { + List upstreamIds = new ArrayList<>(); + Long upstreamId = taskNameAndIds.get(upstreamArchivedTask.getFullName()); + upstreamIds.add(upstreamId); + store.addDependencies(id, upstreamIds); + });; + }); + return rootTaskId; } @@ -229,28 +324,24 @@ private List collectResumingTasks(long attemptId, WorkflowTaskList return store.getResumingTasksByNamePrefix(attemptId, commonPrefix); } - static List buildResumingTaskMap(SessionStore store, long attemptId, List resumingTaskIds) + static List buildResumingTaskMap(SessionStore store, long attemptId, List resumingTaskIds) throws ResourceNotFoundException { Set idSet = new HashSet<>(resumingTaskIds); - List resumingTasks = store + List archivedTasks = store .getTasksOfAttempt(attemptId) .stream() .filter(archived -> { if (idSet.remove(archived.getId())) { - if (archived.getState() != TaskStateCode.SUCCESS) { - throw new IllegalResumeException("Resuming non-successful tasks is not allowed: task_id=" + archived.getId()); - } return true; } return false; }) - .map(archived -> ResumingTask.of(archived)) .collect(Collectors.toList()); if (!idSet.isEmpty()) { throw new ResourceNotFoundException("Resuming tasks are not the members of resuming attempt: id list=" + idSet); } - return resumingTasks; + return archivedTasks; } //// diff --git a/digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java b/digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java index 4a03e906b4..fe16623a4f 100644 --- a/digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java +++ b/digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java @@ -21,6 +21,7 @@ import io.digdag.core.repository.StoredRevision; import io.digdag.core.repository.WorkflowDefinition; import io.digdag.core.session.ResumingTask; +import io.digdag.core.session.ArchivedTask; import io.digdag.core.session.ParameterUpdate; import io.digdag.core.session.Session; import io.digdag.core.session.SessionAttempt; @@ -43,6 +44,8 @@ import io.digdag.spi.TaskConflictException; import io.digdag.spi.TaskNotFoundException; import io.digdag.spi.metrics.DigdagMetrics; +import io.digdag.spi.metrics.DigdagMetrics.Category; + import static io.digdag.spi.metrics.DigdagMetrics.Category; import io.digdag.util.RetryControl; import org.slf4j.Logger; @@ -269,21 +272,15 @@ public StoredSessionAttemptWithSession submitTasks(int siteId, AttemptRequest ar TaskConfig.validateAttempt(attempt); - List resumingTasks; + List archivedTasks; if (ar.getResumingAttemptId().isPresent()) { - WorkflowTask root = tasks.get(0); - resumingTasks = TaskControl.buildResumingTaskMap( + archivedTasks = TaskControl.buildResumingTaskMap( sm.getSessionStore(siteId), ar.getResumingAttemptId().get(), ar.getResumingTasks()); - for (ResumingTask resumingTask : resumingTasks) { - if (resumingTask.getFullName().equals(root.getFullName())) { - throw new IllegalResumeException("Resuming root task is not allowed"); - } - } } else { - resumingTasks = ImmutableList.of(); + archivedTasks = ImmutableList.of(); } StoredSessionAttemptWithSession stored; @@ -313,7 +310,7 @@ public StoredSessionAttemptWithSession submitTasks(int siteId, AttemptRequest ar StoredSessionAttemptWithSession.of(siteId, storedSession, storedAttempt); try { - storeTasks(store, storedAttemptWithSession, tasks, resumingTasks, ar.getSessionMonitors()); + storeTasks(store, storedAttemptWithSession, tasks, archivedTasks, ar.getSessionMonitors()); } catch (TaskLimitExceededException ex) { throw new WorkflowTaskLimitExceededException(ex); @@ -348,21 +345,21 @@ public void storeTasks( SessionControlStore store, StoredSessionAttemptWithSession storedAttempt, WorkflowDefinition def, - List resumingTasks, + List archivedTasks, List sessionMonitors) throws TaskLimitExceededException { Workflow workflow = compiler.compile(def.getName(), def.getConfig()); WorkflowTaskList tasks = workflow.getTasks(); - storeTasks(store, storedAttempt, tasks, resumingTasks, sessionMonitors); + storeTasks(store, storedAttempt, tasks, archivedTasks, sessionMonitors); } public void storeTasks( SessionControlStore store, StoredSessionAttemptWithSession storedAttempt, WorkflowTaskList tasks, - List resumingTasks, + List archivedTasks, List sessionMonitors) throws TaskLimitExceededException { @@ -386,7 +383,7 @@ public void storeTasks( store.insertRootTask(storedAttempt.getId(), rootTask, (taskStore, storedTaskId) -> { try { TaskControl.addInitialTasksExceptingRootTask(taskStore, storedAttempt.getId(), - storedTaskId, tasks, resumingTasks, limits); + storedTaskId, tasks, archivedTasks, limits); } catch (TaskLimitExceededException ex) { throw new WorkflowTaskLimitExceededException(ex); diff --git a/digdag-server/src/main/java/io/digdag/server/rs/AttemptResource.java b/digdag-server/src/main/java/io/digdag/server/rs/AttemptResource.java index a678e3e690..c739bf52ae 100644 --- a/digdag-server/src/main/java/io/digdag/server/rs/AttemptResource.java +++ b/digdag-server/src/main/java/io/digdag/server/rs/AttemptResource.java @@ -2,6 +2,7 @@ import java.util.List; import java.util.Set; +import java.util.Arrays; import java.util.HashSet; import java.util.stream.Collectors; import javax.ws.rs.Consumes; @@ -298,21 +299,24 @@ private List collectResumingTasks(RestSessionAttemptRequest.Resume resume) private List collectResumingTasksForResumeFailedMode(long attemptId) { + TaskStateCode[] statusArr = {TaskStateCode.SUCCESS, TaskStateCode.GROUP_ERROR, TaskStateCode.ERROR, TaskStateCode.CANCELED}; + List statuses = Arrays.asList(statusArr); + List tasks = sm .getSessionStore(getSiteId()) .getTasksOfAttempt(attemptId); - List successTasks = tasks.stream() - .filter(task -> task.getState() == TaskStateCode.SUCCESS) + List ids = tasks.stream() + .filter(t-> statuses.contains(t.getState())) .map(task -> { - if (!task.getParentId().isPresent()) { + if (!task.getParentId().isPresent() && task.getState() == TaskStateCode.SUCCESS) { throw new IllegalArgumentException("Resuming successfully completed attempts is not supported"); } return task.getId(); }) .collect(Collectors.toList()); - return ImmutableList.copyOf(successTasks); + return ImmutableList.copyOf(ids); } private List collectResumingTasksForResumeFromMode(long attemptId, String fromTaskPattern)