Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/dynamic pause #6199

Merged
merged 3 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion core/src/main/java/io/kestra/core/models/property/Property.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.Serial;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* Define a plugin properties that will be rendered and converted to a target type at use time.
Expand All @@ -35,7 +36,7 @@
public class Property<T> {
// By default, durations are stored as numbers.
// We cannot change that globally, as in JDBC/Elastic 'execution.state.duration' must be a number to be able to aggregate them.
// So we only change it here.
// So we only change it here to be used for Property.of().
private static final ObjectMapper MAPPER = JacksonMapper.ofJson()
.copy()
.configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false);
Expand Down Expand Up @@ -91,6 +92,7 @@ public static <V> Property<V> of(V value) {
public static <T> T as(Property<T> property, RunContext runContext, Class<T> clazz) throws IllegalVariableEvaluationException {
if (property.value == null) {
String rendered = runContext.render(property.expression);
// special case for duration as they should be serialized as double but are not always
property.value = MAPPER.convertValue(rendered, clazz);
}

Expand Down Expand Up @@ -331,6 +333,18 @@ public String toString() {
return value != null ? value.toString() : expression;
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
Property<?> property = (Property<?>) o;
return Objects.equals(expression, property.expression);
}

@Override
public int hashCode() {
return Objects.hash(expression);
}

// used only by the serializer
String getExpression() {
return this.expression;
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/io/kestra/core/models/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.core.flow.WorkingDirectory;
Expand Down Expand Up @@ -35,7 +36,7 @@ abstract public class Task implements TaskInterface {
@Valid
protected AbstractRetry retry;

protected Duration timeout;
protected Property<Duration> timeout;

@Builder.Default
protected Boolean disabled = false;
Expand Down
20 changes: 13 additions & 7 deletions core/src/main/java/io/kestra/core/runners/ExecutorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.slf4j.Logger;
import org.slf4j.event.Level;

import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -618,13 +619,18 @@ private Executor handlePausedDelay(Executor executor, List<WorkerTaskResult> wor

if (task instanceof Pause pauseTask) {
if (pauseTask.getDelay() != null || pauseTask.getTimeout() != null) {
return ExecutionDelay.builder()
.taskRunId(workerTaskResult.getTaskRun().getId())
.executionId(executor.getExecution().getId())
.date(workerTaskResult.getTaskRun().getState().maxDate().plus(pauseTask.getDelay() != null ? pauseTask.getDelay() : pauseTask.getTimeout()))
.state(pauseTask.getDelay() != null ? State.Type.RUNNING : State.Type.FAILED)
.delayType(ExecutionDelay.DelayType.RESUME_FLOW)
.build();
RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
Duration delay = runContext.render(pauseTask.getDelay()).as(Duration.class).orElse(null);
Duration timeout = runContext.render(pauseTask.getTimeout()).as(Duration.class).orElse(null);
if (delay != null || timeout != null) { // rendering can lead to null, so we must re-check here
return ExecutionDelay.builder()
.taskRunId(workerTaskResult.getTaskRun().getId())
.executionId(executor.getExecution().getId())
.date(workerTaskResult.getTaskRun().getState().maxDate().plus(delay != null ? delay : timeout))
.state(delay != null ? State.Type.RUNNING : State.Type.FAILED)
.delayType(ExecutionDelay.DelayType.RESUME_FLOW)
.build();
}
}
}

Expand Down
13 changes: 5 additions & 8 deletions core/src/main/java/io/kestra/core/runners/FlowInputOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import jakarta.validation.constraints.NotNull;

import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
Expand Down Expand Up @@ -73,10 +71,10 @@
*/
@Singleton
public class FlowInputOutput {
private static final Logger log = LoggerFactory.getLogger(FlowInputOutput.class);
private static final Pattern URI_PATTERN = Pattern.compile("^[a-z]+:\\/\\/(?:www\\.)?[-a-zA-Z0-9@:%._\\+~#=]{1,256}\\.[a-zA-Z0-9()]{1,6}\\b(?:[-a-zA-Z0-9()@:%_\\+.~#?&\\/=]*)$");
private static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml();
private static final ObjectMapper JSON_MAPPER = JacksonMapper.ofJson();

public static final Pattern URI_PATTERN = Pattern.compile("^[a-z]+:\\/\\/(?:www\\.)?[-a-zA-Z0-9@:%._\\+~#=]{1,256}\\.[a-zA-Z0-9()]{1,6}\\b(?:[-a-zA-Z0-9()@:%_\\+.~#?&\\/=]*)$");
public static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml();
private final StorageInterface storageInterface;
private final Optional<String> secretKey;
private final RunContextFactory runContextFactory;
Expand Down Expand Up @@ -358,16 +356,15 @@ public Map<String, Object> typedOutputs(
if (flow.getOutputs() == null) {
return ImmutableMap.of();
}
final ObjectMapper mapper = new ObjectMapper();
Map<String, Object> results = flow
.getOutputs()
.stream()
.map(output -> {
final HashMap<String, Object> current;
final Object currentValue;
try {
current = in == null ? null : mapper.readValue(
mapper.writeValueAsString(in.get(output.getId())), new TypeReference<>() {});
current = in == null ? null : JSON_MAPPER.readValue(
JSON_MAPPER.writeValueAsString(in.get(output.getId())), new TypeReference<>() {});
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ protected void kill(final boolean markAsKilled) {

@Override
public State.Type doCall() throws Exception {
final Duration workerTaskTimeout = workerTask.getTask().getTimeout();
final Duration workerTaskTimeout = workerTask.getRunContext().render(workerTask.getTask().getTimeout()).as(Duration.class).orElse(null);

try {
if (workerTaskTimeout != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package io.kestra.core.serializers;

import com.fasterxml.jackson.core.*;
import com.fasterxml.jackson.core.io.NumberInput;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.datatype.jsr310.DecimalUtils;

import java.io.IOException;
import java.math.BigDecimal;
import java.time.DateTimeException;
import java.time.Duration;

public class DurationDeserializer extends com.fasterxml.jackson.datatype.jsr310.deser.DurationDeserializer {

// durations can be a string with a number which is not taken into account as it should not happen
// we specialize the Duration deserialization from string to support that
@Override
protected Duration _fromString(JsonParser parser, DeserializationContext ctxt, String value0) throws IOException {
String value = value0.trim();
if (value.isEmpty()) {
// 22-Oct-2020, tatu: not sure if we should pass original (to distinguish
// b/w empty and blank); for now don't which will allow blanks to be
// handled like "regular" empty (same as pre-2.12)
return _fromEmptyString(parser, ctxt, value);
}
// 30-Sep-2020: Should allow use of "Timestamp as String" for
// some textual formats
if (ctxt.isEnabled(StreamReadCapability.UNTYPED_SCALARS)
&& _isValidTimestampString(value)) {
return _fromTimestamp(ctxt, NumberInput.parseLong(value));
}

// These are the only lines we changed from the default impl: we check for a float as string and parse it
if (_isFloat(value)) {
double d = Double.parseDouble(value);
BigDecimal bigDecimal = BigDecimal.valueOf(d);
return DecimalUtils.extractSecondsAndNanos(bigDecimal, Duration::ofSeconds);
}

try {
return Duration.parse(value);
} catch (DateTimeException e) {
return _handleDateTimeException(ctxt, e, value);
}
}

// this method is inspired by _isIntNumber but allow the decimal separator '.'
private boolean _isFloat(String text) {
final int len = text.length();
if (len > 0) {
char c = text.charAt(0);
// skip leading sign (plus not allowed for strict JSON numbers but...)
int i;

if (c == '-' || c == '+') {
if (len == 1) {
return false;
}
i = 1;
} else {
i = 0;
}
// We will allow leading
for (; i < len; ++i) {
int ch = text.charAt(i);
if (ch == '.') {
continue;
}
if (ch > '9' || ch < '0') {
return false;
}
}
return true;
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.dataformat.ion.IonObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
Expand All @@ -29,6 +30,7 @@
import org.yaml.snakeyaml.LoaderOptions;

import java.io.IOException;
import java.time.Duration;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -119,6 +121,9 @@ public static ObjectMapper ofIon() {
}

private static ObjectMapper configure(ObjectMapper mapper) {
SimpleModule durationDeserialization = new SimpleModule();
durationDeserialization.addDeserializer(Duration.class, new DurationDeserializer());

return mapper
.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
Expand All @@ -128,6 +133,7 @@ private static ObjectMapper configure(ObjectMapper mapper) {
.registerModules(new GuavaModule())
.registerModule(new PluginModule())
.registerModule(new RunContextModule())
.registerModule(durationDeserialization)
.setTimeZone(TimeZone.getDefault());
}

Expand Down
7 changes: 3 additions & 4 deletions core/src/main/java/io/kestra/plugin/core/flow/Pause.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.hierarchies.GraphTask;
import io.kestra.core.models.hierarchies.RelationType;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
Expand Down Expand Up @@ -135,15 +136,13 @@ public class Pause extends Task implements FlowableTask<Pause.Output> {
title = "Duration of the pause — useful if you want to pause the execution for a fixed amount of time.",
description = "The delay is a string in the [ISO 8601 Duration](https://en.wikipedia.org/wiki/ISO_8601#Durations) format, e.g. `PT1H` for 1 hour, `PT30M` for 30 minutes, `PT10S` for 10 seconds, `P1D` for 1 day, etc. If no delay and no timeout are configured, the execution will never end until it's manually resumed from the UI or API."
)
@PluginProperty
private Duration delay;
private Property<Duration> delay;

@Schema(
title = "Timeout of the pause — useful to avoid never-ending workflows in a human-in-the-loop scenario. For example, if you want to pause the execution until a human validates some data generated in a previous task, you can set a timeout of e.g. 24 hours. If no manual approval happens within 24 hours, the execution will automatically resume without a prior data validation.",
description = "If no delay and no timeout are configured, the execution will never end until it's manually resumed from the UI or API."
)
@PluginProperty
private Duration timeout;
private Property<Duration> timeout;

@Valid
@Schema(
Expand Down
3 changes: 2 additions & 1 deletion core/src/test/java/io/kestra/core/runners/WorkerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
Expand Down Expand Up @@ -97,7 +98,7 @@ void failOnWorkerTaskWithFlowable() throws TimeoutException, QueueException, Jso

Pause pause = Pause.builder()
.type(Pause.class.getName())
.delay(Duration.ofSeconds(1))
.delay(Property.of(Duration.ofSeconds(1)))
.id("unit-test")
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.Type;
import io.kestra.core.models.flows.input.StringInput;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.Constant;
import io.kestra.core.models.validations.ModelValidator;
Expand All @@ -32,7 +33,7 @@

@KestraTest
class YamlParserTest {
private static ObjectMapper mapper = JacksonMapper.ofJson();
private static final ObjectMapper MAPPER = JacksonMapper.ofJson();

@Inject
private YamlParser yamlParser;
Expand All @@ -49,7 +50,7 @@ void parse() {

// third with all optionals
Task optionals = flow.getTasks().get(2);
assertThat(optionals.getTimeout(), is(Duration.ofMinutes(60)));
assertThat(optionals.getTimeout(), is(Property.builder().expression("PT60M").build()));
assertThat(optionals.getRetry().getType(), is("constant"));
assertThat(optionals.getRetry().getMaxAttempt(), is(5));
assertThat(((Constant) optionals.getRetry()).getInterval().getSeconds(), is(900L));
Expand All @@ -65,7 +66,7 @@ void parseString() throws IOException {

// third with all optionals
Task optionals = flow.getTasks().get(2);
assertThat(optionals.getTimeout(), is(Duration.ofMinutes(60)));
assertThat(optionals.getTimeout(), is(Property.builder().expression("PT60M").build()));
assertThat(optionals.getRetry().getType(), is("constant"));
assertThat(optionals.getRetry().getMaxAttempt(), is(5));
assertThat(((Constant) optionals.getRetry()).getInterval().getSeconds(), is(900L));
Expand Down Expand Up @@ -166,15 +167,15 @@ void listeners() {
void serialization() throws IOException {
Flow flow = this.parse("flows/valids/minimal.yaml");

String s = mapper.writeValueAsString(flow);
String s = MAPPER.writeValueAsString(flow);
assertThat(s, is("{\"id\":\"minimal\",\"namespace\":\"io.kestra.tests\",\"revision\":2,\"disabled\":false,\"deleted\":false,\"labels\":[{\"key\":\"system.readOnly\",\"value\":\"true\"}],\"tasks\":[{\"id\":\"date\",\"type\":\"io.kestra.plugin.core.debug.Return\",\"format\":\"{{taskrun.startDate}}\"}]}"));
}

@Test
void noDefault() throws IOException {
Flow flow = this.parse("flows/valids/parallel.yaml");

String s = mapper.writeValueAsString(flow);
String s = MAPPER.writeValueAsString(flow);
assertThat(s, not(containsString("\"-c\"")));
assertThat(s, containsString("\"deleted\":false"));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.core.validations;

import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.WorkerGroup;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.plugin.core.flow.Pause;
Expand Down Expand Up @@ -57,7 +58,7 @@ void workingDirectoryInvalid() {
List.of(Pause.builder()
.id("pause")
.type(Pause.class.getName())
.delay(Duration.ofSeconds(1L))
.delay(Property.of(Duration.ofSeconds(1L)))
.build()
)
)
Expand Down
Loading