diff --git a/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-common-quarkus/src/main/java/org/kie/kogito/test/resources/JobServiceCompositeQuarkusTestResource.java b/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-common-quarkus/src/main/java/org/kie/kogito/test/resources/JobServiceCompositeQuarkusTestResource.java
index 43fc2dec89..d4d7a6ba33 100644
--- a/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-common-quarkus/src/main/java/org/kie/kogito/test/resources/JobServiceCompositeQuarkusTestResource.java
+++ b/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-common-quarkus/src/main/java/org/kie/kogito/test/resources/JobServiceCompositeQuarkusTestResource.java
@@ -52,7 +52,13 @@ public void init(JobServiceTestResource annotation) {
}
if (annotation.kafkaEnabled()) {
resource.withSharedDependencyContainer("kafka", new KogitoKafkaContainer());
- resource.getServiceContainers(JobServiceContainer.class).forEach(c -> c.addEnv("QUARKUS_PROFILE", "events-support"));
+ resource.getServiceContainers(JobServiceContainer.class).forEach(c -> c.addEnv("QUARKUS_PROFILE", "kafka-events-support"));
+ }
+ if (annotation.knativeEventingEnabled()) {
+ resource.getServiceContainers(JobServiceContainer.class).forEach(c -> {
+ c.addEnv("QUARKUS_PROFILE", "http-events-support");
+ c.addEnv("KOGITO_JOBS_SERVICE_HTTP_JOB_STATUS_CHANGE_EVENTS", "false");
+ });
}
if (annotation.dataIndexEnabled()) {
DataIndexPostgreSqlContainer container = new DataIndexPostgreSqlContainer();
diff --git a/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-quarkus-knative-eventing/src/main/resources/application.properties b/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-quarkus-knative-eventing/src/main/resources/application.properties
index 25294da18e..64f1b8f0d4 100644
--- a/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-quarkus-knative-eventing/src/main/resources/application.properties
+++ b/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-quarkus-knative-eventing/src/main/resources/application.properties
@@ -1,6 +1,6 @@
kogito.service.url=http://localhost:8080
-# Disable the KSinkInjectionHealthCheck since the K_SINK env variable is no passed in this context.
+# Disable the KSinkInjectionHealthCheck since the K_SINK env variable is not passed in this context.
quarkus.smallrye-health.check."org.kie.kogito.addons.quarkus.knative.eventing.KSinkInjectionHealthCheck".enabled=false
quarkus.kogito.devservices.enabled=false
diff --git a/jobs-service/jobs-service-common/pom.xml b/jobs-service/jobs-service-common/pom.xml
index 254266c74d..c479c65ecb 100644
--- a/jobs-service/jobs-service-common/pom.xml
+++ b/jobs-service/jobs-service-common/pom.xml
@@ -69,10 +69,6 @@
io.cloudevents
cloudevents-json-jackson
-
- io.cloudevents
- cloudevents-http-vertx
-
io.quarkus
quarkus-smallrye-openapi
@@ -96,22 +92,6 @@
quarkus-smallrye-reactive-messaging
-
- io.quarkus
- quarkus-smallrye-reactive-messaging-kafka
-
-
-
- io.quarkiverse.reactivemessaging.http
- quarkus-reactive-messaging-http
- ${version.io.quarkiverse.reactivemessaging.http}
-
-
-
- io.smallrye.reactive
- smallrye-mutiny-vertx-kafka-client
-
-
io.smallrye.reactive
smallrye-reactive-messaging-in-memory
@@ -133,12 +113,6 @@
quarkus-smallrye-health
-
-
- io.quarkus
- quarkus-smallrye-fault-tolerance
-
-
org.apache.commons
commons-lang3
diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/management/JobServiceInstanceManager.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/management/JobServiceInstanceManager.java
index 4c54f64610..5108b28719 100644
--- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/management/JobServiceInstanceManager.java
+++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/management/JobServiceInstanceManager.java
@@ -25,10 +25,11 @@
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Event;
import javax.enterprise.event.Observes;
+import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
-import org.eclipse.microprofile.reactive.messaging.spi.Connector;
+import org.kie.kogito.jobs.service.messaging.MessagingHandler;
import org.kie.kogito.jobs.service.model.JobServiceManagementInfo;
import org.kie.kogito.jobs.service.repository.JobServiceManagementRepository;
import org.kie.kogito.jobs.service.utils.DateUtil;
@@ -38,7 +39,6 @@
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.mutiny.Uni;
-import io.smallrye.reactive.messaging.kafka.KafkaConnector;
import io.vertx.mutiny.core.TimeoutStream;
import io.vertx.mutiny.core.Vertx;
@@ -60,8 +60,7 @@ public class JobServiceInstanceManager {
String leaderManagementId;
@Inject
- @Connector(value = "smallrye-kafka")
- KafkaConnector kafkaConnector;
+ Instance messagingHandlerInstance;
@Inject
Event messagingChangeEventEvent;
@@ -105,8 +104,7 @@ void startup(@Observes StartupEvent startupEvent) {
private void disableCommunication() {
//disable consuming events
- kafkaConnector.getConsumerChannels().stream().forEach(c -> kafkaConnector.getConsumer(c).pause());
-
+ messagingHandlerInstance.stream().forEach(MessagingHandler::pause);
//disable producing events
messagingChangeEventEvent.fire(new MessagingChangeEvent(false));
@@ -115,8 +113,7 @@ private void disableCommunication() {
private void enableCommunication() {
//enable consuming events
- kafkaConnector.getConsumerChannels().stream().forEach(c -> kafkaConnector.getConsumer(c).resume());
-
+ messagingHandlerInstance.stream().forEach(MessagingHandler::resume);
//enable producing events
messagingChangeEventEvent.fire(new MessagingChangeEvent(true));
diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/messaging/MessagingHandler.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/messaging/MessagingHandler.java
new file mode 100644
index 0000000000..b7ba8aacbe
--- /dev/null
+++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/messaging/MessagingHandler.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2023 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.kie.kogito.jobs.service.messaging;
+
+public interface MessagingHandler {
+
+ void pause();
+
+ void resume();
+}
diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/reflection/ReflectionConfiguration.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/reflection/ReflectionConfiguration.java
index 363ed2427f..f9fbfe386e 100644
--- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/reflection/ReflectionConfiguration.java
+++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/reflection/ReflectionConfiguration.java
@@ -50,7 +50,8 @@
"org.kie.kogito.jobs.service.api.schedule.cron.CronSchedule",
"org.kie.kogito.jobs.service.api.event.JobCloudEvent",
"org.kie.kogito.jobs.service.api.event.CreateJobEvent",
- "org.kie.kogito.jobs.service.api.event.DeleteJobEvent"
+ "org.kie.kogito.jobs.service.api.event.DeleteJobEvent",
+ "org.kie.kogito.jobs.service.resource.error.ErrorResponse"
})
public class ReflectionConfiguration {
}
diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/AbstractJobStreams.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/AbstractJobStreams.java
index 9c364fa7bc..76e28d18a1 100644
--- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/AbstractJobStreams.java
+++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/AbstractJobStreams.java
@@ -73,12 +73,12 @@ protected void jobStatusChange(JobDetails job) {
}
}
- CompletionStage onAck(JobDetails job) {
+ protected CompletionStage onAck(JobDetails job) {
LOGGER.debug("Job Status change published: {}", job);
return CompletableFuture.completedFuture(null);
}
- CompletionStage onNack(Throwable reason, JobDetails job) {
+ protected CompletionStage onNack(Throwable reason, JobDetails job) {
String msg = String.format("An error was produced while publishing a Job status change for the job: %s", job);
LOGGER.error(msg, reason);
return CompletableFuture.completedFuture(null);
diff --git a/jobs-service/jobs-service-common/src/main/resources/META-INF/microprofile-config.properties b/jobs-service/jobs-service-common/src/main/resources/META-INF/microprofile-config.properties
index db05957cbd..916065faa4 100644
--- a/jobs-service/jobs-service-common/src/main/resources/META-INF/microprofile-config.properties
+++ b/jobs-service/jobs-service-common/src/main/resources/META-INF/microprofile-config.properties
@@ -14,32 +14,32 @@
# limitations under the License.
#
-#Log Config
+# Log Config
quarkus.log.level=INFO
%dev.quarkus.log.category."org.kie.kogito.jobs".level=DEBUG
-##Console
+# Console
quarkus.log.console.format=%d{yyyy-MM-dd HH:mm:ss,SSS} %h %-5p [%c:%L] (%t) %s%e%n
quarkus.log.console.async=true
-#Container image
+# Container image
quarkus.container-image.build=${quarkus.build.image:true}
quarkus.container-image.group=org.kie.kogito
-#Web Config
+# Web Config
quarkus.ssl.native=true
quarkus.http.cors=true
quarkus.http.cors.origins=/.*/
quarkus.http.limits.max-body-size=10M
quarkus.http.port=8080
-#Swagger
+# Swagger
quarkus.swagger-ui.always-include=true
-#OpenAPI document extensions
+# OpenAPI document extensions
mp.openapi.filter=org.kie.kogito.jobs.service.openapi.JobServiceModelFilter
-#Job Service
+# Job Service
kogito.jobs-service.maxIntervalLimitToRetryMillis=60000
kogito.jobs-service.backoffRetryMillis=1000
kogito.service.url=http://localhost:8080
@@ -48,40 +48,6 @@ kogito.jobs-service.loadJobIntervalInMinutes=10
kogito.jobs-service.loadJobFromCurrentTimeIntervalInMinutes=60
kogito.jobs-service.forceExecuteExpiredJobs=true
-#enabling in-memory connector in case kafka is not enabled, to avoid issues with DI
-mp.messaging.outgoing.kogito-job-service-job-status-events.connector=smallrye-in-memory
-
-#Removing beans related to Kafka from CDI when eventing is not enabled
-quarkus.arc.exclude-types=io.smallrye.reactive.messaging.health.*,org.kie.kogito.jobs.service.stream.KafkaConfiguration
-
-#Configure Events Publishing on Job Service using profile
-#disabled by default
-kogito.jobs-service.events-support=false
-kogito.jobs-events-topic=kogito-jobs-events
-
-#enabled with the profile: 'events-support' (-Dquarkus.profile=events-support)
-%events-support.quarkus.kafka.health.enabled=true
-%events-support.kafka.bootstrap.servers=localhost:9092
-%events-support.kogito.jobs-service.events-support=true
-%events-support.mp.messaging.outgoing.kogito-job-service-job-status-events.connector=smallrye-kafka
-#disabled to allow property `kafka.bootstrap.servers` to override
-#%events-support.mp.messaging.outgoing.kogito-job-service-job-status-events.bootstrap.servers=localhost:9092
-%events-support.mp.messaging.outgoing.kogito-job-service-job-status-events.topic=${kogito.jobs-events-topic}
-%events-support.mp.messaging.outgoing.kogito-job-service-job-status-events.value.serializer=org.apache.kafka.common.serialization.StringSerializer
-%events-support.quarkus.arc.exclude-types=
-# Job service event based API configuration for the events-support profile
-# enable/disable the event based API for creating/updating jobs. Please do not disable unless you are completely sure.
-%events-support.kogito.jobs-service.events-api=true
-%events-support.mp.messaging.incoming.kogito-job-service-job-request-events.connector=smallrye-kafka
-%events-support.mp.messaging.incoming.kogito-job-service-job-request-events.enabled=${kogito.jobs-service.events-api}
-%events-support.mp.messaging.incoming.kogito-job-service-job-request-events.topic=kogito-job-service-job-request-events
-%events-support.mp.messaging.incoming.kogito-job-service-job-request-events.value.deserializer=org.kie.kogito.jobs.service.messaging.CloudEventDeserializer
-%events-support.mp.messaging.incoming.kogito-job-service-job-request-events.client.id=kogito-jobs-service
-%events-support.mp.messaging.incoming.kogito-job-service-job-request-events.group.id=kogito-jobs-service
-%events-support.mp.messaging.incoming.kogito-job-service-job-request-events.enable.auto.commit=false
-%events-support.mp.messaging.incoming.kogito-job-service-job-request-events.auto.offset.reset=earliest
-%events-support.mp.messaging.incoming.kogito-job-service-job-request-events.isolation.level=read_committed
-
quarkus.oidc.enabled=true
quarkus.oidc.tenant-enabled=false
@@ -89,7 +55,7 @@ quarkus.oidc.tenant-enabled=false
quarkus.http.auth.permission.authenticated.paths=/*
quarkus.http.auth.permission.authenticated.policy=permit
-#enabled with the profile: 'keycloak' (-Dquarkus.profile=keycloak)
+# enabled with the profile: 'keycloak' (-Dquarkus.profile=keycloak)
%keycloak.quarkus.oidc.enabled=true
%keycloak.quarkus.oidc.tenant-enabled=true
%keycloak.quarkus.oidc.auth-server-url=http://localhost:8280/auth/realms/kogito
@@ -98,70 +64,3 @@ quarkus.http.auth.permission.authenticated.policy=permit
%keycloak.quarkus.http.auth.policy.role-policy1.roles-allowed=confidential
%keycloak.quarkus.http.auth.permission.roles1.paths=/*
%keycloak.quarkus.http.auth.permission.roles1.policy=role-policy1
-
-#Configure Events Publishing and security on Job Service using profile
-#enabled with the profile: 'events-support-auth' (-Dquarkus.profile=events-support-auth)
-%events-support-auth.kogito.jobs-service.events-support=true
-%events-support-auth.quarkus.kafka.health.enabled=true
-%events-support-auth.kafka.bootstrap.servers=localhost:9092
-%events-support-auth.mp.messaging.outgoing.kogito-job-service-job-status-events.connector=smallrye-kafka
-#disabled to allow property `kafka.bootstrap.servers` to override
-#%events-support-auth.mp.messaging.outgoing.kogito-job-service-job-status-events.bootstrap.servers=localhost:9092
-%events-support-auth.mp.messaging.outgoing.kogito-job-service-job-status-events.topic=${kogito.jobs-events-topic}
-%events-support-auth.mp.messaging.outgoing.kogito-job-service-job-status-events.value.serializer=org.apache.kafka.common.serialization.StringSerializer
-%events-support-auth.quarkus.arc.exclude-types=
-# Job service event based API configuration for the events-support-auth profile
-# enable/disable the event based API for creating/updating jobs. Please do not disable unless you are completely sure.
-%events-support-auth.kogito.jobs-service.events-api=true
-%events-support-auth.mp.messaging.incoming.kogito-job-service-job-request-events.connector=smallrye-kafka
-%events-support-auth.mp.messaging.incoming.kogito-job-service-job-request-events.enabled=${kogito.jobs-service.events-api}
-%events-support-auth.mp.messaging.incoming.kogito-job-service-job-request-events.topic=kogito-job-service-job-request-events
-%events-support-auth.mp.messaging.incoming.kogito-job-service-job-request-events.value.deserializer=org.kie.kogito.jobs.service.messaging.CloudEventDeserializer
-%events-support-auth.mp.messaging.incoming.kogito-job-service-job-request-events.client.id=kogito-jobs-service
-%events-support-auth.mp.messaging.incoming.kogito-job-service-job-request-events.group.id=kogito-jobs-service
-%events-support-auth.mp.messaging.incoming.kogito-job-service-job-request-events.enable.auto.commit=false
-%events-support-auth.mp.messaging.incoming.kogito-job-service-job-request-events.auto.offset.reset=earliest
-%events-support-auth.mp.messaging.incoming.kogito-job-service-job-request-events.isolation.level=read_committed
-
-%events-support-auth.quarkus.oidc.enabled=true
-%events-support-auth.quarkus.oidc.tenant-enabled=false
-%events-support-auth.quarkus.oidc.auth-server-url=http://localhost:8280/auth/realms/kogito
-%events-support-auth.quarkus.oidc.client-id=kogito-job-service
-%events-support-auth.quarkus.oidc.credentials.secret=secret
-%events-support-auth.quarkus.http.auth.policy.role-policy1.roles-allowed=confidential
-%events-support-auth.quarkus.http.auth.permission.roles1.paths=/*
-%events-support-auth.quarkus.http.auth.permission.roles1.policy=role-policy1
-
-
-# Incoming connector to receive knative events to create/update/cancel a Job
-mp.messaging.incoming.kogito-job-service-job-request-events-knative.connector=quarkus-http
-mp.messaging.incoming.kogito-job-service-job-request-events-knative.path=/jobs/events
-mp.messaging.incoming.kogito-job-service-job-request-events-knative.method=POST
-
-#Knative V2
-mp.messaging.incoming.kogito-job-service-job-request-events-knative-v2.connector=quarkus-http
-mp.messaging.incoming.kogito-job-service-job-request-events-knative-v2.path=/v2/jobs/events
-mp.messaging.incoming.kogito-job-service-job-request-events-knative-v2.method=POST
-
-#Kafka V2
-%events-support.mp.messaging.incoming.kogito-job-service-job-request-events-v2.connector=smallrye-kafka
-%events-support.mp.messaging.incoming.kogito-job-service-job-request-events-v2.enabled=${kogito.jobs-service.events-api}
-%events-support.mp.messaging.incoming.kogito-job-service-job-request-events-v2.topic=kogito-job-service-job-request-events-v2
-%events-support.mp.messaging.incoming.kogito-job-service-job-request-events-v2.value.deserializer=org.kie.kogito.jobs.service.messaging.CloudEventDeserializer
-%events-support.mp.messaging.incoming.kogito-job-service-job-request-events-v2.client.id=kogito-jobs-service-v2
-%events-support.mp.messaging.incoming.kogito-job-service-job-request-events-v2.group.id=kogito-jobs-service
-%events-support.mp.messaging.incoming.kogito-job-service-job-request-events-v2.enable.auto.commit=false
-%events-support.mp.messaging.incoming.kogito-job-service-job-request-events-v2.auto.offset.reset=earliest
-%events-support.mp.messaging.incoming.kogito-job-service-job-request-events-v2.isolation.level=read_committed
-
-# Set to true to enable the sending of knative events to notify the Job Status changes
-kogito.jobs-service.knative-events=false
-
-# Enable the K_SINK environment variable check
-quarkus.smallrye-health.check."org.kie.kogito.jobs.service.health.knative.KSinkInjectionHealthCheck".enabled=${kogito.jobs-service.knative-events:false}
-
-# Outgoing connector to send Job status change knative events
-mp.messaging.outgoing.kogito-job-service-job-status-events-knative.connector=quarkus-http
-mp.messaging.outgoing.kogito-job-service-job-status-events-knative.url=${K_SINK:http://localhost:8180/jobs}
-mp.messaging.outgoing.kogito-job-service-job-status-events-knative.method=POST
-
diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceInstanceManagerTest.java b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceInstanceManagerTest.java
index dd0089524f..5fed08fd87 100644
--- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceInstanceManagerTest.java
+++ b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/management/JobServiceInstanceManagerTest.java
@@ -16,13 +16,17 @@
package org.kie.kogito.jobs.service.management;
import java.time.OffsetDateTime;
+import java.util.Arrays;
import java.util.function.Function;
+import java.util.stream.Stream;
import javax.enterprise.event.Event;
+import javax.enterprise.inject.Instance;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.kie.kogito.jobs.service.messaging.MessagingHandler;
import org.kie.kogito.jobs.service.model.JobServiceManagementInfo;
import org.kie.kogito.jobs.service.repository.JobServiceManagementRepository;
import org.kie.kogito.jobs.service.repository.impl.DefaultJobServiceManagementRepository;
@@ -35,19 +39,20 @@
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
-import io.smallrye.reactive.messaging.kafka.KafkaConnector;
import io.vertx.mutiny.core.TimeoutStream;
import io.vertx.mutiny.core.Vertx;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ExtendWith(MockitoExtension.class)
-public class JobServiceInstanceManagerTest {
+class JobServiceInstanceManagerTest {
@InjectMocks
@Spy
@@ -60,7 +65,9 @@ public class JobServiceInstanceManagerTest {
JobServiceManagementRepository repository = new DefaultJobServiceManagementRepository();
@Mock
- KafkaConnector kafkaConnector;
+ Instance messagingHandlerInstance;
+
+ private MessagingHandler messagingHandler;
@Mock
Event messagingChangeEventEvent;
@@ -79,6 +86,9 @@ void setUp() {
tested.heartbeatExpirationInSeconds = 1;
tested.leaderCheckIntervalInSeconds = 1;
tested.heardBeatIntervalInSeconds = 1;
+ messagingHandler = mock(MessagingHandler.class);
+ Stream handlers = Arrays.stream(new MessagingHandler[] { messagingHandler });
+ lenient().doReturn(handlers).when(messagingHandlerInstance).stream();
}
@Test
@@ -112,6 +122,8 @@ void tryBecomeLeaderSuccess() {
tested.tryBecomeLeader(info, checkLeader, heartbeat).await().indefinitely();
verify(repository).getAndUpdate(anyString(), updateFunction.capture());
assertThat(tested.isLeader()).isTrue();
+ verify(messagingHandler).resume();
+ verify(messagingChangeEventEvent).fire(any());
}
@Test
diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/messaging/BaseEventsSupportTestProfile.java b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/messaging/BaseEventsSupportTestProfile.java
index e8db6d8237..a96c79a4d5 100644
--- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/messaging/BaseEventsSupportTestProfile.java
+++ b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/messaging/BaseEventsSupportTestProfile.java
@@ -27,7 +27,7 @@ public abstract class BaseEventsSupportTestProfile implements QuarkusTestProfile
@Override
public String getConfigProfile() {
- return "events-support";
+ return "kafka-events-support";
}
@Override
diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/messaging/v2/MessagingEventConsumerTest.java b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/messaging/v2/MessagingEventConsumerTest.java
index 1e1b19b425..55cff83b45 100644
--- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/messaging/v2/MessagingEventConsumerTest.java
+++ b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/messaging/v2/MessagingEventConsumerTest.java
@@ -31,7 +31,7 @@
import io.cloudevents.CloudEvent;
@ExtendWith(MockitoExtension.class)
-abstract class MessagingEventConsumerTest extends ReactiveMessagingEventConsumerTest {
+public abstract class MessagingEventConsumerTest extends ReactiveMessagingEventConsumerTest {
@Override
public CloudEvent newCreateProcessInstanceJobRequestCloudEvent() {
diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/AbstractJobStreamsTest.java b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/AbstractJobStreamsTest.java
index 81ffa59705..1ded9d564f 100644
--- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/AbstractJobStreamsTest.java
+++ b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/AbstractJobStreamsTest.java
@@ -49,7 +49,7 @@
import static org.mockito.Mockito.verify;
@ExtendWith(MockitoExtension.class)
-abstract class AbstractJobStreamsTest {
+public abstract class AbstractJobStreamsTest {
protected static final String URL = "http://localhost:8180";
private static final String SERIALIZED_MESSAGE = "SERIALIZED_MESSAGE";
@@ -66,16 +66,16 @@ abstract class AbstractJobStreamsTest {
private static final Recipient RECIPIENT = new RecipientInstance(HttpRecipient.builder().forStringPayload().url("http://recipient").build());
private static final Trigger TRIGGER = new PointInTimeTrigger();
@Captor
- ArgumentCaptor> messageCaptor;
+ protected ArgumentCaptor> messageCaptor;
@Mock
- ObjectMapper objectMapper;
+ protected ObjectMapper objectMapper;
@Captor
- ArgumentCaptor
org.kie.kogito
- kogito-timer
+ jobs-service-messaging-kafka
-
- io.quarkus
- quarkus-smallrye-reactive-messaging
+ org.kie.kogito
+ jobs-service-messaging-http
+
+
+ org.kie.kogito
+ kogito-timer
org.apache.commons
commons-lang3
-
io.quarkus
quarkus-infinispan-client
@@ -46,6 +48,7 @@
kogito-addons-quarkus-persistence-infinispan-health
+
org.kie.kogito
jobs-service-common
diff --git a/jobs-service/jobs-service-infinispan/src/test/resources/application.properties b/jobs-service/jobs-service-infinispan/src/test/resources/application.properties
index fa862e2111..19bd0c8415 100644
--- a/jobs-service/jobs-service-infinispan/src/test/resources/application.properties
+++ b/jobs-service/jobs-service-infinispan/src/test/resources/application.properties
@@ -1,6 +1,3 @@
-#enabling in-memory connector in case kafka is not enabled, to avoid issues with DI
-mp.messaging.outgoing.kogito-job-service-job-status-events.connector=smallrye-in-memory
-
kogito.jobs-service.maxIntervalLimitToRetryMillis=60000
kogito.jobs-service.backoffRetryMillis=1000
kogito.service.url=http://localhost:8080
@@ -8,7 +5,6 @@ kogito.jobs-service.schedulerChunkInMinutes=10
kogito.jobs-service.loadJobIntervalInMinutes=10
kogito.jobs-service.loadJobFromCurrentTimeIntervalInMinutes=0
kogito.jobs-service.forceExecuteExpiredJobs=false
-kogito.jobs-events-topic=kogito-jobs-events
# Keycloak oidc
quarkus.oidc.enabled=true
@@ -19,6 +15,6 @@ quarkus.oidc.tenant-enabled=false
%keycloak.quarkus.oidc.credentials.secret=secret
# Outgoing events for the event based API integration tests
-%events-support.mp.messaging.outgoing.kogito-job-service-job-request-events-emitter.connector=smallrye-kafka
-%events-support.mp.messaging.outgoing.kogito-job-service-job-request-events-emitter.topic=kogito-job-service-job-request-events
-%events-support.mp.messaging.outgoing.kogito-job-service-job-request-events-emitter.value.serializer=org.apache.kafka.common.serialization.StringSerializer
+%kafka-events-support.mp.messaging.outgoing.kogito-job-service-job-request-events-emitter.connector=smallrye-kafka
+%kafka-events-support.mp.messaging.outgoing.kogito-job-service-job-request-events-emitter.topic=kogito-job-service-job-request-events
+%kafka-events-support.mp.messaging.outgoing.kogito-job-service-job-request-events-emitter.value.serializer=org.apache.kafka.common.serialization.StringSerializer
diff --git a/jobs-service/jobs-service-inmemory/pom.xml b/jobs-service/jobs-service-inmemory/pom.xml
index 2a88a7e8c3..00af80d7f7 100644
--- a/jobs-service/jobs-service-inmemory/pom.xml
+++ b/jobs-service/jobs-service-inmemory/pom.xml
@@ -18,11 +18,20 @@
org.kie.kogito
jobs-service-postgresql-common
+
+ org.kie.kogito
+ jobs-service-messaging-kafka
+
+
+ org.kie.kogito
+ jobs-service-messaging-http
+
io.quarkiverse.embedded.postgresql
quarkus-embedded-postgresql
+
org.kie.kogito
jobs-service-common
diff --git a/jobs-service/jobs-service-inmemory/src/test/resources/application.properties b/jobs-service/jobs-service-inmemory/src/test/resources/application.properties
index e339794754..32a1b76510 100644
--- a/jobs-service/jobs-service-inmemory/src/test/resources/application.properties
+++ b/jobs-service/jobs-service-inmemory/src/test/resources/application.properties
@@ -1,9 +1,8 @@
quarkus.http.test-port=0
# Outgoing events for the event based API integration tests
-%events-support.mp.messaging.outgoing.kogito-job-service-job-request-events-emitter.connector=smallrye-kafka
-%events-support.mp.messaging.outgoing.kogito-job-service-job-request-events-emitter.topic=kogito-job-service-job-request-events
-%events-support.mp.messaging.outgoing.kogito-job-service-job-request-events-emitter.value.serializer=org.apache.kafka.common.serialization.StringSerializer
+%kafka-events-support.mp.messaging.outgoing.kogito-job-service-job-request-events-emitter.connector=smallrye-kafka
+%kafka-events-support.mp.messaging.outgoing.kogito-job-service-job-request-events-emitter.topic=kogito-job-service-job-request-events
+%kafka-events-support.mp.messaging.outgoing.kogito-job-service-job-request-events-emitter.value.serializer=org.apache.kafka.common.serialization.StringSerializer
-quarkus.datasource.db-kind=postgresql
quarkus.datasource.devservices.enabled=false
quarkus.kogito.devservices.enabled=false
\ No newline at end of file
diff --git a/jobs-service/jobs-service-messaging-http/pom.xml b/jobs-service/jobs-service-messaging-http/pom.xml
new file mode 100644
index 0000000000..f3f761cdf3
--- /dev/null
+++ b/jobs-service/jobs-service-messaging-http/pom.xml
@@ -0,0 +1,87 @@
+
+
+
+ jobs-service
+ org.kie.kogito
+ 2.0.0-SNAPSHOT
+
+ 4.0.0
+
+ jobs-service-messaging-http
+ Kogito Apps :: Jobs Service :: Messaging Http
+
+
+
+ org.kie.kogito
+ jobs-service-common
+
+
+
+ io.cloudevents
+ cloudevents-json-jackson
+
+
+ io.cloudevents
+ cloudevents-http-vertx
+
+
+
+ io.quarkiverse.reactivemessaging.http
+ quarkus-reactive-messaging-http
+ ${version.io.quarkiverse.reactivemessaging.http}
+
+
+
+
+ io.quarkus
+ quarkus-smallrye-health
+
+
+
+
+ io.quarkus
+ quarkus-smallrye-fault-tolerance
+
+
+
+
+ org.kie.kogito
+ jobs-service-common
+ test-jar
+ test
+
+
+ org.kie.kogito
+ kogito-quarkus-test-utils
+ test
+
+
+ io.quarkus
+ quarkus-junit5
+ test
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+ org.mockito
+ mockito-core
+ test
+
+
+ org.mockito
+ mockito-junit-jupiter
+ test
+
+
+ org.awaitility
+ awaitility
+ test
+
+
+
+
\ No newline at end of file
diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/messaging/CloudEventConverter.java b/jobs-service/jobs-service-messaging-http/src/main/java/org/kie/kogito/jobs/service/messaging/http/CloudEventConverter.java
similarity index 98%
rename from jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/messaging/CloudEventConverter.java
rename to jobs-service/jobs-service-messaging-http/src/main/java/org/kie/kogito/jobs/service/messaging/http/CloudEventConverter.java
index dd32469c37..0aed5951d1 100644
--- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/messaging/CloudEventConverter.java
+++ b/jobs-service/jobs-service-messaging-http/src/main/java/org/kie/kogito/jobs/service/messaging/http/CloudEventConverter.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.kie.kogito.jobs.service.messaging;
+package org.kie.kogito.jobs.service.messaging.http;
import java.lang.reflect.Type;
diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/messaging/KnativeReactiveMessagingEventConsumer.java b/jobs-service/jobs-service-messaging-http/src/main/java/org/kie/kogito/jobs/service/messaging/http/HttpReactiveMessagingEventConsumer.java
similarity index 81%
rename from jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/messaging/KnativeReactiveMessagingEventConsumer.java
rename to jobs-service/jobs-service-messaging-http/src/main/java/org/kie/kogito/jobs/service/messaging/http/HttpReactiveMessagingEventConsumer.java
index 270cc9476e..e16e2cbb6a 100644
--- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/messaging/KnativeReactiveMessagingEventConsumer.java
+++ b/jobs-service/jobs-service-messaging-http/src/main/java/org/kie/kogito/jobs/service/messaging/http/HttpReactiveMessagingEventConsumer.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package org.kie.kogito.jobs.service.messaging;
+package org.kie.kogito.jobs.service.messaging.http;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
@@ -23,6 +23,7 @@
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
+import org.kie.kogito.jobs.service.messaging.MessagingConsumer;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.scheduler.impl.TimerDelegateJobScheduler;
@@ -32,18 +33,18 @@
import io.smallrye.mutiny.Uni;
@ApplicationScoped
-public class KnativeReactiveMessagingEventConsumer extends MessagingConsumer {
+public class HttpReactiveMessagingEventConsumer extends MessagingConsumer {
- private static final String KOGITO_JOB_SERVICE_JOB_REQUEST_EVENTS_KNATIVE = "kogito-job-service-job-request-events-knative";
+ private static final String KOGITO_JOB_SERVICE_JOB_REQUEST_EVENTS_HTTP = "kogito-job-service-job-request-events-http";
@Inject
- public KnativeReactiveMessagingEventConsumer(TimerDelegateJobScheduler scheduler,
+ public HttpReactiveMessagingEventConsumer(TimerDelegateJobScheduler scheduler,
ReactiveJobRepository jobRepository,
ObjectMapper objectMapper) {
super(scheduler, jobRepository, objectMapper);
}
- @Incoming(KOGITO_JOB_SERVICE_JOB_REQUEST_EVENTS_KNATIVE)
+ @Incoming(KOGITO_JOB_SERVICE_JOB_REQUEST_EVENTS_HTTP)
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
@Retry(delay = 500, maxRetries = 4)
@Override
diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/health/knative/KSinkInjectionHealthCheck.java b/jobs-service/jobs-service-messaging-http/src/main/java/org/kie/kogito/jobs/service/messaging/http/health/knative/KSinkInjectionHealthCheck.java
similarity index 98%
rename from jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/health/knative/KSinkInjectionHealthCheck.java
rename to jobs-service/jobs-service-messaging-http/src/main/java/org/kie/kogito/jobs/service/messaging/http/health/knative/KSinkInjectionHealthCheck.java
index 04aae3f391..f0ac31a7a3 100644
--- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/health/knative/KSinkInjectionHealthCheck.java
+++ b/jobs-service/jobs-service-messaging-http/src/main/java/org/kie/kogito/jobs/service/messaging/http/health/knative/KSinkInjectionHealthCheck.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package org.kie.kogito.jobs.service.health.knative;
+package org.kie.kogito.jobs.service.messaging.http.health.knative;
import java.net.InetAddress;
import java.net.URI;
diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/KnativeJobStreams.java b/jobs-service/jobs-service-messaging-http/src/main/java/org/kie/kogito/jobs/service/messaging/http/stream/HttpJobStreams.java
similarity index 78%
rename from jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/KnativeJobStreams.java
rename to jobs-service/jobs-service-messaging-http/src/main/java/org/kie/kogito/jobs/service/messaging/http/stream/HttpJobStreams.java
index 6e28819e2e..33e5758855 100644
--- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/KnativeJobStreams.java
+++ b/jobs-service/jobs-service-messaging-http/src/main/java/org/kie/kogito/jobs/service/messaging/http/stream/HttpJobStreams.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package org.kie.kogito.jobs.service.stream;
+package org.kie.kogito.jobs.service.messaging.http.stream;
import java.util.Optional;
import java.util.function.Supplier;
@@ -31,6 +31,8 @@
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;
import org.kie.kogito.jobs.service.model.JobDetails;
+import org.kie.kogito.jobs.service.stream.AbstractJobStreams;
+import org.kie.kogito.jobs.service.stream.AvailableStreams;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -38,10 +40,10 @@
import io.quarkus.reactivemessaging.http.runtime.OutgoingHttpMetadata;
@ApplicationScoped
-public class KnativeJobStreams extends AbstractJobStreams {
+public class HttpJobStreams extends AbstractJobStreams {
- public static final String PUBLISH_EVENTS_CONFIG_KEY = "kogito.jobs-service.knative-events";
- public static final String JOB_STATUS_CHANGE_EVENTS_KNATIVE = "kogito-job-service-job-status-events-knative";
+ public static final String PUBLISH_EVENTS_CONFIG_KEY = "kogito.jobs-service.http.job-status-change-events";
+ public static final String JOB_STATUS_CHANGE_EVENTS_HTTP = "kogito-job-service-job-status-events-http";
/**
* Metadata to include the content-type for structured CloudEvents messages
@@ -51,11 +53,11 @@ public class KnativeJobStreams extends AbstractJobStreams {
.build();
@Inject
- public KnativeJobStreams(ObjectMapper objectMapper,
- @ConfigProperty(name = PUBLISH_EVENTS_CONFIG_KEY) Optional config,
- @Channel(JOB_STATUS_CHANGE_EVENTS_KNATIVE) @OnOverflow(value = OnOverflow.Strategy.LATEST) Emitter emitter,
+ public HttpJobStreams(ObjectMapper objectMapper,
+ @ConfigProperty(name = PUBLISH_EVENTS_CONFIG_KEY) Optional config,
+ @Channel(JOB_STATUS_CHANGE_EVENTS_HTTP) @OnOverflow(value = OnOverflow.Strategy.LATEST) Emitter emitter,
@ConfigProperty(name = "kogito.service.url", defaultValue = "http://localhost:8080") String url) {
- super(objectMapper, config.map(Boolean::valueOf).filter(Boolean.TRUE::equals).orElse(false), emitter, url);
+ super(objectMapper, config.orElse(false), emitter, url);
}
@Incoming(AvailableStreams.JOB_STATUS_CHANGE_EVENTS)
diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/messaging/v2/KnativeReactiveMessagingEventConsumer.java b/jobs-service/jobs-service-messaging-http/src/main/java/org/kie/kogito/jobs/service/messaging/http/v2/HttpReactiveMessagingEventConsumer.java
similarity index 78%
rename from jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/messaging/v2/KnativeReactiveMessagingEventConsumer.java
rename to jobs-service/jobs-service-messaging-http/src/main/java/org/kie/kogito/jobs/service/messaging/http/v2/HttpReactiveMessagingEventConsumer.java
index 72031c6979..9ec17fda78 100644
--- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/messaging/v2/KnativeReactiveMessagingEventConsumer.java
+++ b/jobs-service/jobs-service-messaging-http/src/main/java/org/kie/kogito/jobs/service/messaging/http/v2/HttpReactiveMessagingEventConsumer.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.kie.kogito.jobs.service.messaging.v2;
+package org.kie.kogito.jobs.service.messaging.http.v2;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
@@ -22,6 +22,7 @@
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
+import org.kie.kogito.jobs.service.messaging.v2.MessagingConsumer;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.scheduler.impl.TimerDelegateJobScheduler;
@@ -31,17 +32,17 @@
import io.smallrye.mutiny.Uni;
@ApplicationScoped
-public class KnativeReactiveMessagingEventConsumer extends MessagingConsumer {
+public class HttpReactiveMessagingEventConsumer extends MessagingConsumer {
- private static final String KOGITO_JOB_SERVICE_JOB_REQUEST_EVENTS_KNATIVE_V2 = "kogito-job-service-job-request-events-knative-v2";
+ private static final String KOGITO_JOB_SERVICE_JOB_REQUEST_EVENTS_HTTP_V2 = "kogito-job-service-job-request-events-http-v2";
@Inject
- public KnativeReactiveMessagingEventConsumer(TimerDelegateJobScheduler scheduler, ReactiveJobRepository jobRepository,
+ public HttpReactiveMessagingEventConsumer(TimerDelegateJobScheduler scheduler, ReactiveJobRepository jobRepository,
ObjectMapper objectMapper) {
super(scheduler, jobRepository, objectMapper);
}
- @Incoming(KOGITO_JOB_SERVICE_JOB_REQUEST_EVENTS_KNATIVE_V2)
+ @Incoming(KOGITO_JOB_SERVICE_JOB_REQUEST_EVENTS_HTTP_V2)
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
@Retry(delay = 500, maxRetries = 4)
@Override
diff --git a/jobs-service/jobs-service-messaging-http/src/main/resources/META-INF/beans.xml b/jobs-service/jobs-service-messaging-http/src/main/resources/META-INF/beans.xml
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/jobs-service/jobs-service-messaging-http/src/main/resources/META-INF/microprofile-config.properties b/jobs-service/jobs-service-messaging-http/src/main/resources/META-INF/microprofile-config.properties
new file mode 100644
index 0000000000..ef288a15fa
--- /dev/null
+++ b/jobs-service/jobs-service-messaging-http/src/main/resources/META-INF/microprofile-config.properties
@@ -0,0 +1,45 @@
+#
+# Copyright 2021 Red Hat, Inc. and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file contains the default configurations for the Quarkus http connector.
+
+# Job status change events notification disabled by default
+kogito.jobs-service.http.job-status-change-events=false
+
+# Job status change events are automatically enabled with the profile: http-events-support (-Dquarkus.profile=http-events-support)
+%http-events-support.kogito.jobs-service.http.job-status-change-events=true
+
+# Unlike the smallrye-kafka connector, quarkus-http connector doesn't have a way to disable an incoming connector at runtime.
+# So by now, similar to REST API, the http eventing API is always enabled.
+# Future work might introduce a http-events-support.kogito.jobs-service.http.events-api=true/false property do disable http eventing API dynamically.
+
+# Incoming connector to receive knative events via http to create/update/cancel a Job
+mp.messaging.incoming.kogito-job-service-job-request-events-http.connector=quarkus-http
+mp.messaging.incoming.kogito-job-service-job-request-events-http.path=/jobs/events
+mp.messaging.incoming.kogito-job-service-job-request-events-http.method=POST
+
+# Knative V2
+mp.messaging.incoming.kogito-job-service-job-request-events-http-v2.connector=quarkus-http
+mp.messaging.incoming.kogito-job-service-job-request-events-http-v2.path=/v2/jobs/events
+mp.messaging.incoming.kogito-job-service-job-request-events-http-v2.method=POST
+
+# Enable the K_SINK environment variable check
+quarkus.smallrye-health.check."org.kie.kogito.jobs.service.messaging.http.health.knative.KSinkInjectionHealthCheck".enabled=${kogito.jobs-service.http.job-status-change-events:false}
+
+# Outgoing connector to send Job status change knative events via http
+mp.messaging.outgoing.kogito-job-service-job-status-events-http.connector=quarkus-http
+mp.messaging.outgoing.kogito-job-service-job-status-events-http.url=${K_SINK:http://localhost:8180/jobs}
+mp.messaging.outgoing.kogito-job-service-job-status-events-http.method=POST
diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/messaging/CloudEventConverterTest.java b/jobs-service/jobs-service-messaging-http/src/test/java/org/kie/kogito/jobs/service/messaging/http/CloudEventConverterTest.java
similarity index 97%
rename from jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/messaging/CloudEventConverterTest.java
rename to jobs-service/jobs-service-messaging-http/src/test/java/org/kie/kogito/jobs/service/messaging/http/CloudEventConverterTest.java
index a34d72e06d..6105af82fc 100644
--- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/messaging/CloudEventConverterTest.java
+++ b/jobs-service/jobs-service-messaging-http/src/test/java/org/kie/kogito/jobs/service/messaging/http/CloudEventConverterTest.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package org.kie.kogito.jobs.service.messaging;
+package org.kie.kogito.jobs.service.messaging.http;
import java.net.URI;
import java.time.OffsetDateTime;
@@ -51,8 +51,8 @@
@ExtendWith(MockitoExtension.class)
class CloudEventConverterTest {
- private static final String STRUCTURED_CLOUD_EVENT = "org/kie/kogito/jobs/service/messaging/StructuredCloudEvent.json";
- private static final String BINARY_CLOUD_EVENT_DATA = "org/kie/kogito/jobs/service/messaging/BinaryCloudEventData.json";
+ private static final String STRUCTURED_CLOUD_EVENT = "org/kie/kogito/jobs/service/messaging/http/StructuredCloudEvent.json";
+ private static final String BINARY_CLOUD_EVENT_DATA = "org/kie/kogito/jobs/service/messaging/http/BinaryCloudEventData.json";
private static final String EVENT_ID = "ID";
private static final URI EVENT_SOURCE = URI.create("http://my_event_source");
diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/messaging/KnativeReactiveMessagingEventConsumerTest.java b/jobs-service/jobs-service-messaging-http/src/test/java/org/kie/kogito/jobs/service/messaging/http/HttpReactiveMessagingEventConsumerTest.java
similarity index 63%
rename from jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/messaging/KnativeReactiveMessagingEventConsumerTest.java
rename to jobs-service/jobs-service-messaging-http/src/test/java/org/kie/kogito/jobs/service/messaging/http/HttpReactiveMessagingEventConsumerTest.java
index cfb29c66fa..91770d9af6 100644
--- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/messaging/KnativeReactiveMessagingEventConsumerTest.java
+++ b/jobs-service/jobs-service-messaging-http/src/test/java/org/kie/kogito/jobs/service/messaging/http/HttpReactiveMessagingEventConsumerTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2022 Red Hat, Inc. and/or its affiliates.
+ * Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,19 +14,20 @@
* limitations under the License.
*/
-package org.kie.kogito.jobs.service.messaging;
+package org.kie.kogito.jobs.service.messaging.http;
+import org.kie.kogito.jobs.service.messaging.ReactiveMessagingEventConsumerTest;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.scheduler.impl.TimerDelegateJobScheduler;
import com.fasterxml.jackson.databind.ObjectMapper;
-class KnativeReactiveMessagingEventConsumerTest extends ReactiveMessagingEventConsumerTest {
+class HttpReactiveMessagingEventConsumerTest extends ReactiveMessagingEventConsumerTest {
@Override
- protected KnativeReactiveMessagingEventConsumer createEventConsumer(TimerDelegateJobScheduler scheduler,
+ protected HttpReactiveMessagingEventConsumer createEventConsumer(TimerDelegateJobScheduler scheduler,
ReactiveJobRepository jobRepository,
ObjectMapper objectMapper) {
- return new KnativeReactiveMessagingEventConsumer(scheduler, jobRepository, objectMapper);
+ return new HttpReactiveMessagingEventConsumer(scheduler, jobRepository, objectMapper);
}
}
diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/health/knative/KSinkInjectionHealthCheckTest.java b/jobs-service/jobs-service-messaging-http/src/test/java/org/kie/kogito/jobs/service/messaging/http/health/knative/KSinkInjectionHealthCheckTest.java
similarity index 93%
rename from jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/health/knative/KSinkInjectionHealthCheckTest.java
rename to jobs-service/jobs-service-messaging-http/src/test/java/org/kie/kogito/jobs/service/messaging/http/health/knative/KSinkInjectionHealthCheckTest.java
index fa6ed44d16..0193234312 100644
--- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/health/knative/KSinkInjectionHealthCheckTest.java
+++ b/jobs-service/jobs-service-messaging-http/src/test/java/org/kie/kogito/jobs/service/messaging/http/health/knative/KSinkInjectionHealthCheckTest.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package org.kie.kogito.jobs.service.health.knative;
+package org.kie.kogito.jobs.service.messaging.http.health.knative;
import java.util.function.UnaryOperator;
@@ -26,7 +26,7 @@
import org.mockito.junit.jupiter.MockitoExtension;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.kie.kogito.jobs.service.health.knative.KSinkInjectionHealthCheck.K_SINK;
+import static org.kie.kogito.jobs.service.messaging.http.health.knative.KSinkInjectionHealthCheck.K_SINK;
import static org.mockito.Mockito.doReturn;
@ExtendWith(MockitoExtension.class)
diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/KnativeJobStreamsTest.java b/jobs-service/jobs-service-messaging-http/src/test/java/org/kie/kogito/jobs/service/messaging/http/stream/HttpJobStreamsTest.java
similarity index 79%
rename from jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/KnativeJobStreamsTest.java
rename to jobs-service/jobs-service-messaging-http/src/test/java/org/kie/kogito/jobs/service/messaging/http/stream/HttpJobStreamsTest.java
index 796f87bf26..e778e6f992 100644
--- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/KnativeJobStreamsTest.java
+++ b/jobs-service/jobs-service-messaging-http/src/test/java/org/kie/kogito/jobs/service/messaging/http/stream/HttpJobStreamsTest.java
@@ -14,24 +14,25 @@
* limitations under the License.
*/
-package org.kie.kogito.jobs.service.stream;
+package org.kie.kogito.jobs.service.messaging.http.stream;
import java.util.Optional;
import javax.ws.rs.core.HttpHeaders;
import org.eclipse.microprofile.reactive.messaging.Message;
+import org.kie.kogito.jobs.service.stream.AbstractJobStreamsTest;
import io.cloudevents.jackson.JsonFormat;
import io.quarkus.reactivemessaging.http.runtime.OutgoingHttpMetadata;
import static org.assertj.core.api.Assertions.assertThat;
-class KnativeJobStreamsTest extends AbstractJobStreamsTest {
+class HttpJobStreamsTest extends AbstractJobStreamsTest {
@Override
- protected KnativeJobStreams createJobStreams() {
- return new KnativeJobStreams(objectMapper, Optional.of("true"), emitter, URL);
+ protected HttpJobStreams createJobStreams() {
+ return new HttpJobStreams(objectMapper, Optional.of(true), emitter, AbstractJobStreamsTest.URL);
}
@Override
diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/messaging/v2/KnativeReactiveMessagingEventConsumerTest.java b/jobs-service/jobs-service-messaging-http/src/test/java/org/kie/kogito/jobs/service/messaging/http/v2/HttpReactiveMessagingEventConsumerTest.java
similarity index 67%
rename from jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/messaging/v2/KnativeReactiveMessagingEventConsumerTest.java
rename to jobs-service/jobs-service-messaging-http/src/test/java/org/kie/kogito/jobs/service/messaging/http/v2/HttpReactiveMessagingEventConsumerTest.java
index 58b9201407..0cfd470a3e 100644
--- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/messaging/v2/KnativeReactiveMessagingEventConsumerTest.java
+++ b/jobs-service/jobs-service-messaging-http/src/test/java/org/kie/kogito/jobs/service/messaging/http/v2/HttpReactiveMessagingEventConsumerTest.java
@@ -13,19 +13,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.kie.kogito.jobs.service.messaging.v2;
+package org.kie.kogito.jobs.service.messaging.http.v2;
+import org.kie.kogito.jobs.service.messaging.v2.MessagingEventConsumerTest;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.scheduler.impl.TimerDelegateJobScheduler;
import com.fasterxml.jackson.databind.ObjectMapper;
-class KnativeReactiveMessagingEventConsumerTest extends MessagingEventConsumerTest {
+class HttpReactiveMessagingEventConsumerTest extends MessagingEventConsumerTest {
@Override
- protected KnativeReactiveMessagingEventConsumer createEventConsumer(TimerDelegateJobScheduler scheduler,
+ protected HttpReactiveMessagingEventConsumer createEventConsumer(TimerDelegateJobScheduler scheduler,
ReactiveJobRepository jobRepository,
ObjectMapper objectMapper) {
- return new KnativeReactiveMessagingEventConsumer(scheduler, jobRepository, objectMapper);
+ return new HttpReactiveMessagingEventConsumer(scheduler, jobRepository, objectMapper);
}
}
diff --git a/jobs-service/jobs-service-common/src/test/resources/org/kie/kogito/jobs/service/messaging/BinaryCloudEventData.json b/jobs-service/jobs-service-messaging-http/src/test/resources/org/kie/kogito/jobs/service/messaging/http/BinaryCloudEventData.json
similarity index 100%
rename from jobs-service/jobs-service-common/src/test/resources/org/kie/kogito/jobs/service/messaging/BinaryCloudEventData.json
rename to jobs-service/jobs-service-messaging-http/src/test/resources/org/kie/kogito/jobs/service/messaging/http/BinaryCloudEventData.json
diff --git a/jobs-service/jobs-service-common/src/test/resources/org/kie/kogito/jobs/service/messaging/StructuredCloudEvent.json b/jobs-service/jobs-service-messaging-http/src/test/resources/org/kie/kogito/jobs/service/messaging/http/StructuredCloudEvent.json
similarity index 100%
rename from jobs-service/jobs-service-common/src/test/resources/org/kie/kogito/jobs/service/messaging/StructuredCloudEvent.json
rename to jobs-service/jobs-service-messaging-http/src/test/resources/org/kie/kogito/jobs/service/messaging/http/StructuredCloudEvent.json
diff --git a/jobs-service/jobs-service-messaging-kafka/pom.xml b/jobs-service/jobs-service-messaging-kafka/pom.xml
new file mode 100644
index 0000000000..382464f528
--- /dev/null
+++ b/jobs-service/jobs-service-messaging-kafka/pom.xml
@@ -0,0 +1,83 @@
+
+
+
+ jobs-service
+ org.kie.kogito
+ 2.0.0-SNAPSHOT
+
+ 4.0.0
+
+ jobs-service-messaging-kafka
+ Kogito Apps :: Jobs Service :: Messaging Kafka
+
+
+
+ org.kie.kogito
+ jobs-service-common
+
+
+
+ io.cloudevents
+ cloudevents-json-jackson
+
+
+
+
+ io.quarkus
+ quarkus-smallrye-fault-tolerance
+
+
+
+ io.smallrye.reactive
+ smallrye-mutiny-vertx-kafka-client
+
+
+ io.quarkus
+ quarkus-smallrye-reactive-messaging
+
+
+ io.quarkus
+ quarkus-smallrye-reactive-messaging-kafka
+
+
+
+
+ org.kie.kogito
+ jobs-service-common
+ test-jar
+ test
+
+
+ org.kie.kogito
+ kogito-quarkus-test-utils
+ test
+
+
+ io.quarkus
+ quarkus-junit5
+ test
+
+
+ org.mockito
+ mockito-core
+ test
+
+
+ org.mockito
+ mockito-junit-jupiter
+ test
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+ org.awaitility
+ awaitility
+ test
+
+
+
\ No newline at end of file
diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/messaging/CloudEventDeserializer.java b/jobs-service/jobs-service-messaging-kafka/src/main/java/org/kie/kogito/jobs/service/messaging/kafka/CloudEventDeserializer.java
similarity index 94%
rename from jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/messaging/CloudEventDeserializer.java
rename to jobs-service/jobs-service-messaging-kafka/src/main/java/org/kie/kogito/jobs/service/messaging/kafka/CloudEventDeserializer.java
index 4b8b6c4669..cfcef36fc0 100644
--- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/messaging/CloudEventDeserializer.java
+++ b/jobs-service/jobs-service-messaging-kafka/src/main/java/org/kie/kogito/jobs/service/messaging/kafka/CloudEventDeserializer.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package org.kie.kogito.jobs.service.messaging;
+package org.kie.kogito.jobs.service.messaging.kafka;
import io.cloudevents.CloudEvent;
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;
diff --git a/jobs-service/jobs-service-messaging-kafka/src/main/java/org/kie/kogito/jobs/service/messaging/kafka/KafkaMessagingHandler.java b/jobs-service/jobs-service-messaging-kafka/src/main/java/org/kie/kogito/jobs/service/messaging/kafka/KafkaMessagingHandler.java
new file mode 100644
index 0000000000..c36e80a1b0
--- /dev/null
+++ b/jobs-service/jobs-service-messaging-kafka/src/main/java/org/kie/kogito/jobs/service/messaging/kafka/KafkaMessagingHandler.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2023 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.kie.kogito.jobs.service.messaging.kafka;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+
+import org.eclipse.microprofile.reactive.messaging.spi.Connector;
+import org.kie.kogito.jobs.service.messaging.MessagingHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.smallrye.reactive.messaging.kafka.KafkaConnector;
+
+@ApplicationScoped
+public class KafkaMessagingHandler implements MessagingHandler {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessagingHandler.class);
+
+ @Inject
+ @Connector(value = "smallrye-kafka")
+ KafkaConnector kafkaConnector;
+
+ @Override
+ public void pause() {
+ kafkaConnector.getConsumerChannels().forEach(c -> {
+ LOGGER.debug("pausing kafka channel: {}", c);
+ kafkaConnector.getConsumer(c).pause();
+ });
+ }
+
+ @Override
+ public void resume() {
+ kafkaConnector.getConsumerChannels().forEach(c -> {
+ LOGGER.debug("resuming kafka channel: {}", c);
+ kafkaConnector.getConsumer(c).resume();
+ });
+ }
+}
diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/messaging/KafkaReactiveMessagingEventConsumer.java b/jobs-service/jobs-service-messaging-kafka/src/main/java/org/kie/kogito/jobs/service/messaging/kafka/KafkaReactiveMessagingEventConsumer.java
similarity index 94%
rename from jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/messaging/KafkaReactiveMessagingEventConsumer.java
rename to jobs-service/jobs-service-messaging-kafka/src/main/java/org/kie/kogito/jobs/service/messaging/kafka/KafkaReactiveMessagingEventConsumer.java
index 47e8c8c85b..24e8e42db7 100644
--- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/messaging/KafkaReactiveMessagingEventConsumer.java
+++ b/jobs-service/jobs-service-messaging-kafka/src/main/java/org/kie/kogito/jobs/service/messaging/kafka/KafkaReactiveMessagingEventConsumer.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package org.kie.kogito.jobs.service.messaging;
+package org.kie.kogito.jobs.service.messaging.kafka;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
@@ -23,6 +23,7 @@
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
+import org.kie.kogito.jobs.service.messaging.MessagingConsumer;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.scheduler.impl.TimerDelegateJobScheduler;
diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/KafkaConfiguration.java b/jobs-service/jobs-service-messaging-kafka/src/main/java/org/kie/kogito/jobs/service/messaging/kafka/stream/KafkaConfiguration.java
similarity index 90%
rename from jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/KafkaConfiguration.java
rename to jobs-service/jobs-service-messaging-kafka/src/main/java/org/kie/kogito/jobs/service/messaging/kafka/stream/KafkaConfiguration.java
index acaffc9e98..3eed9b1921 100644
--- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/KafkaConfiguration.java
+++ b/jobs-service/jobs-service-messaging-kafka/src/main/java/org/kie/kogito/jobs/service/messaging/kafka/stream/KafkaConfiguration.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Red Hat, Inc. and/or its affiliates.
+ * Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.kie.kogito.jobs.service.stream;
+package org.kie.kogito.jobs.service.messaging.kafka.stream;
import java.util.Arrays;
import java.util.Map;
@@ -35,7 +35,7 @@
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.kafka.admin.KafkaAdminClient;
-import static org.kie.kogito.jobs.service.stream.KafkaJobStreams.PUBLISH_EVENTS_CONFIG_KEY;
+import static org.kie.kogito.jobs.service.messaging.kafka.stream.KafkaJobStreams.PUBLISH_EVENTS_CONFIG_KEY;
@Startup
@ApplicationScoped
@@ -57,7 +57,7 @@ public class KafkaConfiguration {
public KafkaConfiguration(@Identifier("default-kafka-broker") Instance
+
+ org.kie.kogito
+ jobs-service-messaging-kafka
+
+
+ org.kie.kogito
+ jobs-service-messaging-http
+
io.quarkus
quarkus-mongodb-client
+
org.kie.kogito
jobs-service-common
diff --git a/jobs-service/jobs-service-mongodb/src/test/resources/application.properties b/jobs-service/jobs-service-mongodb/src/test/resources/application.properties
index 35a14fb119..3ac740567b 100644
--- a/jobs-service/jobs-service-mongodb/src/test/resources/application.properties
+++ b/jobs-service/jobs-service-mongodb/src/test/resources/application.properties
@@ -1,6 +1,3 @@
-#enabling in-memory connector in case kafka is not enabled, to avoid issues with DI
-mp.messaging.outgoing.kogito-job-service-job-status-events.connector=smallrye-in-memory
-
kogito.jobs-service.maxIntervalLimitToRetryMillis=60000
kogito.jobs-service.backoffRetryMillis=1000
kogito.service.url=http://localhost:8080
@@ -8,7 +5,6 @@ kogito.jobs-service.schedulerChunkInMinutes=10
kogito.jobs-service.loadJobIntervalInMinutes=10
kogito.jobs-service.loadJobFromCurrentTimeIntervalInMinutes=0
kogito.jobs-service.forceExecuteExpiredJobs=false
-kogito.jobs-events-topic=kogito-jobs-events
# Keycloak oidc
quarkus.oidc.enabled=true
@@ -22,6 +18,6 @@ quarkus.mongodb.connection-string=mongodb://localhost:27017
quarkus.mongodb.database=kogito
# Outgoing events for the event based API integration tests
-%events-support.mp.messaging.outgoing.kogito-job-service-job-request-events-emitter.connector=smallrye-kafka
-%events-support.mp.messaging.outgoing.kogito-job-service-job-request-events-emitter.topic=kogito-job-service-job-request-events
-%events-support.mp.messaging.outgoing.kogito-job-service-job-request-events-emitter.value.serializer=org.apache.kafka.common.serialization.StringSerializer
+%kafka-events-support.mp.messaging.outgoing.kogito-job-service-job-request-events-emitter.connector=smallrye-kafka
+%kafka-events-support.mp.messaging.outgoing.kogito-job-service-job-request-events-emitter.topic=kogito-job-service-job-request-events
+%kafka-events-support.mp.messaging.outgoing.kogito-job-service-job-request-events-emitter.value.serializer=org.apache.kafka.common.serialization.StringSerializer
diff --git a/jobs-service/jobs-service-postgresql-common/src/test/resources/application.properties b/jobs-service/jobs-service-postgresql-common/src/test/resources/application.properties
index cc0fa898d2..0ebda5a108 100644
--- a/jobs-service/jobs-service-postgresql-common/src/test/resources/application.properties
+++ b/jobs-service/jobs-service-postgresql-common/src/test/resources/application.properties
@@ -1,6 +1,3 @@
-#enabling in-memory connector in case kafka is not enabled, to avoid issues with DI
-mp.messaging.outgoing.kogito-job-service-job-status-events.connector=smallrye-in-memory
-
kogito.jobs-service.maxIntervalLimitToRetryMillis=60000
kogito.jobs-service.backoffRetryMillis=1000
kogito.service.url=http://localhost:8080
@@ -8,7 +5,6 @@ kogito.jobs-service.schedulerChunkInMinutes=10
kogito.jobs-service.loadJobIntervalInMinutes=10
kogito.jobs-service.loadJobFromCurrentTimeIntervalInMinutes=0
kogito.jobs-service.forceExecuteExpiredJobs=false
-kogito.jobs-events-topic=kogito-jobs-events
# Keycloak oidc
quarkus.oidc.enabled=true
@@ -22,8 +18,3 @@ quarkus.datasource.db-kind=postgresql
quarkus.flyway.migrate-at-start=true
quarkus.flyway.clean-at-start=true
quarkus.flyway.locations=db/jobs-service
-
-# Outgoing events for the event based API integration tests
-%events-support.mp.messaging.outgoing.kogito-job-service-job-request-events-emitter.connector=smallrye-kafka
-%events-support.mp.messaging.outgoing.kogito-job-service-job-request-events-emitter.topic=kogito-job-service-job-request-events
-%events-support.mp.messaging.outgoing.kogito-job-service-job-request-events-emitter.value.serializer=org.apache.kafka.common.serialization.StringSerializer
diff --git a/jobs-service/jobs-service-postgresql/pom.xml b/jobs-service/jobs-service-postgresql/pom.xml
index 2336e3fc66..67f17e3dee 100644
--- a/jobs-service/jobs-service-postgresql/pom.xml
+++ b/jobs-service/jobs-service-postgresql/pom.xml
@@ -18,6 +18,37 @@
org.kie.kogito
jobs-service-postgresql-common
+
+ org.kie.kogito
+ jobs-service-messaging-kafka
+
+
+ org.kie.kogito
+ jobs-service-messaging-http
+
+
+
+
+ org.kie.kogito
+ kogito-quarkus-test-utils
+ test
+
+
+ org.kie.kogito
+ jobs-service-common
+ test-jar
+ test
+
+
+ io.quarkus
+ quarkus-junit5
+ test
+
+
+ org.awaitility
+ awaitility
+ test
+
diff --git a/jobs-service/jobs-service-postgresql-common/src/test/java/org/kie/kogito/jobs/service/messaging/PostgreSqlCallbackResource.java b/jobs-service/jobs-service-postgresql/src/test/java/org/kie/kogito/jobs/service/messaging/PostgreSqlCallbackResource.java
similarity index 100%
rename from jobs-service/jobs-service-postgresql-common/src/test/java/org/kie/kogito/jobs/service/messaging/PostgreSqlCallbackResource.java
rename to jobs-service/jobs-service-postgresql/src/test/java/org/kie/kogito/jobs/service/messaging/PostgreSqlCallbackResource.java
diff --git a/jobs-service/jobs-service-postgresql-common/src/test/java/org/kie/kogito/jobs/service/messaging/PostgreSqlEventSupportTestProfile.java b/jobs-service/jobs-service-postgresql/src/test/java/org/kie/kogito/jobs/service/messaging/PostgreSqlEventSupportTestProfile.java
similarity index 100%
rename from jobs-service/jobs-service-postgresql-common/src/test/java/org/kie/kogito/jobs/service/messaging/PostgreSqlEventSupportTestProfile.java
rename to jobs-service/jobs-service-postgresql/src/test/java/org/kie/kogito/jobs/service/messaging/PostgreSqlEventSupportTestProfile.java
diff --git a/jobs-service/jobs-service-postgresql-common/src/test/java/org/kie/kogito/jobs/service/messaging/PostgreSqlMessagingApiTest.java b/jobs-service/jobs-service-postgresql/src/test/java/org/kie/kogito/jobs/service/messaging/PostgreSqlMessagingApiTest.java
similarity index 100%
rename from jobs-service/jobs-service-postgresql-common/src/test/java/org/kie/kogito/jobs/service/messaging/PostgreSqlMessagingApiTest.java
rename to jobs-service/jobs-service-postgresql/src/test/java/org/kie/kogito/jobs/service/messaging/PostgreSqlMessagingApiTest.java
diff --git a/jobs-service/jobs-service-postgresql/src/test/resources/application.properties b/jobs-service/jobs-service-postgresql/src/test/resources/application.properties
new file mode 100644
index 0000000000..efc9215c21
--- /dev/null
+++ b/jobs-service/jobs-service-postgresql/src/test/resources/application.properties
@@ -0,0 +1,12 @@
+kogito.jobs-service.maxIntervalLimitToRetryMillis=60000
+kogito.jobs-service.backoffRetryMillis=1000
+kogito.service.url=http://localhost:8080
+kogito.jobs-service.schedulerChunkInMinutes=10
+kogito.jobs-service.loadJobIntervalInMinutes=10
+kogito.jobs-service.loadJobFromCurrentTimeIntervalInMinutes=0
+kogito.jobs-service.forceExecuteExpiredJobs=false
+
+# Outgoing events for the event based API integration tests
+%kafka-events-support.mp.messaging.outgoing.kogito-job-service-job-request-events-emitter.connector=smallrye-kafka
+%kafka-events-support.mp.messaging.outgoing.kogito-job-service-job-request-events-emitter.topic=kogito-job-service-job-request-events
+%kafka-events-support.mp.messaging.outgoing.kogito-job-service-job-request-events-emitter.value.serializer=org.apache.kafka.common.serialization.StringSerializer
diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/deployment/pom.xml b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/deployment/pom.xml
index bcdf5e05f1..8d8e911c27 100644
--- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/deployment/pom.xml
+++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/deployment/pom.xml
@@ -54,10 +54,6 @@
io.quarkus
quarkus-swagger-ui-deployment
-
- io.quarkiverse.reactivemessaging.http
- quarkus-reactive-messaging-http-deployment
-
io.quarkus
quarkus-smallrye-reactive-messaging-deployment
@@ -70,14 +66,6 @@
io.quarkus
quarkus-smallrye-health-deployment
-
- io.quarkus
- quarkus-smallrye-reactive-messaging-kafka-deployment
-
-
- io.quarkus
- quarkus-smallrye-fault-tolerance-deployment
-
io.quarkiverse.embedded.postgresql
quarkus-embedded-postgresql-deployment
diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/KogitoAddonsQuarkusJobsServiceEmbeddedRuntimeConfig.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/KogitoAddonsQuarkusJobsServiceEmbeddedRuntimeConfig.java
index 12072aef4c..1d746cd5c2 100644
--- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/KogitoAddonsQuarkusJobsServiceEmbeddedRuntimeConfig.java
+++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/KogitoAddonsQuarkusJobsServiceEmbeddedRuntimeConfig.java
@@ -62,16 +62,4 @@ public class KogitoAddonsQuarkusJobsServiceEmbeddedRuntimeConfig {
@ConfigItem(name = "forceExecuteExpiredJobs", defaultValue = "true")
public boolean forceExecuteExpiredJobs;
- /**
- * Enables/Disables the kafka messaging support.
- */
- @ConfigItem(name = "events-support", defaultValue = "false")
- public boolean eventsSupport;
-
- /**
- * Enables/Disables the knative eventing support.
- */
- @ConfigItem(name = "knative-events", defaultValue = "false")
- public boolean knativeEvents;
-
}
diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/resources/application.properties b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/resources/application.properties
index 430bc330a8..4cb7a5ef22 100644
--- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/resources/application.properties
+++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/resources/application.properties
@@ -5,9 +5,3 @@ quarkus.datasource.jobs_service.devservices.enabled=false
quarkus.datasource.devservices.enabled=false
quarkus.flyway.jobs_service.locations=db/jobs-service
quarkus.flyway.jobs_service.migrate-at-start=true
-
-# https://issues.redhat.com/browse/KOGITO-9382, Exclude kafka connectors from the kogito-addons-quarkus-jobs-service-embedded
-# since we don't expect those scenarios by now.
-mp.messaging.outgoing.kogito-job-service-job-status-events.enabled=false
-quarkus.arc.exclude-types=org.kie.kogito.jobs.service.messaging.v2.KafkaReactiveMessagingEventConsumer,org.kie.kogito.jobs.service.messaging.KafkaReactiveMessagingEventConsumer,org.kie.kogito.jobs.service.stream.KafkaJobStreams
-
diff --git a/jobs-service/pom.xml b/jobs-service/pom.xml
index 16e5ff72e9..1f8f4dab3a 100644
--- a/jobs-service/pom.xml
+++ b/jobs-service/pom.xml
@@ -22,6 +22,8 @@
jobs-service-postgresql
jobs-service-inmemory
kogito-addons-jobs-service
+ jobs-service-messaging-kafka
+ jobs-service-messaging-http
@@ -93,6 +95,16 @@
kogito-addons-quarkus-jobs-service-embedded-deployment
${project.version}
+
+ org.kie.kogito
+ jobs-service-messaging-kafka
+ ${project.version}
+
+
+ org.kie.kogito
+ jobs-service-messaging-http
+ ${project.version}
+
\ No newline at end of file