From 0c5686742371a7f22ae4376d60867d52709ebf36 Mon Sep 17 00:00:00 2001 From: DanMiller192 <55767381+danmiller192@users.noreply.github.com> Date: Tue, 19 Nov 2024 16:58:49 +0000 Subject: [PATCH] Added maximum workflow sweep postpone config --- .../core/config/ConductorProperties.java | 12 +++++++ .../core/reconciliation/WorkflowSweeper.java | 12 ++++++- .../reconciliation/TestWorkflowSweeper.java | 34 ++++++++++++++++++- 3 files changed, 56 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java index f07f1485f..7ef41368a 100644 --- a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java +++ b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java @@ -42,6 +42,10 @@ public class ConductorProperties { @DurationUnit(ChronoUnit.SECONDS) private Duration workflowOffsetTimeout = Duration.ofSeconds(30); + /** The maximum timeout duration to set when a workflow is pushed to the decider queue. */ + @DurationUnit(ChronoUnit.SECONDS) + private Duration maxPostponeDurationSeconds = Duration.ofSeconds(3600); + /** The number of threads to use to do background sweep on active workflows. */ private int sweeperThreadCount = Runtime.getRuntime().availableProcessors() * 2; @@ -251,6 +255,14 @@ public void setWorkflowOffsetTimeout(Duration workflowOffsetTimeout) { this.workflowOffsetTimeout = workflowOffsetTimeout; } + public Duration getMaxPostponeDurationSeconds() { + return maxPostponeDurationSeconds; + } + + public void setMaxPostponeDurationSeconds(Duration maxPostponeDurationSeconds) { + this.maxPostponeDurationSeconds = maxPostponeDurationSeconds; + } + public int getSweeperThreadCount() { return sweeperThreadCount; } diff --git a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java index b39d122f0..9702eadd6 100644 --- a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java +++ b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java @@ -139,8 +139,18 @@ void unack(WorkflowModel workflowModel, long workflowOffsetTimeout) { } else if (taskModel.getTaskType().equals(TaskType.TASK_TYPE_HUMAN)) { postponeDurationSeconds = workflowOffsetTimeout; } else { - postponeDurationSeconds = workflowOffsetTimeout; + postponeDurationSeconds = + (taskModel.getResponseTimeoutSeconds() != 0) + ? taskModel.getResponseTimeoutSeconds() + 1 + : workflowOffsetTimeout; } + + if (postponeDurationSeconds + > properties.getMaxPostponeDurationSeconds().getSeconds()) { + postponeDurationSeconds = + properties.getMaxPostponeDurationSeconds().getSeconds(); + } + break; } else if (taskModel.getStatus() == Status.SCHEDULED) { Optional taskDefinition = taskModel.getTaskDefinition(); diff --git a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java index 042735b70..85f4273c3 100644 --- a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java +++ b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java @@ -49,6 +49,7 @@ public class TestWorkflowSweeper { private ExecutionLockService executionLockService; private int defaultPostPoneOffSetSeconds = 1800; + private int defaulMmaxPostponeDurationSeconds = 2000000; @Before public void setUp() { @@ -79,6 +80,8 @@ public void testPostponeDurationForHumanTaskType() { workflowModel.setTasks(List.of(taskModel)); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); + when(properties.getMaxPostponeDurationSeconds()) + .thenReturn(Duration.ofSeconds(defaulMmaxPostponeDurationSeconds)); workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( @@ -98,6 +101,8 @@ public void testPostponeDurationForWaitTaskType() { workflowModel.setTasks(List.of(taskModel)); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); + when(properties.getMaxPostponeDurationSeconds()) + .thenReturn(Duration.ofSeconds(defaulMmaxPostponeDurationSeconds)); workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( @@ -119,6 +124,8 @@ public void testPostponeDurationForWaitTaskTypeWithLongWaitTime() { workflowModel.setTasks(List.of(taskModel)); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); + when(properties.getMaxPostponeDurationSeconds()) + .thenReturn(Duration.ofSeconds(defaulMmaxPostponeDurationSeconds)); workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( @@ -174,6 +181,8 @@ public void testPostponeDurationForTaskInProgress() { workflowModel.setTasks(List.of(taskModel)); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); + when(properties.getMaxPostponeDurationSeconds()) + .thenReturn(Duration.ofSeconds(defaulMmaxPostponeDurationSeconds)); workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( @@ -200,7 +209,30 @@ public void testPostponeDurationForTaskInProgressWithResponseTimeoutSet() { .setUnackTimeout( DECIDER_QUEUE, workflowModel.getWorkflowId(), - defaultPostPoneOffSetSeconds * 1000); + (responseTimeout + 1) * 1000); + } + + @Test + public void testPostponeDurationForTaskInProgressWithResponseTimeoutSetLongerThanMaxPostponeDuration() { + long responseTimeout = defaulMmaxPostponeDurationSeconds + 1; + WorkflowModel workflowModel = new WorkflowModel(); + workflowModel.setWorkflowId("1"); + TaskModel taskModel = new TaskModel(); + taskModel.setTaskId("task1"); + taskModel.setTaskType(TaskType.TASK_TYPE_SIMPLE); + taskModel.setStatus(Status.IN_PROGRESS); + taskModel.setResponseTimeoutSeconds(responseTimeout); + workflowModel.setTasks(List.of(taskModel)); + when(properties.getWorkflowOffsetTimeout()) + .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); + when(properties.getMaxPostponeDurationSeconds()) + .thenReturn(Duration.ofSeconds(defaulMmaxPostponeDurationSeconds)); + workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); + verify(queueDAO) + .setUnackTimeout( + DECIDER_QUEUE, + workflowModel.getWorkflowId(), + defaulMmaxPostponeDurationSeconds * 1000L); } @Test