diff --git a/common/src/main/java/com/netflix/conductor/common/constraints/TaskTimeoutConstraint.java b/common/src/main/java/com/netflix/conductor/common/constraints/TaskTimeoutConstraint.java index a9e6576f0..43a7cd8ba 100644 --- a/common/src/main/java/com/netflix/conductor/common/constraints/TaskTimeoutConstraint.java +++ b/common/src/main/java/com/netflix/conductor/common/constraints/TaskTimeoutConstraint.java @@ -66,6 +66,20 @@ public boolean isValid(TaskDef taskDef, ConstraintValidatorContext context) { } } + // Check if timeoutSeconds is greater than totalTimeoutSeconds + if (taskDef.getTimeoutSeconds() > 0 + && taskDef.getTotalTimeoutSeconds() > 0 + && taskDef.getTimeoutSeconds() > taskDef.getTotalTimeoutSeconds()) { + valid = false; + String message = + String.format( + "TaskDef: %s timeoutSeconds: %d must be less than or equal to totalTimeoutSeconds: %d", + taskDef.getName(), + taskDef.getTimeoutSeconds(), + taskDef.getTotalTimeoutSeconds()); + context.buildConstraintViolationWithTemplate(message).addConstraintViolation(); + } + return valid; } } diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java index 495ff06a9..2c948c3f9 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java @@ -202,6 +202,9 @@ public boolean isRetriable() { @ProtoField(id = 42) private boolean subworkflowChanged; + @ProtoField(id = 43) + private long firstStartTime; + // If the task is an event associated with a parent task, the id of the parent task private String parentTaskId; @@ -716,7 +719,9 @@ public boolean isLoopOverTask() { return iteration > 0; } - /** * @return the priority defined on workflow */ + /** + * @return the priority defined on workflow + */ public int getWorkflowPriority() { return workflowPriority; } @@ -765,6 +770,14 @@ public void setParentTaskId(String parentTaskId) { this.parentTaskId = parentTaskId; } + public long getFirstStartTime() { + return firstStartTime; + } + + public void setFirstStartTime(long firstStartTime) { + this.firstStartTime = firstStartTime; + } + public Task copy() { Task copy = new Task(); copy.setCallbackAfterSeconds(callbackAfterSeconds); @@ -798,6 +811,7 @@ public Task copy() { copy.setSubWorkflowId(getSubWorkflowId()); copy.setSubworkflowChanged(subworkflowChanged); copy.setParentTaskId(parentTaskId); + copy.setFirstStartTime(firstStartTime); return copy; } @@ -819,6 +833,7 @@ public Task deepCopy() { deepCopy.setReasonForIncompletion(reasonForIncompletion); deepCopy.setSeq(seq); deepCopy.setParentTaskId(parentTaskId); + deepCopy.setFirstStartTime(firstStartTime); return deepCopy; } @@ -919,6 +934,9 @@ public String toString() { + ", subworkflowChanged='" + subworkflowChanged + '\'' + + ", firstStartTime='" + + firstStartTime + + '\'' + '}'; } @@ -973,7 +991,8 @@ && getWorkflowPriority() == task.getWorkflowPriority() task.getExternalOutputPayloadStoragePath()) && Objects.equals(getIsolationGroupId(), task.getIsolationGroupId()) && Objects.equals(getExecutionNameSpace(), task.getExecutionNameSpace()) - && Objects.equals(getParentTaskId(), task.getParentTaskId()); + && Objects.equals(getParentTaskId(), task.getParentTaskId()) + && Objects.equals(getFirstStartTime(), task.getFirstStartTime()); } @Override @@ -1016,6 +1035,7 @@ public int hashCode() { getExternalOutputPayloadStoragePath(), getIsolationGroupId(), getExecutionNameSpace(), - getParentTaskId()); + getParentTaskId(), + getFirstStartTime()); } } diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java index 7e4357604..b96c54245 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java @@ -127,6 +127,10 @@ public enum RetryLogic { @ProtoField(id = 21) private String baseType; + @ProtoField(id = 22) + @NotNull + private long totalTimeoutSeconds; + private SchemaDef inputSchema; private SchemaDef outputSchema; private boolean enforceSchema; @@ -464,6 +468,14 @@ public void setEnforceSchema(boolean enforceSchema) { this.enforceSchema = enforceSchema; } + public long getTotalTimeoutSeconds() { + return totalTimeoutSeconds; + } + + public void setTotalTimeoutSeconds(long totalTimeoutSeconds) { + this.totalTimeoutSeconds = totalTimeoutSeconds; + } + @Override public String toString() { return name; @@ -497,7 +509,8 @@ && getRetryLogic() == taskDef.getRetryLogic() && Objects.equals(getOwnerEmail(), taskDef.getOwnerEmail()) && Objects.equals(getBaseType(), taskDef.getBaseType()) && Objects.equals(getInputSchema(), taskDef.getInputSchema()) - && Objects.equals(getOutputSchema(), taskDef.getOutputSchema()); + && Objects.equals(getOutputSchema(), taskDef.getOutputSchema()) + && Objects.equals(getTotalTimeoutSeconds(), taskDef.getTotalTimeoutSeconds()); } @Override @@ -523,6 +536,7 @@ public int hashCode() { getOwnerEmail(), getBaseType(), getInputSchema(), - getOutputSchema()); + getOutputSchema(), + getTotalTimeoutSeconds()); } } diff --git a/common/src/test/java/com/netflix/conductor/common/tasks/TaskDefTest.java b/common/src/test/java/com/netflix/conductor/common/tasks/TaskDefTest.java index a46cf7d5c..b3f4b9932 100644 --- a/common/src/test/java/com/netflix/conductor/common/tasks/TaskDefTest.java +++ b/common/src/test/java/com/netflix/conductor/common/tasks/TaskDefTest.java @@ -74,6 +74,28 @@ public void testTaskDef() { assertTrue(validationErrors.contains("ownerEmail cannot be empty")); } + @Test + public void testTaskDefTotalTimeOutSeconds() { + TaskDef taskDef = new TaskDef(); + taskDef.setName("test-task"); + taskDef.setRetryCount(1); + taskDef.setTimeoutSeconds(1000); + taskDef.setTotalTimeoutSeconds(900); + taskDef.setResponseTimeoutSeconds(1); + taskDef.setOwnerEmail("blah@gmail.com"); + + Set> result = validator.validate(taskDef); + assertEquals(1, result.size()); + + List validationErrors = new ArrayList<>(); + result.forEach(e -> validationErrors.add(e.getMessage())); + + assertTrue( + validationErrors.toString(), + validationErrors.contains( + "TaskDef: test-task timeoutSeconds: 1000 must be less than or equal to totalTimeoutSeconds: 900")); + } + @Test public void testTaskDefInvalidEmail() { TaskDef taskDef = new TaskDef(); diff --git a/common/src/test/java/com/netflix/conductor/common/tasks/TaskTest.java b/common/src/test/java/com/netflix/conductor/common/tasks/TaskTest.java index 402fcfcb0..f1a71bfdb 100644 --- a/common/src/test/java/com/netflix/conductor/common/tasks/TaskTest.java +++ b/common/src/test/java/com/netflix/conductor/common/tasks/TaskTest.java @@ -98,7 +98,7 @@ public void testDeepCopyTask() { final Task task = new Task(); // In order to avoid forgetting putting inside the copy method the newly added fields check // the number of declared fields. - final int expectedTaskFieldsNumber = 41; + final int expectedTaskFieldsNumber = 42; final int declaredFieldsNumber = task.getClass().getDeclaredFields().length; assertEquals(expectedTaskFieldsNumber, declaredFieldsNumber); diff --git a/core/src/main/java/com/netflix/conductor/model/TaskModel.java b/core/src/main/java/com/netflix/conductor/model/TaskModel.java index 122c31b5b..b169998cf 100644 --- a/core/src/main/java/com/netflix/conductor/model/TaskModel.java +++ b/core/src/main/java/com/netflix/conductor/model/TaskModel.java @@ -94,6 +94,9 @@ public boolean isRetriable() { /** Time when the task was last updated */ private long updateTime; + /** Time when first task started */ + private long firstStartTime; + private int startDelayInSeconds; private String retriedTaskId; diff --git a/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java b/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java index 47ccc9d8b..d3d37246d 100644 --- a/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java +++ b/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java @@ -729,6 +729,7 @@ public TaskPb.Task toProto(Task from) { to.setSubWorkflowId( from.getSubWorkflowId() ); } to.setSubworkflowChanged( from.isSubworkflowChanged() ); + to.setFirstStartTime( from.getFirstStartTime() ); return to.build(); } @@ -788,6 +789,7 @@ public Task fromProto(TaskPb.Task from) { to.setIteration( from.getIteration() ); to.setSubWorkflowId( from.getSubWorkflowId() ); to.setSubworkflowChanged( from.getSubworkflowChanged() ); + to.setFirstStartTime( from.getFirstStartTime() ); return to; } @@ -875,6 +877,7 @@ public TaskDefPb.TaskDef toProto(TaskDef from) { if (from.getBaseType() != null) { to.setBaseType( from.getBaseType() ); } + to.setTotalTimeoutSeconds( from.getTotalTimeoutSeconds() ); return to.build(); } @@ -904,6 +907,7 @@ public TaskDef fromProto(TaskDefPb.TaskDef from) { to.setPollTimeoutSeconds( from.getPollTimeoutSeconds() ); to.setBackoffScaleFactor( from.getBackoffScaleFactor() ); to.setBaseType( from.getBaseType() ); + to.setTotalTimeoutSeconds( from.getTotalTimeoutSeconds() ); return to; } diff --git a/grpc/src/main/proto/model/task.proto b/grpc/src/main/proto/model/task.proto index 410aa0a06..0df2f9b0f 100644 --- a/grpc/src/main/proto/model/task.proto +++ b/grpc/src/main/proto/model/task.proto @@ -61,4 +61,5 @@ message Task { int32 iteration = 40; string sub_workflow_id = 41; bool subworkflow_changed = 42; + int64 first_start_time = 43; } diff --git a/grpc/src/main/proto/model/taskdef.proto b/grpc/src/main/proto/model/taskdef.proto index e531bcfec..28046426f 100644 --- a/grpc/src/main/proto/model/taskdef.proto +++ b/grpc/src/main/proto/model/taskdef.proto @@ -38,4 +38,5 @@ message TaskDef { int32 poll_timeout_seconds = 19; int32 backoff_scale_factor = 20; string base_type = 21; + int64 total_timeout_seconds = 22; }