Skip to content

Commit

Permalink
Revert "feat(core): remove the execution state from the scheduler (#1588
Browse files Browse the repository at this point in the history
)"

This reverts commit f7d3d0b.
  • Loading branch information
loicmathieu committed Nov 28, 2024
1 parent ac678b8 commit bdef596
Show file tree
Hide file tree
Showing 14 changed files with 80 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ public class Trigger extends TriggerContext {
@Nullable
private String executionId;

@Nullable
private State.Type executionCurrentState;

@Nullable
private Instant updatedDate;

Expand All @@ -39,7 +36,6 @@ public class Trigger extends TriggerContext {
protected Trigger(TriggerBuilder<?, ?> b) {
super(b);
this.executionId = b.executionId;
this.executionCurrentState = b.executionCurrentState;
this.updatedDate = b.updatedDate;
this.evaluateRunningDate = b.evaluateRunningDate;
}
Expand Down Expand Up @@ -141,7 +137,6 @@ public static Trigger of(Execution execution, Trigger trigger) {
.date(trigger.getDate())
.nextExecutionDate(trigger.getNextExecutionDate())
.executionId(execution.getId())
.executionCurrentState(execution.getState().getCurrent())
.updatedDate(Instant.now())
.backfill(trigger.getBackfill())
.stopAfter(trigger.getStopAfter())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
private final PluginDefaultService pluginDefaultService;
private final WorkerGroupService workerGroupService;
private final LogService logService;
protected SchedulerExecutionStateInterface executionState;

// must be volatile as it's updated by the flow listener thread and read by the scheduleExecutor thread
private volatile Boolean isReady = false;
Expand Down Expand Up @@ -594,8 +595,10 @@ private boolean isExecutionNotRunning(FlowWithWorkerTrigger f) {
return true;
}

// The execution is not yet started, we skip
if (lastTrigger.getExecutionCurrentState() == null) {
Optional<Execution> execution = executionState.findById(lastTrigger.getTenantId(), lastTrigger.getExecutionId());

// executionState hasn't received the execution, we skip
if (execution.isEmpty()) {
if (lastTrigger.getUpdatedDate() != null) {
metricRegistry
.timer(MetricRegistry.SCHEDULER_EXECUTION_MISSING_DURATION, metricRegistry.tags(lastTrigger))
Expand Down Expand Up @@ -629,7 +632,7 @@ private boolean isExecutionNotRunning(FlowWithWorkerTrigger f) {
Level.DEBUG,
"Execution '{}' is still '{}', updated at '{}'",
lastTrigger.getExecutionId(),
lastTrigger.getExecutionCurrentState(),
execution.get().getState().getCurrent(),
lastTrigger.getUpdatedDate()
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.kestra.core.schedulers;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.repositories.ExecutionRepositoryInterface;

import java.util.Optional;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

@Singleton
public class SchedulerExecutionState implements SchedulerExecutionStateInterface {
@Inject
private ExecutionRepositoryInterface executionRepository;

@Override
public Optional<Execution> findById(String tenantId, String id) {
return executionRepository.findById(tenantId, id);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.kestra.core.schedulers;

import io.kestra.core.models.executions.Execution;

import java.util.Optional;

public interface SchedulerExecutionStateInterface {
Optional<Execution> findById(String tenantId, String id);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.*;

class SchedulerConditionTest extends AbstractSchedulerTest {
@Inject
Expand All @@ -33,6 +33,9 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
@Inject
protected SchedulerTriggerStateInterface triggerState;

@Inject
protected SchedulerExecutionStateInterface executionState;

private static Flow createScheduleFlow() {
Schedule schedule = Schedule.builder()
.id("hourly")
Expand All @@ -58,6 +61,7 @@ private static Flow createScheduleFlow() {
void schedule() throws Exception {
// mock flow listeners
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
SchedulerExecutionStateInterface executionRepositorySpy = spy(this.executionState);
CountDownLatch queueCount = new CountDownLatch(4);

Flow flow = createScheduleFlow();
Expand All @@ -74,6 +78,11 @@ void schedule() throws Exception {
.when(flowListenersServiceSpy)
.flows();

// mock the backfill execution is ended
doAnswer(invocation -> Optional.of(Execution.builder().state(new State().withState(State.Type.SUCCESS)).build()))
.when(executionRepositorySpy)
.findById(any(), any());

// scheduler
try (AbstractScheduler scheduler = new JdbcScheduler(
applicationContext,
Expand All @@ -94,7 +103,7 @@ void schedule() throws Exception {
});

scheduler.run();
queueCount.await(30, TimeUnit.SECONDS);
queueCount.await(15, TimeUnit.SECONDS);

receive.blockLast();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
@Inject
private SchedulerTriggerStateInterface triggerState;

@Inject
private SchedulerExecutionState schedulerExecutionState;

@Inject
private FlowListeners flowListenersService;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
@Inject
protected SchedulerTriggerStateInterface triggerState;

@Inject
protected SchedulerExecutionStateInterface executionState;

@Inject
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
protected QueueInterface<LogEntry> logQueue;
Expand Down Expand Up @@ -65,7 +68,7 @@ private ZonedDateTime date(int minus) {
.truncatedTo(ChronoUnit.HOURS);
}

protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new JdbcScheduler(
applicationContext,
flowListenersServiceSpy
Expand All @@ -77,6 +80,7 @@ protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
void schedule() throws Exception {
// mock flow listeners
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
SchedulerExecutionStateInterface executionStateSpy = spy(this.executionState);
CountDownLatch queueCount = new CountDownLatch(6);
CountDownLatch invalidLogCount = new CountDownLatch(1);
Set<String> date = new HashSet<>();
Expand Down Expand Up @@ -111,7 +115,7 @@ void schedule() throws Exception {
triggerState.create(trigger.toBuilder().triggerId("schedule-invalid").flowId(invalid.getId()).build());

// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionStateSpy)) {
// wait for execution
Flux<Execution> receiveExecutions = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
Expand Down Expand Up @@ -171,7 +175,7 @@ void retroSchedule() throws Exception {
triggerState.create(trigger);

// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
scheduler.run();

Await.until(() -> {
Expand Down Expand Up @@ -205,7 +209,7 @@ void recoverALLMissing() throws Exception {
CountDownLatch queueCount = new CountDownLatch(1);

// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
// wait for execution
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
Expand Down Expand Up @@ -250,7 +254,7 @@ void recoverLASTMissing() throws Exception {
CountDownLatch queueCount = new CountDownLatch(1);

// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
// wait for execution
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
Expand Down Expand Up @@ -294,7 +298,7 @@ void recoverNONEMissing() throws Exception {
triggerState.create(lastTrigger);

// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
scheduler.run();

Await.until(() -> scheduler.isReady(), Duration.ofMillis(100), Duration.ofSeconds(5));
Expand Down Expand Up @@ -325,7 +329,7 @@ void backfill() throws Exception {
.build();

// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
scheduler.run();

Await.until(() -> {
Expand Down Expand Up @@ -390,7 +394,7 @@ void disabled() throws Exception {
triggerState.create(trigger);

// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
scheduler.run();

// Wait 3s to see if things happen
Expand Down Expand Up @@ -428,7 +432,7 @@ void stopAfterSchedule() throws Exception {
CountDownLatch queueCount = new CountDownLatch(2);

// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
// wait for execution
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
Expand Down Expand Up @@ -488,7 +492,7 @@ void failedEvaluationTest() {
CountDownLatch queueCount = new CountDownLatch(1);

// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
// wait for execution
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,20 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.*;

public class SchedulerThreadTest extends AbstractSchedulerTest {
@Inject
protected FlowListeners flowListenersService;

@Inject
protected SchedulerTriggerStateInterface triggerState;
protected SchedulerExecutionStateInterface executionState;

@Test
void thread() throws Exception {
Expand All @@ -53,12 +53,17 @@ void thread() throws Exception {

// mock flow listeners
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);

SchedulerExecutionStateInterface schedulerExecutionStateSpy = spy(this.executionState);

doReturn(Collections.singletonList(flow))
.when(flowListenersServiceSpy)
.flows();

// mock the backfill execution is ended
doAnswer(invocation -> Optional.of(Execution.builder().state(new State().withState(State.Type.SUCCESS)).build()))
.when(schedulerExecutionStateSpy)
.findById(any(), any());

// scheduler
try (
AbstractScheduler scheduler = new JdbcScheduler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

import io.kestra.core.runners.FlowListeners;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
import io.kestra.core.schedulers.SchedulerScheduleTest;
import io.kestra.jdbc.runner.JdbcScheduler;

class H2SchedulerScheduleTest extends SchedulerScheduleTest {
@Override
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new JdbcScheduler(
applicationContext,
flowListenersServiceSpy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

import io.kestra.core.runners.FlowListeners;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
import io.kestra.core.schedulers.SchedulerScheduleTest;
import io.kestra.jdbc.runner.JdbcScheduler;

class MysqlSchedulerScheduleTest extends SchedulerScheduleTest {
@Override
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new JdbcScheduler(
applicationContext,
flowListenersServiceSpy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

import io.kestra.core.runners.FlowListeners;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
import io.kestra.core.schedulers.SchedulerScheduleTest;
import io.kestra.jdbc.runner.JdbcScheduler;

class PostgresSchedulerScheduleTest extends SchedulerScheduleTest {
@Override
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new JdbcScheduler(
applicationContext,
flowListenersServiceSpy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ public Trigger updateExecution(Trigger trigger) {
Trigger current = optionalTrigger.get();
current = current.toBuilder()
.executionId(trigger.getExecutionId())
.executionCurrentState(trigger.getExecutionCurrentState())
.updatedDate(trigger.getUpdatedDate())
.build();
this.save(context, current);
Expand Down
11 changes: 2 additions & 9 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
public class JdbcScheduler extends AbstractScheduler {
private final QueueInterface<Execution> executionQueue;
private final TriggerRepositoryInterface triggerRepository;
private final ConditionService conditionService;

private final FlowRepositoryInterface flowRepository;
private final JooqDSLContextWrapper dslContextWrapper;
private final ConditionService conditionService;


@SuppressWarnings("unchecked")
Expand All @@ -48,6 +48,7 @@ public JdbcScheduler(
executionQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.EXECUTION_NAMED));
triggerRepository = applicationContext.getBean(AbstractJdbcTriggerRepository.class);
triggerState = applicationContext.getBean(SchedulerTriggerStateInterface.class);
executionState = applicationContext.getBean(SchedulerExecutionState.class);
conditionService = applicationContext.getBean(ConditionService.class);
flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
dslContextWrapper = applicationContext.getBean(JooqDSLContextWrapper.class);
Expand Down Expand Up @@ -75,14 +76,6 @@ public void run() {
.ifPresent(trigger -> {
this.triggerState.update(trigger.resetExecution(execution.getState().getCurrent()));
});
} else {
// update execution state on each state change so the scheduler knows the execution is running
triggerRepository
.findByExecution(execution)
.filter(trigger -> execution.getState().getCurrent() != trigger.getExecutionCurrentState())
.ifPresent(trigger -> {
((JdbcSchedulerTriggerState) this.triggerState).updateExecution(Trigger.of(execution, trigger));
});
}
}
}
Expand Down
Loading

0 comments on commit bdef596

Please sign in to comment.