diff --git a/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-common/src/main/resources/org/acme/sw/workflow-timeouts.sw.json b/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-common/src/main/resources/org/acme/sw/workflow-timeouts.sw.json new file mode 100644 index 0000000000..acbb1d2edd --- /dev/null +++ b/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-common/src/main/resources/org/acme/sw/workflow-timeouts.sw.json @@ -0,0 +1,55 @@ +{ + "id": "workflow_timeouts", + "version": "1.0", + "name": "Workflow Timeouts", + "description": "Workflow Timeouts Test", + "start": "printWaitMessage", + "timeouts": { + "workflowExecTimeout": "PT5S" + }, + "events": [ + { + "name": "wakeUpEvent", + "source": "", + "type": "workflow_timeouts_wake_up_event_in" + } + ], + "functions": [ + { + "name": "printMessage", + "type": "custom", + "operation": "sysout" + } + ], + "states": [ + { + "name": "printWaitMessage", + "type": "operation", + "actions": [ + { + "name": "printBeforeEvent", + "functionRef": { + "refName": "printMessage", + "arguments": { + "message": "Waiting for event" + } + } + } + ], + "transition": "waitForEvent" + }, + { + "name": "waitForEvent", + "type": "event", + "onEvents": [ + { + "eventRefs": [ + "wakeUpEvent" + ], + "actions": [] + } + ], + "end": true + } + ] +} \ No newline at end of file diff --git a/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-common-quarkus/src/test/java/org/kie/kogito/it/jobs/BaseWorkflowTimeoutsIT.java b/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-common-quarkus/src/test/java/org/kie/kogito/it/jobs/BaseWorkflowTimeoutsIT.java new file mode 100644 index 0000000000..a49ee424f1 --- /dev/null +++ b/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-common-quarkus/src/test/java/org/kie/kogito/it/jobs/BaseWorkflowTimeoutsIT.java @@ -0,0 +1,39 @@ +/* + * Copyright 2023 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.kie.kogito.it.jobs; + +import org.junit.jupiter.api.Test; + +import static org.kie.kogito.test.utils.ProcessInstancesRESTTestUtils.assertProcessInstanceHasFinished; +import static org.kie.kogito.test.utils.ProcessInstancesRESTTestUtils.newProcessInstanceAndGetId; + +public abstract class BaseWorkflowTimeoutsIT { + + private static final String EMPTY_WORKFLOW_DATA = "{\"workflowdata\" : \"\"}"; + + protected static final String WORKFLOW_TIMEOUTS_URL = "/workflow_timeouts"; + private static final String WORKFLOW_TIMEOUTS_GET_BY_ID_URL = WORKFLOW_TIMEOUTS_URL + "/{id}"; + + @Test + void workflowTimeoutExceeded() { + // Start a new process instance. + String processInstanceId = newProcessInstanceAndGetId(WORKFLOW_TIMEOUTS_URL, EMPTY_WORKFLOW_DATA); + // Give enough time for the timeout to exceed. + assertProcessInstanceHasFinished(WORKFLOW_TIMEOUTS_GET_BY_ID_URL, processInstanceId, 1, 180); + } + +} diff --git a/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-quarkus-embedded/src/test/java/org/kie/kogito/it/jobs/WorkflowTimeoutsIT.java b/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-quarkus-embedded/src/test/java/org/kie/kogito/it/jobs/WorkflowTimeoutsIT.java new file mode 100644 index 0000000000..5372ea2bcb --- /dev/null +++ b/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-quarkus-embedded/src/test/java/org/kie/kogito/it/jobs/WorkflowTimeoutsIT.java @@ -0,0 +1,23 @@ +/* + * Copyright 2023 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.kie.kogito.it.jobs; + +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +class WorkflowTimeoutsIT extends BaseWorkflowTimeoutsIT { +} diff --git a/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-quarkus-knative-eventing/src/test/java/org/kie/kogito/it/jobs/WorkflowTimeoutsIT.java b/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-quarkus-knative-eventing/src/test/java/org/kie/kogito/it/jobs/WorkflowTimeoutsIT.java new file mode 100644 index 0000000000..cc42f9c977 --- /dev/null +++ b/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-quarkus-knative-eventing/src/test/java/org/kie/kogito/it/jobs/WorkflowTimeoutsIT.java @@ -0,0 +1,29 @@ +/* + * Copyright 2023 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.kie.kogito.it.jobs; + +import org.kie.kogito.test.resources.JobServiceTestResource; +import org.kie.kogito.testcontainers.quarkus.KafkaQuarkusTestResource; + +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +@QuarkusTestResource(KafkaQuarkusTestResource.class) +@JobServiceTestResource(knativeEventingEnabled = true) +class WorkflowTimeoutsIT extends BaseWorkflowTimeoutsIT { +} diff --git a/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-quarkus-management/src/test/java/org/kie/kogito/it/jobs/WorkflowTimeoutsIT.java b/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-quarkus-management/src/test/java/org/kie/kogito/it/jobs/WorkflowTimeoutsIT.java new file mode 100644 index 0000000000..9ed0467087 --- /dev/null +++ b/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-quarkus-management/src/test/java/org/kie/kogito/it/jobs/WorkflowTimeoutsIT.java @@ -0,0 +1,29 @@ +/* + * Copyright 2023 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.kie.kogito.it.jobs; + +import org.kie.kogito.test.resources.JobServiceTestResource; +import org.kie.kogito.testcontainers.quarkus.KafkaQuarkusTestResource; + +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +@QuarkusTestResource(KafkaQuarkusTestResource.class) +@JobServiceTestResource +class WorkflowTimeoutsIT extends BaseWorkflowTimeoutsIT { +} diff --git a/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-quarkus-messaging/src/test/java/org/kie/kogito/it/jobs/WorkflowTimeoutsIT.java b/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-quarkus-messaging/src/test/java/org/kie/kogito/it/jobs/WorkflowTimeoutsIT.java new file mode 100644 index 0000000000..cf3c0e992e --- /dev/null +++ b/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-quarkus-messaging/src/test/java/org/kie/kogito/it/jobs/WorkflowTimeoutsIT.java @@ -0,0 +1,26 @@ +/* + * Copyright 2023 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.kie.kogito.it.jobs; + +import org.kie.kogito.test.resources.JobServiceTestResource; + +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +@JobServiceTestResource(kafkaEnabled = true) +class WorkflowTimeoutsIT extends BaseWorkflowTimeoutsIT { +} diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobScheduler.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobScheduler.java index 4a3e9e6a65..723d12a476 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobScheduler.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobScheduler.java @@ -30,6 +30,7 @@ import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; import org.kie.kogito.jobs.service.exception.InvalidScheduleTimeException; +import org.kie.kogito.jobs.service.exception.JobServiceException; import org.kie.kogito.jobs.service.model.JobDetails; import org.kie.kogito.jobs.service.model.JobExecutionResponse; import org.kie.kogito.jobs.service.model.JobStatus; @@ -125,7 +126,7 @@ private JobDetails jobWithStatusAndHandle(JobDetails job, JobStatus status, Mana /** * Performs the given job scheduling process on the scheduler, after all the validations already made. - * + * * @param job to be scheduled * @return */ @@ -155,7 +156,7 @@ private PublisherBuilder doJobScheduling(JobDetails job, boolean exi /** * Check if it should be scheduled (on the current chunk) or saved to be scheduled later. - * + * * @return */ private boolean isOnCurrentSchedulerChunk(JobDetails job) { @@ -216,10 +217,15 @@ public PublisherBuilder handleJobExecutionSuccess(JobDetails futureJ @Override public PublisherBuilder handleJobExecutionSuccess(JobExecutionResponse response) { - return ReactiveStreams.of(response) - .map(JobExecutionResponse::getJobId) - .flatMapCompletionStage(jobRepository::get) - .flatMap(this::handleJobExecutionSuccess); + return ReactiveStreams.of(response.getJobId()) + .flatMapCompletionStage(this::readJob) + .flatMap(jobDetails -> jobDetails.map(this::handleJobExecutionSuccess) + .orElseThrow(() -> new JobServiceException("Job: " + response.getJobId() + " was not found in database."))); + } + + private CompletionStage> readJob(String jobId) { + return jobRepository.get(jobId) + .thenCompose(jobDetails -> CompletableFuture.completedFuture(Optional.ofNullable(jobDetails))); } private boolean isExpired(ZonedDateTime expirationTime, int retries) { @@ -244,7 +250,7 @@ private PublisherBuilder handleExpirationTime(JobDetails scheduledJo * between retries and a limit of max interval of {@link BaseTimerJobScheduler#maxIntervalLimitToRetryMillis} * to retry, after this interval it the job it the job is not successfully executed it will remain in error * state, with no more retries. - * + * * @param errorResponse * @return */ diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobSchedulerTest.java b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobSchedulerTest.java index b535e7671a..79ddb5cc45 100644 --- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobSchedulerTest.java +++ b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobSchedulerTest.java @@ -29,6 +29,7 @@ import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.kie.kogito.jobs.service.exception.JobServiceException; import org.kie.kogito.jobs.service.executor.JobExecutor; import org.kie.kogito.jobs.service.model.JobDetails; import org.kie.kogito.jobs.service.model.JobExecutionResponse; @@ -48,11 +49,13 @@ import io.smallrye.mutiny.Uni; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.kie.kogito.jobs.service.model.JobStatus.CANCELED; import static org.kie.kogito.jobs.service.model.JobStatus.SCHEDULED; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -390,4 +393,43 @@ void testRescheduleAndMerge() { verify(tested()).doCancel(merged); verify(tested()).schedule(merged); } + + @Test + void handleJobExecutionSuccess() throws Exception { + scheduledJob = JobDetails.builder().id(JOB_ID).trigger(trigger).status(SCHEDULED).build(); + doReturn(CompletableFuture.completedFuture(scheduledJob)).when(jobRepository).get(JOB_ID); + doReturn(CompletableFuture.completedFuture(scheduledJob)).when(jobRepository).delete(any(JobDetails.class)); + JobExecutionResponse response = new JobExecutionResponse("execution successful", "200", ZonedDateTime.now(), JOB_ID); + + Optional result = tested().handleJobExecutionSuccess(response) + .findFirst() + .run() + .toCompletableFuture() + .get(); + + verify(jobRepository).delete(scheduleCaptor.capture()); + JobDetails deletedJob = scheduleCaptor.getValue(); + assertThat(deletedJob).isNotNull(); + assertThat(deletedJob.getId()).isEqualTo(JOB_ID); + assertThat(result).isNotEmpty(); + assertThat(result.get().getId()).isEqualTo(JOB_ID); + } + + @Test + void handleJobExecutionSuccessJobNotFound() { + scheduledJob = JobDetails.builder().id(JOB_ID).trigger(trigger).status(SCHEDULED).build(); + doReturn(CompletableFuture.completedFuture(null)).when(jobRepository).get(JOB_ID); + JobExecutionResponse response = new JobExecutionResponse("execution successful", "200", ZonedDateTime.now(), JOB_ID); + + assertThatThrownBy(() -> tested().handleJobExecutionSuccess(response) + .findFirst() + .run() + .toCompletableFuture() + .get()) + .hasCauseInstanceOf(JobServiceException.class) + .hasMessageContaining("Job: %s was not found in database.", JOB_ID); + verify(jobRepository, never()).delete(JOB_ID); + verify(jobRepository, never()).delete(any(JobDetails.class)); + verify(jobRepository, never()).save(any()); + } }