Skip to content

Commit

Permalink
KOGITO-9659: Workflow timeout not working using external Job Service …
Browse files Browse the repository at this point in the history
…(management-addon) (#1822)
  • Loading branch information
wmedvede committed Aug 4, 2023
1 parent 46d31a6 commit 1faabec
Show file tree
Hide file tree
Showing 8 changed files with 256 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
{
"id": "workflow_timeouts",
"version": "1.0",
"name": "Workflow Timeouts",
"description": "Workflow Timeouts Test",
"start": "printWaitMessage",
"timeouts": {
"workflowExecTimeout": "PT5S"
},
"events": [
{
"name": "wakeUpEvent",
"source": "",
"type": "workflow_timeouts_wake_up_event_in"
}
],
"functions": [
{
"name": "printMessage",
"type": "custom",
"operation": "sysout"
}
],
"states": [
{
"name": "printWaitMessage",
"type": "operation",
"actions": [
{
"name": "printBeforeEvent",
"functionRef": {
"refName": "printMessage",
"arguments": {
"message": "Waiting for event"
}
}
}
],
"transition": "waitForEvent"
},
{
"name": "waitForEvent",
"type": "event",
"onEvents": [
{
"eventRefs": [
"wakeUpEvent"
],
"actions": []
}
],
"end": true
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.kie.kogito.it.jobs;

import org.junit.jupiter.api.Test;

import static org.kie.kogito.test.utils.ProcessInstancesRESTTestUtils.assertProcessInstanceHasFinished;
import static org.kie.kogito.test.utils.ProcessInstancesRESTTestUtils.newProcessInstanceAndGetId;

public abstract class BaseWorkflowTimeoutsIT {

private static final String EMPTY_WORKFLOW_DATA = "{\"workflowdata\" : \"\"}";

protected static final String WORKFLOW_TIMEOUTS_URL = "/workflow_timeouts";
private static final String WORKFLOW_TIMEOUTS_GET_BY_ID_URL = WORKFLOW_TIMEOUTS_URL + "/{id}";

@Test
void workflowTimeoutExceeded() {
// Start a new process instance.
String processInstanceId = newProcessInstanceAndGetId(WORKFLOW_TIMEOUTS_URL, EMPTY_WORKFLOW_DATA);
// Give enough time for the timeout to exceed.
assertProcessInstanceHasFinished(WORKFLOW_TIMEOUTS_GET_BY_ID_URL, processInstanceId, 1, 180);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.kie.kogito.it.jobs;

import io.quarkus.test.junit.QuarkusIntegrationTest;

@QuarkusIntegrationTest
class WorkflowTimeoutsIT extends BaseWorkflowTimeoutsIT {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.kie.kogito.it.jobs;

import org.kie.kogito.test.resources.JobServiceTestResource;
import org.kie.kogito.testcontainers.quarkus.KafkaQuarkusTestResource;

import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusIntegrationTest;

@QuarkusIntegrationTest
@QuarkusTestResource(KafkaQuarkusTestResource.class)
@JobServiceTestResource(knativeEventingEnabled = true)
class WorkflowTimeoutsIT extends BaseWorkflowTimeoutsIT {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.kie.kogito.it.jobs;

import org.kie.kogito.test.resources.JobServiceTestResource;
import org.kie.kogito.testcontainers.quarkus.KafkaQuarkusTestResource;

import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusIntegrationTest;

@QuarkusIntegrationTest
@QuarkusTestResource(KafkaQuarkusTestResource.class)
@JobServiceTestResource
class WorkflowTimeoutsIT extends BaseWorkflowTimeoutsIT {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.kie.kogito.it.jobs;

import org.kie.kogito.test.resources.JobServiceTestResource;

import io.quarkus.test.junit.QuarkusIntegrationTest;

@QuarkusIntegrationTest
@JobServiceTestResource(kafkaEnabled = true)
class WorkflowTimeoutsIT extends BaseWorkflowTimeoutsIT {
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.kie.kogito.jobs.service.exception.InvalidScheduleTimeException;
import org.kie.kogito.jobs.service.exception.JobServiceException;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobExecutionResponse;
import org.kie.kogito.jobs.service.model.JobStatus;
Expand Down Expand Up @@ -125,7 +126,7 @@ private JobDetails jobWithStatusAndHandle(JobDetails job, JobStatus status, Mana

/**
* Performs the given job scheduling process on the scheduler, after all the validations already made.
*
*
* @param job to be scheduled
* @return
*/
Expand Down Expand Up @@ -155,7 +156,7 @@ private PublisherBuilder<JobDetails> doJobScheduling(JobDetails job, boolean exi

/**
* Check if it should be scheduled (on the current chunk) or saved to be scheduled later.
*
*
* @return
*/
private boolean isOnCurrentSchedulerChunk(JobDetails job) {
Expand Down Expand Up @@ -216,10 +217,15 @@ public PublisherBuilder<JobDetails> handleJobExecutionSuccess(JobDetails futureJ

@Override
public PublisherBuilder<JobDetails> handleJobExecutionSuccess(JobExecutionResponse response) {
return ReactiveStreams.of(response)
.map(JobExecutionResponse::getJobId)
.flatMapCompletionStage(jobRepository::get)
.flatMap(this::handleJobExecutionSuccess);
return ReactiveStreams.of(response.getJobId())
.flatMapCompletionStage(this::readJob)
.flatMap(jobDetails -> jobDetails.map(this::handleJobExecutionSuccess)
.orElseThrow(() -> new JobServiceException("Job: " + response.getJobId() + " was not found in database.")));
}

private CompletionStage<Optional<JobDetails>> readJob(String jobId) {
return jobRepository.get(jobId)
.thenCompose(jobDetails -> CompletableFuture.completedFuture(Optional.ofNullable(jobDetails)));
}

private boolean isExpired(ZonedDateTime expirationTime, int retries) {
Expand All @@ -244,7 +250,7 @@ private PublisherBuilder<JobDetails> handleExpirationTime(JobDetails scheduledJo
* between retries and a limit of max interval of {@link BaseTimerJobScheduler#maxIntervalLimitToRetryMillis}
* to retry, after this interval it the job it the job is not successfully executed it will remain in error
* state, with no more retries.
*
*
* @param errorResponse
* @return
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.kie.kogito.jobs.service.exception.JobServiceException;
import org.kie.kogito.jobs.service.executor.JobExecutor;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobExecutionResponse;
Expand All @@ -48,11 +49,13 @@
import io.smallrye.mutiny.Uni;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.kie.kogito.jobs.service.model.JobStatus.CANCELED;
import static org.kie.kogito.jobs.service.model.JobStatus.SCHEDULED;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -390,4 +393,43 @@ void testRescheduleAndMerge() {
verify(tested()).doCancel(merged);
verify(tested()).schedule(merged);
}

@Test
void handleJobExecutionSuccess() throws Exception {
scheduledJob = JobDetails.builder().id(JOB_ID).trigger(trigger).status(SCHEDULED).build();
doReturn(CompletableFuture.completedFuture(scheduledJob)).when(jobRepository).get(JOB_ID);
doReturn(CompletableFuture.completedFuture(scheduledJob)).when(jobRepository).delete(any(JobDetails.class));
JobExecutionResponse response = new JobExecutionResponse("execution successful", "200", ZonedDateTime.now(), JOB_ID);

Optional<JobDetails> result = tested().handleJobExecutionSuccess(response)
.findFirst()
.run()
.toCompletableFuture()
.get();

verify(jobRepository).delete(scheduleCaptor.capture());
JobDetails deletedJob = scheduleCaptor.getValue();
assertThat(deletedJob).isNotNull();
assertThat(deletedJob.getId()).isEqualTo(JOB_ID);
assertThat(result).isNotEmpty();
assertThat(result.get().getId()).isEqualTo(JOB_ID);
}

@Test
void handleJobExecutionSuccessJobNotFound() {
scheduledJob = JobDetails.builder().id(JOB_ID).trigger(trigger).status(SCHEDULED).build();
doReturn(CompletableFuture.completedFuture(null)).when(jobRepository).get(JOB_ID);
JobExecutionResponse response = new JobExecutionResponse("execution successful", "200", ZonedDateTime.now(), JOB_ID);

assertThatThrownBy(() -> tested().handleJobExecutionSuccess(response)
.findFirst()
.run()
.toCompletableFuture()
.get())
.hasCauseInstanceOf(JobServiceException.class)
.hasMessageContaining("Job: %s was not found in database.", JOB_ID);
verify(jobRepository, never()).delete(JOB_ID);
verify(jobRepository, never()).delete(any(JobDetails.class));
verify(jobRepository, never()).save(any());
}
}

0 comments on commit 1faabec

Please sign in to comment.