Skip to content

Commit

Permalink
Fix Tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Richard Meng committed Apr 15, 2024
1 parent 9700c4d commit 6b581d0
Show file tree
Hide file tree
Showing 51 changed files with 886 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ public class WorkflowExecutor {
@Qualifier(WorkflowExternalBeans.EXECUTOR_ASYNC_LISTENER)
private Executor asyncListener;

@Autowired
@Qualifier(WorkflowExternalBeans.EXECUTOR_ASYNC_START)
private Executor asyncStart;

// end calix

public WorkflowExecutor(
Expand All @@ -121,7 +125,9 @@ public WorkflowExecutor(
SystemTaskRegistry systemTaskRegistry,
ParametersUtils parametersUtils,
IDGenerator idGenerator,
ApplicationEventPublisher eventPublisher) {
ApplicationEventPublisher eventPublisher,
@Qualifier(WorkflowExternalBeans.EXECUTOR_ASYNC_LISTENER) Executor asyncListener,
@Qualifier(WorkflowExternalBeans.EXECUTOR_ASYNC_START) Executor asyncStart) {
this.deciderService = deciderService;
this.metadataDAO = metadataDAO;
this.queueDAO = queueDAO;
Expand All @@ -136,6 +142,8 @@ public WorkflowExecutor(
this.idGenerator = idGenerator;
this.systemTaskRegistry = systemTaskRegistry;
this.eventPublisher = eventPublisher;
this.asyncListener = asyncListener;
this.asyncStart = asyncStart;
}

/**
Expand Down Expand Up @@ -1052,8 +1060,12 @@ public List<String> getRunningWorkflowIds(String workflowName, int version) {
@EventListener(WorkflowEvaluationEvent.class)
public void handleWorkflowEvaluationEvent(WorkflowEvaluationEvent wee) {
// calix
WorkflowModel w = wee.getWorkflowModel();
locked(decideWithLock(w), w.getWorkflowId());
CompletableFuture.runAsync(
() -> {
WorkflowModel w = wee.getWorkflowModel();
locked(decideWithLock(w), w.getWorkflowId());
},
asyncStart);
// end calix
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,10 @@

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
Expand All @@ -33,7 +29,6 @@
import com.netflix.conductor.core.event.WorkflowEvaluationEvent;
import com.netflix.conductor.core.exception.TransientException;
import com.netflix.conductor.core.execution.StartWorkflowInput;
import com.netflix.conductor.core.external.WorkflowExternalBeans;
import com.netflix.conductor.core.metadata.MetadataMapperService;
import com.netflix.conductor.core.utils.IDGenerator;
import com.netflix.conductor.core.utils.ParametersUtils;
Expand All @@ -53,13 +48,6 @@ public class StartWorkflowOperation implements WorkflowOperation<StartWorkflowIn
private final ExecutionLockService executionLockService;
private final ApplicationEventPublisher eventPublisher;

// calix
@Autowired
@Qualifier(WorkflowExternalBeans.EXECUTOR_ASYNC_START)
private Executor asyncStart;

// end calix

public StartWorkflowOperation(
MetadataMapperService metadataMapperService,
IDGenerator idGenerator,
Expand Down Expand Up @@ -208,9 +196,7 @@ private void createAndEvaluate(WorkflowModel workflow) {
executionLockService.releaseLock(workflow.getWorkflowId());
}
// calix
CompletableFuture.runAsync(
() -> eventPublisher.publishEvent(new WorkflowEvaluationEvent(workflow)),
asyncStart);
eventPublisher.publishEvent(new WorkflowEvaluationEvent(workflow));
// end calix

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,14 @@ public WorkflowSweeper(
Optional<WorkflowRepairService> workflowRepairService,
ConductorProperties properties,
QueueDAO queueDAO,
ExecutionDAOFacade executionDAOFacade) {
ExecutionDAOFacade executionDAOFacade,
ExecutionLockService lockService) {
this.properties = properties;
this.queueDAO = queueDAO;
this.workflowExecutor = workflowExecutor;
this.executionDAOFacade = executionDAOFacade;
this.workflowRepairService = workflowRepairService.orElse(null);
this.lockService = lockService;
LOGGER.info("WorkflowSweeper initialized.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public ExecutionService(
ConductorProperties properties,
ExternalPayloadStorage externalPayloadStorage,
SystemTaskRegistry systemTaskRegistry,
TaskStatusListener taskStatusListener) {
TaskStatusListener taskStatusListener,
@Qualifier(WorkflowExternalBeans.EXECUTOR_ASYNC_DAO) Executor daoExecutor) {
this.workflowExecutor = workflowExecutor;
this.executionDAOFacade = executionDAOFacade;
this.queueDAO = queueDAO;
Expand All @@ -90,6 +91,7 @@ public ExecutionService(
properties.getTaskExecutionPostponeDuration().getSeconds();
this.systemTaskRegistry = systemTaskRegistry;
this.taskStatusListener = taskStatusListener;
this.daoExecutor = daoExecutor;
}

public Task poll(String taskType, String workerId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,11 @@ class AsyncSystemTaskExecutorTest extends Specification {
then:
1 * executionDAOFacade.getTaskModel(task1Id) >> task1
1 * executionDAOFacade.getWorkflowModel(workflowId, subWorkflowTask.isTaskRetrievalRequired()) >> workflow
1 * startWorkflowOperation.execute(*_) >> subWorkflowId
1 * workflowExecutor.getWorkflow(subWorkflowId, false) >> subWorkflow
1 * startWorkflowOperation.start(*_) >> subWorkflow
// calix
// 1 * startWorkflowOperation.execute(*_) >> subWorkflowId
// 1 * workflowExecutor.getWorkflow(subWorkflowId, false) >> subWorkflow
// end calix

// SUB_WORKFLOW is asyncComplete so its removed from the queue
1 * queueDAO.remove(queueName, task1Id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ class DoWhileSpec extends Specification {

when:
def retVal = doWhile.execute(workflowModel, doWhileTaskModel, workflowExecutor)
// calix
Thread.sleep(1000L)
// end calix

then: "verify that return value is true, iteration value is updated in DO_WHILE TaskModel"
retVal
Expand Down Expand Up @@ -102,6 +105,9 @@ class DoWhileSpec extends Specification {

when:
def retVal = doWhile.execute(workflowModel, doWhileTaskModel, workflowExecutor)
// calix
Thread.sleep(1000L)
// end calix

then: "verify that the return value is false, since the iteration is not complete"
!retVal
Expand Down Expand Up @@ -136,6 +142,9 @@ class DoWhileSpec extends Specification {

when:
def retVal = doWhile.execute(workflowModel, doWhileTaskModel, workflowExecutor)
// calix
Thread.sleep(1000L)
// end calix

then: "verify that the return value is true, since the iteration is updated"
retVal
Expand Down Expand Up @@ -178,6 +187,9 @@ class DoWhileSpec extends Specification {

when:
def retVal = doWhile.execute(workflowModel, doWhileTaskModel, workflowExecutor)
// calix
Thread.sleep(1000L)
// end calix

then: "verify that return value is true, status is updated"
retVal
Expand Down Expand Up @@ -220,6 +232,9 @@ class DoWhileSpec extends Specification {

when:
def retVal = doWhile.execute(workflowModel, doWhileTaskModel, workflowExecutor)
// calix
Thread.sleep(1000L)
// end calix

then: "verify that return value is false, since the DO_WHILE task model is not updated"
!retVal
Expand Down Expand Up @@ -260,6 +275,9 @@ class DoWhileSpec extends Specification {

when:
def retVal = doWhile.execute(workflowModel, doWhileTaskModel, workflowExecutor)
// calix
Thread.sleep(1000L)
// end calix

then: "verify that the return value is true, DO_WHILE TaskModel is updated"
retVal
Expand Down Expand Up @@ -301,6 +319,9 @@ class DoWhileSpec extends Specification {

when:
def retVal = doWhile.execute(workflowModel, doWhileTaskModel, workflowExecutor)
// calix
Thread.sleep(1000L)
// end calix

then: "verify that the return value is true since DO_WHILE TaskModel is updated"
retVal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ class StartWorkflowSpec extends Specification {

when:
startWorkflow.start(workflowModel, taskModel, workflowExecutor)
// calix
Thread.sleep(1000L)
// end calix

then:
taskModel.status == FAILED
Expand All @@ -81,6 +84,9 @@ class StartWorkflowSpec extends Specification {

when:
startWorkflow.start(workflowModel, taskModel, workflowExecutor)
// calix
Thread.sleep(1000L)
// end calix

then:
taskModel.status == FAILED
Expand All @@ -90,6 +96,9 @@ class StartWorkflowSpec extends Specification {
def "WorkflowExecutor throws a retryable exception"() {
when:
startWorkflow.start(workflowModel, taskModel, workflowExecutor)
// calix
Thread.sleep(1000L)
// end calix

then:
taskModel.status == SCHEDULED
Expand All @@ -99,6 +108,9 @@ class StartWorkflowSpec extends Specification {
def "WorkflowExecutor throws a NotFoundException"() {
when:
startWorkflow.start(workflowModel, taskModel, workflowExecutor)
// calix
Thread.sleep(1000L)
// end calix

then:
taskModel.status == FAILED
Expand All @@ -109,6 +121,9 @@ class StartWorkflowSpec extends Specification {
def "WorkflowExecutor throws a RuntimeException"() {
when:
startWorkflow.start(workflowModel, taskModel, workflowExecutor)
// calix
Thread.sleep(1000L)
// end calix

then:
taskModel.status == FAILED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ class StartWorkflowOperationSpec extends Specification {

when:
def workflowId = startWorkflowOperation.execute(startWorkflowInput)
// calix
Thread.sleep(1000L)
// end calix

then:
workflowId == generatedWorkflowId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import java.time.Duration;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -210,6 +212,7 @@ public void init() {
when(properties.getTaskExecutionPostponeDuration()).thenReturn(Duration.ofSeconds(60));
when(properties.getWorkflowOffsetTimeout()).thenReturn(Duration.ofSeconds(30));

Executor asyncExecutor = Executors.newSingleThreadExecutor();
workflowExecutor =
new WorkflowExecutor(
deciderService,
Expand All @@ -224,7 +227,9 @@ public void init() {
systemTaskRegistry,
parametersUtils,
idGenerator,
eventPublisher);
eventPublisher,
asyncExecutor,
asyncExecutor);
}

@Test
Expand Down Expand Up @@ -381,7 +386,7 @@ public void testQueueFailuresDuringScheduleTask() {

@Test
@SuppressWarnings("unchecked")
public void testCompleteWorkflow() {
public void testCompleteWorkflow() throws Exception {
WorkflowDef def = new WorkflowDef();
def.setName("test");

Expand Down Expand Up @@ -424,10 +429,12 @@ public void testCompleteWorkflow() {
.remove(anyString(), anyString());

workflowExecutor.completeWorkflow(workflow);
Thread.sleep(2000L);
assertEquals(WorkflowModel.Status.COMPLETED, workflow.getStatus());
assertEquals(1, updateWorkflowCalledCounter.get());
assertEquals(0, updateTasksCalledCounter.get());
assertEquals(0, removeQueueEntryCalledCounter.get());
Thread.sleep(2000L);
verify(workflowStatusListener, times(1))
.onWorkflowCompletedIfEnabled(any(WorkflowModel.class));
verify(workflowStatusListener, times(0))
Expand All @@ -436,6 +443,7 @@ public void testCompleteWorkflow() {
def.setWorkflowStatusListenerEnabled(true);
workflow.setStatus(WorkflowModel.Status.RUNNING);
workflowExecutor.completeWorkflow(workflow);
Thread.sleep(2000L);
verify(workflowStatusListener, times(2))
.onWorkflowCompletedIfEnabled(any(WorkflowModel.class));
verify(workflowStatusListener, times(0))
Expand All @@ -444,7 +452,7 @@ public void testCompleteWorkflow() {

@Test
@SuppressWarnings("unchecked")
public void testTerminateWorkflow() {
public void testTerminateWorkflow() throws Exception {
WorkflowDef def = new WorkflowDef();
def.setName("test");

Expand Down Expand Up @@ -499,6 +507,7 @@ public void testTerminateWorkflow() {
def.setWorkflowStatusListenerEnabled(true);
workflow.setStatus(WorkflowModel.Status.RUNNING);
workflowExecutor.completeWorkflow(workflow);
Thread.sleep(2000L);
verify(workflowStatusListener, times(1))
.onWorkflowCompletedIfEnabled(any(WorkflowModel.class));
verify(workflowStatusListener, times(1))
Expand Down
Loading

0 comments on commit 6b581d0

Please sign in to comment.