Skip to content

Commit

Permalink
change ResumingTasks to ArchivedTasks
Browse files Browse the repository at this point in the history
  • Loading branch information
snagasawa committed Feb 15, 2024
1 parent fdf7462 commit 6f60f54
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 26 deletions.
19 changes: 7 additions & 12 deletions digdag-core/src/main/java/io/digdag/core/workflow/TaskControl.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.stream.Collectors;

import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.collect.*;
Expand Down Expand Up @@ -56,14 +57,12 @@ public TaskStateCode getState()

public static long addInitialTasksExceptingRootTask(
TaskControlStore store, long attemptId, long rootTaskId,
WorkflowTaskList tasks, List<ResumingTask> resumingTasks, Limits limits)
WorkflowTaskList tasks, List<ArchivedTask> archivedTasks, Limits limits)
throws TaskLimitExceededException
{
checkTaskLimit(store, attemptId, tasks, limits);
long taskId = addTasks(store, attemptId, rootTaskId,
tasks, ImmutableList.of(),
false, true, true,
resumingTasks);
long taskId = addInitialTasks(store, attemptId, rootTaskId, tasks, archivedTasks);
List<ResumingTask> resumingTasks = archivedTasks.stream().map(ResumingTask::of).collect(Collectors.toList());
addResumingTasks(store, attemptId, resumingTasks);
return taskId;
}
Expand Down Expand Up @@ -322,28 +321,24 @@ private List<ResumingTask> collectResumingTasks(long attemptId, WorkflowTaskList
return store.getResumingTasksByNamePrefix(attemptId, commonPrefix);
}

static List<ResumingTask> buildResumingTaskMap(SessionStore store, long attemptId, List<Long> resumingTaskIds)
static List<ArchivedTask> buildResumingTaskMap(SessionStore store, long attemptId, List<Long> resumingTaskIds)
throws ResourceNotFoundException
{
Set<Long> idSet = new HashSet<>(resumingTaskIds);
List<ResumingTask> resumingTasks = store
List<ArchivedTask> archivedTasks = store
.getTasksOfAttempt(attemptId)
.stream()
.filter(archived -> {
if (idSet.remove(archived.getId())) {
if (archived.getState() != TaskStateCode.SUCCESS) {
throw new IllegalResumeException("Resuming non-successful tasks is not allowed: task_id=" + archived.getId());
}
return true;
}
return false;
})
.map(archived -> ResumingTask.of(archived))
.collect(Collectors.toList());
if (!idSet.isEmpty()) {
throw new ResourceNotFoundException("Resuming tasks are not the members of resuming attempt: id list=" + idSet);
}
return resumingTasks;
return archivedTasks;
}

////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.digdag.core.repository.StoredRevision;
import io.digdag.core.repository.WorkflowDefinition;
import io.digdag.core.session.ResumingTask;
import io.digdag.core.session.ArchivedTask;
import io.digdag.core.session.ParameterUpdate;
import io.digdag.core.session.Session;
import io.digdag.core.session.SessionAttempt;
Expand All @@ -43,6 +44,8 @@
import io.digdag.spi.TaskConflictException;
import io.digdag.spi.TaskNotFoundException;
import io.digdag.spi.metrics.DigdagMetrics;
import io.digdag.spi.metrics.DigdagMetrics.Category;

import static io.digdag.spi.metrics.DigdagMetrics.Category;
import io.digdag.util.RetryControl;
import org.slf4j.Logger;
Expand Down Expand Up @@ -269,21 +272,15 @@ public StoredSessionAttemptWithSession submitTasks(int siteId, AttemptRequest ar

TaskConfig.validateAttempt(attempt);

List<ResumingTask> resumingTasks;
List<ArchivedTask> archivedTasks;
if (ar.getResumingAttemptId().isPresent()) {
WorkflowTask root = tasks.get(0);
resumingTasks = TaskControl.buildResumingTaskMap(
archivedTasks = TaskControl.buildResumingTaskMap(
sm.getSessionStore(siteId),
ar.getResumingAttemptId().get(),
ar.getResumingTasks());
for (ResumingTask resumingTask : resumingTasks) {
if (resumingTask.getFullName().equals(root.getFullName())) {
throw new IllegalResumeException("Resuming root task is not allowed");
}
}
}
else {
resumingTasks = ImmutableList.of();
archivedTasks = ImmutableList.of();
}

StoredSessionAttemptWithSession stored;
Expand Down Expand Up @@ -313,7 +310,7 @@ public StoredSessionAttemptWithSession submitTasks(int siteId, AttemptRequest ar
StoredSessionAttemptWithSession.of(siteId, storedSession, storedAttempt);

try {
storeTasks(store, storedAttemptWithSession, tasks, resumingTasks, ar.getSessionMonitors());
storeTasks(store, storedAttemptWithSession, tasks, archivedTasks, ar.getSessionMonitors());
}
catch (TaskLimitExceededException ex) {
throw new WorkflowTaskLimitExceededException(ex);
Expand Down Expand Up @@ -348,21 +345,21 @@ public void storeTasks(
SessionControlStore store,
StoredSessionAttemptWithSession storedAttempt,
WorkflowDefinition def,
List<ResumingTask> resumingTasks,
List<ArchivedTask> archivedTasks,
List<SessionMonitor> sessionMonitors)
throws TaskLimitExceededException
{
Workflow workflow = compiler.compile(def.getName(), def.getConfig());
WorkflowTaskList tasks = workflow.getTasks();

storeTasks(store, storedAttempt, tasks, resumingTasks, sessionMonitors);
storeTasks(store, storedAttempt, tasks, archivedTasks, sessionMonitors);
}

public void storeTasks(
SessionControlStore store,
StoredSessionAttemptWithSession storedAttempt,
WorkflowTaskList tasks,
List<ResumingTask> resumingTasks,
List<ArchivedTask> archivedTasks,
List<SessionMonitor> sessionMonitors)
throws TaskLimitExceededException
{
Expand All @@ -386,7 +383,7 @@ public void storeTasks(
store.insertRootTask(storedAttempt.getId(), rootTask, (taskStore, storedTaskId) -> {
try {
TaskControl.addInitialTasksExceptingRootTask(taskStore, storedAttempt.getId(),
storedTaskId, tasks, resumingTasks, limits);
storedTaskId, tasks, archivedTasks, limits);
}
catch (TaskLimitExceededException ex) {
throw new WorkflowTaskLimitExceededException(ex);
Expand Down

0 comments on commit 6f60f54

Please sign in to comment.