From a4631682fd250346202eb3c2e56ee105670958c8 Mon Sep 17 00:00:00 2001 From: Shuhei Nagasawa Date: Thu, 15 Feb 2024 12:14:47 +0900 Subject: [PATCH 1/4] add addInitialTasks --- .../io/digdag/core/workflow/TaskControl.java | 130 ++++++++++++++++++ 1 file changed, 130 insertions(+) diff --git a/digdag-core/src/main/java/io/digdag/core/workflow/TaskControl.java b/digdag-core/src/main/java/io/digdag/core/workflow/TaskControl.java index dc9082c841..370cfadd13 100644 --- a/digdag-core/src/main/java/io/digdag/core/workflow/TaskControl.java +++ b/digdag-core/src/main/java/io/digdag/core/workflow/TaskControl.java @@ -5,6 +5,7 @@ import java.util.Set; import java.util.HashSet; import java.util.ArrayList; +import java.util.Comparator; import java.util.stream.Collectors; import com.google.common.base.Optional; import com.google.common.base.Strings; @@ -15,11 +16,13 @@ import io.digdag.core.session.StoredTask; import io.digdag.core.session.SessionStore; import io.digdag.core.session.ArchivedTask; +import io.digdag.core.session.ImmutableResumingTask; import io.digdag.core.session.ResumingTask; import io.digdag.core.session.Task; import io.digdag.core.session.TaskControlStore; import io.digdag.core.session.TaskStateCode; import io.digdag.core.session.TaskStateFlags; +import io.digdag.spi.TaskReport; import io.digdag.spi.TaskResult; import io.digdag.client.config.Config; @@ -185,6 +188,133 @@ private static long addTasks(TaskControlStore store, return rootTaskId; } + private static long addInitialTasks(TaskControlStore store, long attemptId, long rootTaskId, + WorkflowTaskList workflowTasks, List archivedTasks) + { + List indexToId = new ArrayList<>(); + indexToId.add(rootTaskId); + + workflowTasks.stream() + .skip(1) // skip the root task + .forEach(workflowTask -> { + long id; + long parentId = workflowTask.getParentIndex() + .transform(index -> indexToId.get(index)) + .or(rootTaskId); + + ArchivedTask archivedTask = archivedTasks.stream() + .filter(t -> t.getFullName().equals(workflowTask.getFullName())) + .findFirst() + .orElse(null); + + if (archivedTask == null) { + Task task = Task.taskBuilder() + .parentId(Optional.of(parentId)) + .fullName(workflowTask.getFullName()) + .config(TaskConfig.validate(workflowTask.getConfig())) + .taskType(workflowTask.getTaskType()) + .state(TaskStateCode.BLOCKED) + .stateFlags(TaskStateFlags.empty().withInitialTask()) + .build(); + + id = store.addSubtask(attemptId, task); + } else { + TaskStateCode state; + switch(archivedTask.getState()) { + case SUCCESS: + state = TaskStateCode.SUCCESS; + break; + case ERROR: + case CANCELED: + state = TaskStateCode.BLOCKED; + break; + default: + state = TaskStateCode.PLANNED; + break; + } + + ResumingTask resumingTask = ImmutableResumingTask.builder() + .sourceTaskId(archivedTask.getId()) + .fullName(archivedTask.getFullName()) + .config(TaskConfig.validate(workflowTask.getConfig())) + .updatedAt(archivedTask.getUpdatedAt()) + .subtaskConfig(archivedTask.getSubtaskConfig()) + .exportParams(archivedTask.getExportParams()) + .resetStoreParams(archivedTask.getResetStoreParams()) + .storeParams(archivedTask.getStoreParams()) + .report(archivedTask.getReport().or(TaskReport.empty())) + .error(archivedTask.getError()) + .build(); + + id = store.addResumedSubtask(attemptId, + parentId, + workflowTask.getTaskType(), + state, + TaskStateFlags.empty().withInitialTask(), + resumingTask); + } + + indexToId.add(id); + + if (!workflowTask.getUpstreamIndexes().isEmpty()) { + store.addDependencies( + id, + workflowTask.getUpstreamIndexes() + .stream() + .map(index -> indexToId.get(index)) + .collect(Collectors.toList()) + ); + } + }); + + Map taskNameAndIds = workflowTasks.stream() + .collect(Collectors.toMap(WorkflowTask::getFullName, task -> indexToId.get(workflowTasks.indexOf(task)))); + + archivedTasks.stream() + .filter(archivedTask -> !taskNameAndIds.keySet().contains(archivedTask.getFullName()) + && archivedTask.getFullName().contains("^sub")) + .sorted(Comparator.comparingInt((t) -> (int) t.getId())) + .forEach(archivedSubtask -> { + String parentTaskName = archivedSubtask.getFullName().replaceAll("(\\^sub|\\+)[^\\^+]*$", ""); + Long parentId = taskNameAndIds.get(parentTaskName); + + TaskStateCode state; + switch(archivedSubtask.getState()) { + case SUCCESS: + state = TaskStateCode.SUCCESS; + break; + case ERROR: + case CANCELED: + state = TaskStateCode.BLOCKED; + break; + default: + state = TaskStateCode.PLANNED; + break; + } + + Long id = store.addResumedSubtask(attemptId, + parentId, + archivedSubtask.getTaskType(), + state, + TaskStateFlags.empty().withInitialTask(), + ResumingTask.of(archivedSubtask)); + + taskNameAndIds.put(archivedSubtask.getFullName(), id); + + archivedTasks.stream() + .filter(t -> t.getId() == archivedSubtask.getId() - 1) + .findFirst() + .ifPresent((upstreamArchivedTask) -> { + List upstreamIds = new ArrayList<>(); + Long upstreamId = taskNameAndIds.get(upstreamArchivedTask.getFullName()); + upstreamIds.add(upstreamId); + store.addDependencies(id, upstreamIds); + });; + }); + + return rootTaskId; + } + private static void addResumingTasks(TaskControlStore store, long attemptId, List resumingTasks) { // store only dynamically-generated tasks From 4cb21c96c73b03f1d7e077f317faa54385bfc765 Mon Sep 17 00:00:00 2001 From: Shuhei Nagasawa Date: Thu, 15 Feb 2024 15:01:59 +0900 Subject: [PATCH 2/4] change ResumingTasks to ArchivedTasks --- .../io/digdag/core/workflow/TaskControl.java | 19 ++++++-------- .../core/workflow/WorkflowExecutor.java | 25 ++++++++----------- 2 files changed, 18 insertions(+), 26 deletions(-) diff --git a/digdag-core/src/main/java/io/digdag/core/workflow/TaskControl.java b/digdag-core/src/main/java/io/digdag/core/workflow/TaskControl.java index 370cfadd13..f3fab46c8d 100644 --- a/digdag-core/src/main/java/io/digdag/core/workflow/TaskControl.java +++ b/digdag-core/src/main/java/io/digdag/core/workflow/TaskControl.java @@ -7,6 +7,7 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.stream.Collectors; + import com.google.common.base.Optional; import com.google.common.base.Strings; import com.google.common.collect.*; @@ -58,14 +59,12 @@ public TaskStateCode getState() public static long addInitialTasksExceptingRootTask( TaskControlStore store, long attemptId, long rootTaskId, - WorkflowTaskList tasks, List resumingTasks, Limits limits) + WorkflowTaskList tasks, List archivedTasks, Limits limits) throws TaskLimitExceededException { checkTaskLimit(store, attemptId, tasks, limits); - long taskId = addTasks(store, attemptId, rootTaskId, - tasks, ImmutableList.of(), - false, true, true, - resumingTasks); + long taskId = addInitialTasks(store, attemptId, rootTaskId, tasks, archivedTasks); + List resumingTasks = archivedTasks.stream().map(ResumingTask::of).collect(Collectors.toList()); addResumingTasks(store, attemptId, resumingTasks); return taskId; } @@ -337,28 +336,24 @@ private List collectResumingTasks(long attemptId, WorkflowTaskList return store.getResumingTasksByNamePrefix(attemptId, commonPrefix); } - static List buildResumingTaskMap(SessionStore store, long attemptId, List resumingTaskIds) + static List buildResumingTaskMap(SessionStore store, long attemptId, List resumingTaskIds) throws ResourceNotFoundException { Set idSet = new HashSet<>(resumingTaskIds); - List resumingTasks = store + List archivedTasks = store .getTasksOfAttempt(attemptId) .stream() .filter(archived -> { if (idSet.remove(archived.getId())) { - if (archived.getState() != TaskStateCode.SUCCESS) { - throw new IllegalResumeException("Resuming non-successful tasks is not allowed: task_id=" + archived.getId()); - } return true; } return false; }) - .map(archived -> ResumingTask.of(archived)) .collect(Collectors.toList()); if (!idSet.isEmpty()) { throw new ResourceNotFoundException("Resuming tasks are not the members of resuming attempt: id list=" + idSet); } - return resumingTasks; + return archivedTasks; } //// diff --git a/digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java b/digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java index 4a03e906b4..fe16623a4f 100644 --- a/digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java +++ b/digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java @@ -21,6 +21,7 @@ import io.digdag.core.repository.StoredRevision; import io.digdag.core.repository.WorkflowDefinition; import io.digdag.core.session.ResumingTask; +import io.digdag.core.session.ArchivedTask; import io.digdag.core.session.ParameterUpdate; import io.digdag.core.session.Session; import io.digdag.core.session.SessionAttempt; @@ -43,6 +44,8 @@ import io.digdag.spi.TaskConflictException; import io.digdag.spi.TaskNotFoundException; import io.digdag.spi.metrics.DigdagMetrics; +import io.digdag.spi.metrics.DigdagMetrics.Category; + import static io.digdag.spi.metrics.DigdagMetrics.Category; import io.digdag.util.RetryControl; import org.slf4j.Logger; @@ -269,21 +272,15 @@ public StoredSessionAttemptWithSession submitTasks(int siteId, AttemptRequest ar TaskConfig.validateAttempt(attempt); - List resumingTasks; + List archivedTasks; if (ar.getResumingAttemptId().isPresent()) { - WorkflowTask root = tasks.get(0); - resumingTasks = TaskControl.buildResumingTaskMap( + archivedTasks = TaskControl.buildResumingTaskMap( sm.getSessionStore(siteId), ar.getResumingAttemptId().get(), ar.getResumingTasks()); - for (ResumingTask resumingTask : resumingTasks) { - if (resumingTask.getFullName().equals(root.getFullName())) { - throw new IllegalResumeException("Resuming root task is not allowed"); - } - } } else { - resumingTasks = ImmutableList.of(); + archivedTasks = ImmutableList.of(); } StoredSessionAttemptWithSession stored; @@ -313,7 +310,7 @@ public StoredSessionAttemptWithSession submitTasks(int siteId, AttemptRequest ar StoredSessionAttemptWithSession.of(siteId, storedSession, storedAttempt); try { - storeTasks(store, storedAttemptWithSession, tasks, resumingTasks, ar.getSessionMonitors()); + storeTasks(store, storedAttemptWithSession, tasks, archivedTasks, ar.getSessionMonitors()); } catch (TaskLimitExceededException ex) { throw new WorkflowTaskLimitExceededException(ex); @@ -348,21 +345,21 @@ public void storeTasks( SessionControlStore store, StoredSessionAttemptWithSession storedAttempt, WorkflowDefinition def, - List resumingTasks, + List archivedTasks, List sessionMonitors) throws TaskLimitExceededException { Workflow workflow = compiler.compile(def.getName(), def.getConfig()); WorkflowTaskList tasks = workflow.getTasks(); - storeTasks(store, storedAttempt, tasks, resumingTasks, sessionMonitors); + storeTasks(store, storedAttempt, tasks, archivedTasks, sessionMonitors); } public void storeTasks( SessionControlStore store, StoredSessionAttemptWithSession storedAttempt, WorkflowTaskList tasks, - List resumingTasks, + List archivedTasks, List sessionMonitors) throws TaskLimitExceededException { @@ -386,7 +383,7 @@ public void storeTasks( store.insertRootTask(storedAttempt.getId(), rootTask, (taskStore, storedTaskId) -> { try { TaskControl.addInitialTasksExceptingRootTask(taskStore, storedAttempt.getId(), - storedTaskId, tasks, resumingTasks, limits); + storedTaskId, tasks, archivedTasks, limits); } catch (TaskLimitExceededException ex) { throw new WorkflowTaskLimitExceededException(ex); From 6aa7cb907946adc896f1701e05376f1d2d518a2b Mon Sep 17 00:00:00 2001 From: Shuhei Nagasawa Date: Thu, 15 Feb 2024 15:06:41 +0900 Subject: [PATCH 3/4] fix collectResumingTasksForResumeFailedMode --- .../java/io/digdag/server/rs/AttemptResource.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/digdag-server/src/main/java/io/digdag/server/rs/AttemptResource.java b/digdag-server/src/main/java/io/digdag/server/rs/AttemptResource.java index a678e3e690..c739bf52ae 100644 --- a/digdag-server/src/main/java/io/digdag/server/rs/AttemptResource.java +++ b/digdag-server/src/main/java/io/digdag/server/rs/AttemptResource.java @@ -2,6 +2,7 @@ import java.util.List; import java.util.Set; +import java.util.Arrays; import java.util.HashSet; import java.util.stream.Collectors; import javax.ws.rs.Consumes; @@ -298,21 +299,24 @@ private List collectResumingTasks(RestSessionAttemptRequest.Resume resume) private List collectResumingTasksForResumeFailedMode(long attemptId) { + TaskStateCode[] statusArr = {TaskStateCode.SUCCESS, TaskStateCode.GROUP_ERROR, TaskStateCode.ERROR, TaskStateCode.CANCELED}; + List statuses = Arrays.asList(statusArr); + List tasks = sm .getSessionStore(getSiteId()) .getTasksOfAttempt(attemptId); - List successTasks = tasks.stream() - .filter(task -> task.getState() == TaskStateCode.SUCCESS) + List ids = tasks.stream() + .filter(t-> statuses.contains(t.getState())) .map(task -> { - if (!task.getParentId().isPresent()) { + if (!task.getParentId().isPresent() && task.getState() == TaskStateCode.SUCCESS) { throw new IllegalArgumentException("Resuming successfully completed attempts is not supported"); } return task.getId(); }) .collect(Collectors.toList()); - return ImmutableList.copyOf(successTasks); + return ImmutableList.copyOf(ids); } private List collectResumingTasksForResumeFromMode(long attemptId, String fromTaskPattern) From b79c994b1bca87def627b3f4ee5bc96a2cd13bc5 Mon Sep 17 00:00:00 2001 From: Shuhei Nagasawa Date: Mon, 26 Feb 2024 20:21:45 +0900 Subject: [PATCH 4/4] add test --- .../src/test/java/acceptance/RetryIT.java | 60 +++++++++++++++++++ .../test/resources/acceptance/retry/helper.py | 5 ++ .../resources/acceptance/retry/retry-4.dig | 10 ++++ .../resources/acceptance/retry/retry-5.dig | 10 ++++ 4 files changed, 85 insertions(+) create mode 100644 digdag-tests/src/test/resources/acceptance/retry/helper.py create mode 100644 digdag-tests/src/test/resources/acceptance/retry/retry-4.dig create mode 100644 digdag-tests/src/test/resources/acceptance/retry/retry-5.dig diff --git a/digdag-tests/src/test/java/acceptance/RetryIT.java b/digdag-tests/src/test/java/acceptance/RetryIT.java index 3894e9c25a..5ff5aa8980 100644 --- a/digdag-tests/src/test/java/acceptance/RetryIT.java +++ b/digdag-tests/src/test/java/acceptance/RetryIT.java @@ -179,6 +179,58 @@ public void testRetry() assertThat(retry5Attempt.getParams().get("key", String.class), is("value")); } + @Test + public void testRetryWithStoredParams() + throws Exception + { + DigdagClient client = DigdagClient.builder() + .host(server.host()) + .port(server.port()) + .build(); + + // Push the project + pushRevision("acceptance/retry/retry-4.dig", "retry"); + + // Start the workflow + Id originalAttemptId; + { + CommandStatus startStatus = main("start", + "-c", config.toString(), + "-e", server.endpoint(), + "retry", "retry", + "--session", "now"); + assertThat(startStatus.errUtf8(), startStatus.code(), is(0)); + originalAttemptId = getAttemptId(startStatus); + } + + // Wait for the attempt to fail + assertThat(joinAttempt(client, originalAttemptId).getSuccess(), is(false)); + + assertOutputExists("4-1", true); + + // Push a new revision + pushRevision("acceptance/retry/retry-5.dig", "retry"); + + // Retry with the latest fixed revision & resume from + Id retry2; + { + CommandStatus retryStatus = main("retry", + "-c", config.toString(), + "-e", server.endpoint(), + "--latest-revision", + "--resume-from", "+step2+b", + String.valueOf(originalAttemptId)); + assertThat(retryStatus.errUtf8(), retryStatus.code(), is(0)); + retry2 = getAttemptId(retryStatus); + } + + // Wait for the attempt to success + assertThat(joinAttempt(client, retry2).getSuccess(), is(true)); + + assertOutputExists("5-1", false); // skipped + assertOutputContents("5-2b", "stored_value"); + } + private void pushRevision(String resourceName, String workflowName) throws IOException { @@ -213,6 +265,14 @@ private void assertOutputExists(String name, boolean exists) assertThat(Files.exists(root().resolve(name + ".out")), is(exists)); } + private void assertOutputContents(String name, String contents) + throws IOException + { + assertThat( + new String(Files.readAllBytes(root().resolve(name + ".out")), UTF_8).trim(), + is(contents)); + } + private Path root() { return folder.getRoot().toPath().toAbsolutePath(); diff --git a/digdag-tests/src/test/resources/acceptance/retry/helper.py b/digdag-tests/src/test/resources/acceptance/retry/helper.py new file mode 100644 index 0000000000..218aa500e4 --- /dev/null +++ b/digdag-tests/src/test/resources/acceptance/retry/helper.py @@ -0,0 +1,5 @@ +import digdag + + +def store_v(value): + digdag.env.store({'v': value}) diff --git a/digdag-tests/src/test/resources/acceptance/retry/retry-4.dig b/digdag-tests/src/test/resources/acceptance/retry/retry-4.dig new file mode 100644 index 0000000000..0a125b6be7 --- /dev/null +++ b/digdag-tests/src/test/resources/acceptance/retry/retry-4.dig @@ -0,0 +1,10 @@ ++step1: + sh>: touch ${outdir}/4-1.out ++step2: + +a: + if>: true + _do: + py>: helper.store_v + value: stored_value + +b: + fail>: step2b fail diff --git a/digdag-tests/src/test/resources/acceptance/retry/retry-5.dig b/digdag-tests/src/test/resources/acceptance/retry/retry-5.dig new file mode 100644 index 0000000000..06aa15b582 --- /dev/null +++ b/digdag-tests/src/test/resources/acceptance/retry/retry-5.dig @@ -0,0 +1,10 @@ ++step1: + sh>: touch ${outdir}/5-1.out ++step2: + +a: + if>: true + _do: + py>: helper.store_v + value: stored_value + +b: + sh>: echo ${value} > ${outdir}/5-2b.out