diff --git a/core/src/main/java/io/kestra/core/models/triggers/Trigger.java b/core/src/main/java/io/kestra/core/models/triggers/Trigger.java index 0d43370445c..7e697055830 100644 --- a/core/src/main/java/io/kestra/core/models/triggers/Trigger.java +++ b/core/src/main/java/io/kestra/core/models/triggers/Trigger.java @@ -23,9 +23,6 @@ public class Trigger extends TriggerContext { @Nullable private String executionId; - @Nullable - private State.Type executionCurrentState; - @Nullable private Instant updatedDate; @@ -39,7 +36,6 @@ public class Trigger extends TriggerContext { protected Trigger(TriggerBuilder b) { super(b); this.executionId = b.executionId; - this.executionCurrentState = b.executionCurrentState; this.updatedDate = b.updatedDate; this.evaluateRunningDate = b.evaluateRunningDate; } @@ -141,7 +137,6 @@ public static Trigger of(Execution execution, Trigger trigger) { .date(trigger.getDate()) .nextExecutionDate(trigger.getNextExecutionDate()) .executionId(execution.getId()) - .executionCurrentState(execution.getState().getCurrent()) .updatedDate(Instant.now()) .backfill(trigger.getBackfill()) .stopAfter(trigger.getStopAfter()) diff --git a/core/src/main/java/io/kestra/core/schedulers/AbstractScheduler.java b/core/src/main/java/io/kestra/core/schedulers/AbstractScheduler.java index 74e4535577e..5f75fc8da39 100644 --- a/core/src/main/java/io/kestra/core/schedulers/AbstractScheduler.java +++ b/core/src/main/java/io/kestra/core/schedulers/AbstractScheduler.java @@ -73,6 +73,7 @@ public abstract class AbstractScheduler implements Scheduler, Service { private final PluginDefaultService pluginDefaultService; private final WorkerGroupService workerGroupService; private final LogService logService; + protected SchedulerExecutionStateInterface executionState; // must be volatile as it's updated by the flow listener thread and read by the scheduleExecutor thread private volatile Boolean isReady = false; @@ -594,8 +595,10 @@ private boolean isExecutionNotRunning(FlowWithWorkerTrigger f) { return true; } - // The execution is not yet started, we skip - if (lastTrigger.getExecutionCurrentState() == null) { + Optional execution = executionState.findById(lastTrigger.getTenantId(), lastTrigger.getExecutionId()); + + // executionState hasn't received the execution, we skip + if (execution.isEmpty()) { if (lastTrigger.getUpdatedDate() != null) { metricRegistry .timer(MetricRegistry.SCHEDULER_EXECUTION_MISSING_DURATION, metricRegistry.tags(lastTrigger)) @@ -629,7 +632,7 @@ private boolean isExecutionNotRunning(FlowWithWorkerTrigger f) { Level.DEBUG, "Execution '{}' is still '{}', updated at '{}'", lastTrigger.getExecutionId(), - lastTrigger.getExecutionCurrentState(), + execution.get().getState().getCurrent(), lastTrigger.getUpdatedDate() ); } diff --git a/core/src/main/java/io/kestra/core/schedulers/SchedulerExecutionState.java b/core/src/main/java/io/kestra/core/schedulers/SchedulerExecutionState.java new file mode 100644 index 00000000000..bf8acf5e726 --- /dev/null +++ b/core/src/main/java/io/kestra/core/schedulers/SchedulerExecutionState.java @@ -0,0 +1,19 @@ +package io.kestra.core.schedulers; + +import io.kestra.core.models.executions.Execution; +import io.kestra.core.repositories.ExecutionRepositoryInterface; + +import java.util.Optional; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +@Singleton +public class SchedulerExecutionState implements SchedulerExecutionStateInterface { + @Inject + private ExecutionRepositoryInterface executionRepository; + + @Override + public Optional findById(String tenantId, String id) { + return executionRepository.findById(tenantId, id); + } +} diff --git a/core/src/main/java/io/kestra/core/schedulers/SchedulerExecutionStateInterface.java b/core/src/main/java/io/kestra/core/schedulers/SchedulerExecutionStateInterface.java new file mode 100644 index 00000000000..32652b0a99d --- /dev/null +++ b/core/src/main/java/io/kestra/core/schedulers/SchedulerExecutionStateInterface.java @@ -0,0 +1,9 @@ +package io.kestra.core.schedulers; + +import io.kestra.core.models.executions.Execution; + +import java.util.Optional; + +public interface SchedulerExecutionStateInterface { + Optional findById(String tenantId, String id); +} diff --git a/core/src/test/java/io/kestra/core/schedulers/SchedulerConditionTest.java b/core/src/test/java/io/kestra/core/schedulers/SchedulerConditionTest.java index 6e9826b5c22..662b158c5e6 100644 --- a/core/src/test/java/io/kestra/core/schedulers/SchedulerConditionTest.java +++ b/core/src/test/java/io/kestra/core/schedulers/SchedulerConditionTest.java @@ -18,13 +18,13 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.*; class SchedulerConditionTest extends AbstractSchedulerTest { @Inject @@ -33,6 +33,9 @@ class SchedulerConditionTest extends AbstractSchedulerTest { @Inject protected SchedulerTriggerStateInterface triggerState; + @Inject + protected SchedulerExecutionStateInterface executionState; + private static Flow createScheduleFlow() { Schedule schedule = Schedule.builder() .id("hourly") @@ -58,6 +61,7 @@ private static Flow createScheduleFlow() { void schedule() throws Exception { // mock flow listeners FlowListeners flowListenersServiceSpy = spy(this.flowListenersService); + SchedulerExecutionStateInterface executionRepositorySpy = spy(this.executionState); CountDownLatch queueCount = new CountDownLatch(4); Flow flow = createScheduleFlow(); @@ -74,6 +78,11 @@ void schedule() throws Exception { .when(flowListenersServiceSpy) .flows(); + // mock the backfill execution is ended + doAnswer(invocation -> Optional.of(Execution.builder().state(new State().withState(State.Type.SUCCESS)).build())) + .when(executionRepositorySpy) + .findById(any(), any()); + // scheduler try (AbstractScheduler scheduler = new JdbcScheduler( applicationContext, @@ -94,7 +103,7 @@ void schedule() throws Exception { }); scheduler.run(); - queueCount.await(30, TimeUnit.SECONDS); + queueCount.await(15, TimeUnit.SECONDS); receive.blockLast(); diff --git a/core/src/test/java/io/kestra/core/schedulers/SchedulerPollingTriggerTest.java b/core/src/test/java/io/kestra/core/schedulers/SchedulerPollingTriggerTest.java index 3b2c8219cce..b48bc0ab573 100644 --- a/core/src/test/java/io/kestra/core/schedulers/SchedulerPollingTriggerTest.java +++ b/core/src/test/java/io/kestra/core/schedulers/SchedulerPollingTriggerTest.java @@ -39,6 +39,9 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest { @Inject private SchedulerTriggerStateInterface triggerState; + @Inject + private SchedulerExecutionState schedulerExecutionState; + @Inject private FlowListeners flowListenersService; diff --git a/core/src/test/java/io/kestra/core/schedulers/SchedulerScheduleTest.java b/core/src/test/java/io/kestra/core/schedulers/SchedulerScheduleTest.java index 0341620fd1b..686dd3ba206 100644 --- a/core/src/test/java/io/kestra/core/schedulers/SchedulerScheduleTest.java +++ b/core/src/test/java/io/kestra/core/schedulers/SchedulerScheduleTest.java @@ -38,6 +38,9 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest { @Inject protected SchedulerTriggerStateInterface triggerState; + @Inject + protected SchedulerExecutionStateInterface executionState; + @Inject @Named(QueueFactoryInterface.WORKERTASKLOG_NAMED) protected QueueInterface logQueue; @@ -65,7 +68,7 @@ private ZonedDateTime date(int minus) { .truncatedTo(ChronoUnit.HOURS); } - protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) { + protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) { return new JdbcScheduler( applicationContext, flowListenersServiceSpy @@ -77,6 +80,7 @@ protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) { void schedule() throws Exception { // mock flow listeners FlowListeners flowListenersServiceSpy = spy(this.flowListenersService); + SchedulerExecutionStateInterface executionStateSpy = spy(this.executionState); CountDownLatch queueCount = new CountDownLatch(6); CountDownLatch invalidLogCount = new CountDownLatch(1); Set date = new HashSet<>(); @@ -111,7 +115,7 @@ void schedule() throws Exception { triggerState.create(trigger.toBuilder().triggerId("schedule-invalid").flowId(invalid.getId()).build()); // scheduler - try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) { + try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionStateSpy)) { // wait for execution Flux receiveExecutions = TestsUtils.receive(executionQueue, either -> { Execution execution = either.getLeft(); @@ -171,7 +175,7 @@ void retroSchedule() throws Exception { triggerState.create(trigger); // scheduler - try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) { + try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) { scheduler.run(); Await.until(() -> { @@ -205,7 +209,7 @@ void recoverALLMissing() throws Exception { CountDownLatch queueCount = new CountDownLatch(1); // scheduler - try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) { + try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) { // wait for execution Flux receive = TestsUtils.receive(executionQueue, either -> { Execution execution = either.getLeft(); @@ -250,7 +254,7 @@ void recoverLASTMissing() throws Exception { CountDownLatch queueCount = new CountDownLatch(1); // scheduler - try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) { + try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) { // wait for execution Flux receive = TestsUtils.receive(executionQueue, either -> { Execution execution = either.getLeft(); @@ -294,7 +298,7 @@ void recoverNONEMissing() throws Exception { triggerState.create(lastTrigger); // scheduler - try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) { + try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) { scheduler.run(); Await.until(() -> scheduler.isReady(), Duration.ofMillis(100), Duration.ofSeconds(5)); @@ -325,7 +329,7 @@ void backfill() throws Exception { .build(); // scheduler - try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) { + try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) { scheduler.run(); Await.until(() -> { @@ -390,7 +394,7 @@ void disabled() throws Exception { triggerState.create(trigger); // scheduler - try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) { + try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) { scheduler.run(); // Wait 3s to see if things happen @@ -428,7 +432,7 @@ void stopAfterSchedule() throws Exception { CountDownLatch queueCount = new CountDownLatch(2); // scheduler - try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) { + try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) { // wait for execution Flux receive = TestsUtils.receive(executionQueue, either -> { Execution execution = either.getLeft(); @@ -488,7 +492,7 @@ void failedEvaluationTest() { CountDownLatch queueCount = new CountDownLatch(1); // scheduler - try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) { + try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) { // wait for execution Flux receive = TestsUtils.receive(executionQueue, either -> { Execution execution = either.getLeft(); diff --git a/core/src/test/java/io/kestra/core/schedulers/SchedulerThreadTest.java b/core/src/test/java/io/kestra/core/schedulers/SchedulerThreadTest.java index 9cc676b4605..d1c7921b5d9 100644 --- a/core/src/test/java/io/kestra/core/schedulers/SchedulerThreadTest.java +++ b/core/src/test/java/io/kestra/core/schedulers/SchedulerThreadTest.java @@ -19,20 +19,20 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.*; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.*; public class SchedulerThreadTest extends AbstractSchedulerTest { @Inject protected FlowListeners flowListenersService; @Inject - protected SchedulerTriggerStateInterface triggerState; + protected SchedulerExecutionStateInterface executionState; @Test void thread() throws Exception { @@ -53,12 +53,17 @@ void thread() throws Exception { // mock flow listeners FlowListeners flowListenersServiceSpy = spy(this.flowListenersService); - + SchedulerExecutionStateInterface schedulerExecutionStateSpy = spy(this.executionState); doReturn(Collections.singletonList(flow)) .when(flowListenersServiceSpy) .flows(); + // mock the backfill execution is ended + doAnswer(invocation -> Optional.of(Execution.builder().state(new State().withState(State.Type.SUCCESS)).build())) + .when(schedulerExecutionStateSpy) + .findById(any(), any()); + // scheduler try ( AbstractScheduler scheduler = new JdbcScheduler( diff --git a/jdbc-h2/src/test/java/io/kestra/schedulers/h2/H2SchedulerScheduleTest.java b/jdbc-h2/src/test/java/io/kestra/schedulers/h2/H2SchedulerScheduleTest.java index 059860c6c75..2d6211ff551 100644 --- a/jdbc-h2/src/test/java/io/kestra/schedulers/h2/H2SchedulerScheduleTest.java +++ b/jdbc-h2/src/test/java/io/kestra/schedulers/h2/H2SchedulerScheduleTest.java @@ -2,12 +2,13 @@ import io.kestra.core.runners.FlowListeners; import io.kestra.core.schedulers.AbstractScheduler; +import io.kestra.core.schedulers.SchedulerExecutionStateInterface; import io.kestra.core.schedulers.SchedulerScheduleTest; import io.kestra.jdbc.runner.JdbcScheduler; class H2SchedulerScheduleTest extends SchedulerScheduleTest { @Override - protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) { + protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) { return new JdbcScheduler( applicationContext, flowListenersServiceSpy diff --git a/jdbc-mysql/src/test/java/io/kestra/schedulers/mysql/MysqlSchedulerScheduleTest.java b/jdbc-mysql/src/test/java/io/kestra/schedulers/mysql/MysqlSchedulerScheduleTest.java index 70563f3485d..6e017199ebc 100644 --- a/jdbc-mysql/src/test/java/io/kestra/schedulers/mysql/MysqlSchedulerScheduleTest.java +++ b/jdbc-mysql/src/test/java/io/kestra/schedulers/mysql/MysqlSchedulerScheduleTest.java @@ -2,12 +2,13 @@ import io.kestra.core.runners.FlowListeners; import io.kestra.core.schedulers.AbstractScheduler; +import io.kestra.core.schedulers.SchedulerExecutionStateInterface; import io.kestra.core.schedulers.SchedulerScheduleTest; import io.kestra.jdbc.runner.JdbcScheduler; class MysqlSchedulerScheduleTest extends SchedulerScheduleTest { @Override - protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) { + protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) { return new JdbcScheduler( applicationContext, flowListenersServiceSpy diff --git a/jdbc-postgres/src/test/java/io/kestra/schedulers/postgres/PostgresSchedulerScheduleTest.java b/jdbc-postgres/src/test/java/io/kestra/schedulers/postgres/PostgresSchedulerScheduleTest.java index 14217d717c2..06ad6f687d3 100644 --- a/jdbc-postgres/src/test/java/io/kestra/schedulers/postgres/PostgresSchedulerScheduleTest.java +++ b/jdbc-postgres/src/test/java/io/kestra/schedulers/postgres/PostgresSchedulerScheduleTest.java @@ -2,12 +2,13 @@ import io.kestra.core.runners.FlowListeners; import io.kestra.core.schedulers.AbstractScheduler; +import io.kestra.core.schedulers.SchedulerExecutionStateInterface; import io.kestra.core.schedulers.SchedulerScheduleTest; import io.kestra.jdbc.runner.JdbcScheduler; class PostgresSchedulerScheduleTest extends SchedulerScheduleTest { @Override - protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) { + protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) { return new JdbcScheduler( applicationContext, flowListenersServiceSpy diff --git a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcTriggerRepository.java b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcTriggerRepository.java index a569a71cb45..8c331a5db96 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcTriggerRepository.java +++ b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcTriggerRepository.java @@ -169,7 +169,6 @@ public Trigger updateExecution(Trigger trigger) { Trigger current = optionalTrigger.get(); current = current.toBuilder() .executionId(trigger.getExecutionId()) - .executionCurrentState(trigger.getExecutionCurrentState()) .updatedDate(trigger.getUpdatedDate()) .build(); this.save(context, current); diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcScheduler.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcScheduler.java index bbce28b6a5b..0653145a57e 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcScheduler.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcScheduler.java @@ -31,10 +31,10 @@ public class JdbcScheduler extends AbstractScheduler { private final QueueInterface executionQueue; private final TriggerRepositoryInterface triggerRepository; - private final ConditionService conditionService; private final FlowRepositoryInterface flowRepository; private final JooqDSLContextWrapper dslContextWrapper; + private final ConditionService conditionService; @SuppressWarnings("unchecked") @@ -48,6 +48,7 @@ public JdbcScheduler( executionQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.EXECUTION_NAMED)); triggerRepository = applicationContext.getBean(AbstractJdbcTriggerRepository.class); triggerState = applicationContext.getBean(SchedulerTriggerStateInterface.class); + executionState = applicationContext.getBean(SchedulerExecutionState.class); conditionService = applicationContext.getBean(ConditionService.class); flowRepository = applicationContext.getBean(FlowRepositoryInterface.class); dslContextWrapper = applicationContext.getBean(JooqDSLContextWrapper.class); @@ -75,14 +76,6 @@ public void run() { .ifPresent(trigger -> { this.triggerState.update(trigger.resetExecution(execution.getState().getCurrent())); }); - } else { - // update execution state on each state change so the scheduler knows the execution is running - triggerRepository - .findByExecution(execution) - .filter(trigger -> execution.getState().getCurrent() != trigger.getExecutionCurrentState()) - .ifPresent(trigger -> { - ((JdbcSchedulerTriggerState) this.triggerState).updateExecution(Trigger.of(execution, trigger)); - }); } } } diff --git a/ui/src/components/admin/Triggers.vue b/ui/src/components/admin/Triggers.vue index 011abe2a11d..d0e190e0d5f 100644 --- a/ui/src/components/admin/Triggers.vue +++ b/ui/src/components/admin/Triggers.vue @@ -125,16 +125,6 @@ - - - -