Skip to content

Commit

Permalink
KOGITO-9561: Jobs service kafka and http messaging modularization
Browse files Browse the repository at this point in the history
  • Loading branch information
wmedvede committed Jul 17, 2023
1 parent 3908d94 commit 0d6fe8e
Show file tree
Hide file tree
Showing 59 changed files with 564 additions and 282 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@ public void init(JobServiceTestResource annotation) {
}
if (annotation.kafkaEnabled()) {
resource.withSharedDependencyContainer("kafka", new KogitoKafkaContainer());
resource.getServiceContainers(JobServiceContainer.class).forEach(c -> c.addEnv("QUARKUS_PROFILE", "events-support"));
resource.getServiceContainers(JobServiceContainer.class).forEach(c -> c.addEnv("QUARKUS_PROFILE", "kafka-events-support"));
}
if (annotation.knativeEventingEnabled()) {
resource.getServiceContainers(JobServiceContainer.class).forEach(c -> {
c.addEnv("QUARKUS_PROFILE", "http-events-support");
c.addEnv("KOGITO_JOBS_SERVICE_HTTP_JOB_STATUS_CHANGE_EVENTS", "false");
});
}
if (annotation.dataIndexEnabled()) {
DataIndexPostgreSqlContainer container = new DataIndexPostgreSqlContainer();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
kogito.service.url=http://localhost:8080

# Disable the KSinkInjectionHealthCheck since the K_SINK env variable is no passed in this context.
# Disable the KSinkInjectionHealthCheck since the K_SINK env variable is not passed in this context.
quarkus.smallrye-health.check."org.kie.kogito.addons.quarkus.knative.eventing.KSinkInjectionHealthCheck".enabled=false

quarkus.kogito.devservices.enabled=false
Expand Down
26 changes: 0 additions & 26 deletions jobs-service/jobs-service-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,6 @@
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-json-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-http-vertx</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-openapi</artifactId>
Expand All @@ -96,22 +92,6 @@
<artifactId>quarkus-smallrye-reactive-messaging</artifactId>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>

<dependency>
<groupId>io.quarkiverse.reactivemessaging.http</groupId>
<artifactId>quarkus-reactive-messaging-http</artifactId>
<version>${version.io.quarkiverse.reactivemessaging.http}</version>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-kafka-client</artifactId>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-in-memory</artifactId>
Expand All @@ -133,12 +113,6 @@
<artifactId>quarkus-smallrye-health</artifactId>
</dependency>

<!-- Fault tolerance -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-fault-tolerance</artifactId>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Event;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.kie.kogito.jobs.service.messaging.MessagingHandler;
import org.kie.kogito.jobs.service.model.JobServiceManagementInfo;
import org.kie.kogito.jobs.service.repository.JobServiceManagementRepository;
import org.kie.kogito.jobs.service.utils.DateUtil;
Expand All @@ -38,7 +39,6 @@
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaConnector;
import io.vertx.mutiny.core.TimeoutStream;
import io.vertx.mutiny.core.Vertx;

Expand All @@ -60,8 +60,7 @@ public class JobServiceInstanceManager {
String leaderManagementId;

@Inject
@Connector(value = "smallrye-kafka")
KafkaConnector kafkaConnector;
Instance<MessagingHandler> messagingHandlerInstance;

@Inject
Event<MessagingChangeEvent> messagingChangeEventEvent;
Expand Down Expand Up @@ -105,8 +104,7 @@ void startup(@Observes StartupEvent startupEvent) {

private void disableCommunication() {
//disable consuming events
kafkaConnector.getConsumerChannels().stream().forEach(c -> kafkaConnector.getConsumer(c).pause());

messagingHandlerInstance.stream().forEach(MessagingHandler::pause);
//disable producing events
messagingChangeEventEvent.fire(new MessagingChangeEvent(false));

Expand All @@ -115,8 +113,7 @@ private void disableCommunication() {

private void enableCommunication() {
//enable consuming events
kafkaConnector.getConsumerChannels().stream().forEach(c -> kafkaConnector.getConsumer(c).resume());

messagingHandlerInstance.stream().forEach(MessagingHandler::resume);
//enable producing events
messagingChangeEventEvent.fire(new MessagingChangeEvent(true));

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.kie.kogito.jobs.service.messaging;

public interface MessagingHandler {

void pause();

void resume();
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@
"org.kie.kogito.jobs.service.api.schedule.cron.CronSchedule",
"org.kie.kogito.jobs.service.api.event.JobCloudEvent",
"org.kie.kogito.jobs.service.api.event.CreateJobEvent",
"org.kie.kogito.jobs.service.api.event.DeleteJobEvent"
"org.kie.kogito.jobs.service.api.event.DeleteJobEvent",
"org.kie.kogito.jobs.service.resource.error.ErrorResponse"
})
public class ReflectionConfiguration {
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ protected void jobStatusChange(JobDetails job) {
}
}

CompletionStage<Void> onAck(JobDetails job) {
protected CompletionStage<Void> onAck(JobDetails job) {
LOGGER.debug("Job Status change published: {}", job);
return CompletableFuture.completedFuture(null);
}

CompletionStage<Void> onNack(Throwable reason, JobDetails job) {
protected CompletionStage<Void> onNack(Throwable reason, JobDetails job) {
String msg = String.format("An error was produced while publishing a Job status change for the job: %s", job);
LOGGER.error(msg, reason);
return CompletableFuture.completedFuture(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,32 @@
# limitations under the License.
#

#Log Config
# Log Config
quarkus.log.level=INFO
%dev.quarkus.log.category."org.kie.kogito.jobs".level=DEBUG

##Console
# Console
quarkus.log.console.format=%d{yyyy-MM-dd HH:mm:ss,SSS} %h %-5p [%c:%L] (%t) %s%e%n
quarkus.log.console.async=true

#Container image
# Container image
quarkus.container-image.build=${quarkus.build.image:true}
quarkus.container-image.group=org.kie.kogito

#Web Config
# Web Config
quarkus.ssl.native=true
quarkus.http.cors=true
quarkus.http.cors.origins=/.*/
quarkus.http.limits.max-body-size=10M
quarkus.http.port=8080

#Swagger
# Swagger
quarkus.swagger-ui.always-include=true

#OpenAPI document extensions
# OpenAPI document extensions
mp.openapi.filter=org.kie.kogito.jobs.service.openapi.JobServiceModelFilter

#Job Service
# Job Service
kogito.jobs-service.maxIntervalLimitToRetryMillis=60000
kogito.jobs-service.backoffRetryMillis=1000
kogito.service.url=http://localhost:8080
Expand All @@ -48,48 +48,14 @@ kogito.jobs-service.loadJobIntervalInMinutes=10
kogito.jobs-service.loadJobFromCurrentTimeIntervalInMinutes=60
kogito.jobs-service.forceExecuteExpiredJobs=true

#enabling in-memory connector in case kafka is not enabled, to avoid issues with DI
mp.messaging.outgoing.kogito-job-service-job-status-events.connector=smallrye-in-memory

#Removing beans related to Kafka from CDI when eventing is not enabled
quarkus.arc.exclude-types=io.smallrye.reactive.messaging.health.*,org.kie.kogito.jobs.service.stream.KafkaConfiguration

#Configure Events Publishing on Job Service using profile
#disabled by default
kogito.jobs-service.events-support=false
kogito.jobs-events-topic=kogito-jobs-events

#enabled with the profile: 'events-support' (-Dquarkus.profile=events-support)
%events-support.quarkus.kafka.health.enabled=true
%events-support.kafka.bootstrap.servers=localhost:9092
%events-support.kogito.jobs-service.events-support=true
%events-support.mp.messaging.outgoing.kogito-job-service-job-status-events.connector=smallrye-kafka
#disabled to allow property `kafka.bootstrap.servers` to override
#%events-support.mp.messaging.outgoing.kogito-job-service-job-status-events.bootstrap.servers=localhost:9092
%events-support.mp.messaging.outgoing.kogito-job-service-job-status-events.topic=${kogito.jobs-events-topic}
%events-support.mp.messaging.outgoing.kogito-job-service-job-status-events.value.serializer=org.apache.kafka.common.serialization.StringSerializer
%events-support.quarkus.arc.exclude-types=
# Job service event based API configuration for the events-support profile
# enable/disable the event based API for creating/updating jobs. Please do not disable unless you are completely sure.
%events-support.kogito.jobs-service.events-api=true
%events-support.mp.messaging.incoming.kogito-job-service-job-request-events.connector=smallrye-kafka
%events-support.mp.messaging.incoming.kogito-job-service-job-request-events.enabled=${kogito.jobs-service.events-api}
%events-support.mp.messaging.incoming.kogito-job-service-job-request-events.topic=kogito-job-service-job-request-events
%events-support.mp.messaging.incoming.kogito-job-service-job-request-events.value.deserializer=org.kie.kogito.jobs.service.messaging.CloudEventDeserializer
%events-support.mp.messaging.incoming.kogito-job-service-job-request-events.client.id=kogito-jobs-service
%events-support.mp.messaging.incoming.kogito-job-service-job-request-events.group.id=kogito-jobs-service
%events-support.mp.messaging.incoming.kogito-job-service-job-request-events.enable.auto.commit=false
%events-support.mp.messaging.incoming.kogito-job-service-job-request-events.auto.offset.reset=earliest
%events-support.mp.messaging.incoming.kogito-job-service-job-request-events.isolation.level=read_committed

quarkus.oidc.enabled=true
quarkus.oidc.tenant-enabled=false

# HTTP Security Configuration
quarkus.http.auth.permission.authenticated.paths=/*
quarkus.http.auth.permission.authenticated.policy=permit

#enabled with the profile: 'keycloak' (-Dquarkus.profile=keycloak)
# enabled with the profile: 'keycloak' (-Dquarkus.profile=keycloak)
%keycloak.quarkus.oidc.enabled=true
%keycloak.quarkus.oidc.tenant-enabled=true
%keycloak.quarkus.oidc.auth-server-url=http://localhost:8280/auth/realms/kogito
Expand All @@ -98,70 +64,3 @@ quarkus.http.auth.permission.authenticated.policy=permit
%keycloak.quarkus.http.auth.policy.role-policy1.roles-allowed=confidential
%keycloak.quarkus.http.auth.permission.roles1.paths=/*
%keycloak.quarkus.http.auth.permission.roles1.policy=role-policy1

#Configure Events Publishing and security on Job Service using profile
#enabled with the profile: 'events-support-auth' (-Dquarkus.profile=events-support-auth)
%events-support-auth.kogito.jobs-service.events-support=true
%events-support-auth.quarkus.kafka.health.enabled=true
%events-support-auth.kafka.bootstrap.servers=localhost:9092
%events-support-auth.mp.messaging.outgoing.kogito-job-service-job-status-events.connector=smallrye-kafka
#disabled to allow property `kafka.bootstrap.servers` to override
#%events-support-auth.mp.messaging.outgoing.kogito-job-service-job-status-events.bootstrap.servers=localhost:9092
%events-support-auth.mp.messaging.outgoing.kogito-job-service-job-status-events.topic=${kogito.jobs-events-topic}
%events-support-auth.mp.messaging.outgoing.kogito-job-service-job-status-events.value.serializer=org.apache.kafka.common.serialization.StringSerializer
%events-support-auth.quarkus.arc.exclude-types=
# Job service event based API configuration for the events-support-auth profile
# enable/disable the event based API for creating/updating jobs. Please do not disable unless you are completely sure.
%events-support-auth.kogito.jobs-service.events-api=true
%events-support-auth.mp.messaging.incoming.kogito-job-service-job-request-events.connector=smallrye-kafka
%events-support-auth.mp.messaging.incoming.kogito-job-service-job-request-events.enabled=${kogito.jobs-service.events-api}
%events-support-auth.mp.messaging.incoming.kogito-job-service-job-request-events.topic=kogito-job-service-job-request-events
%events-support-auth.mp.messaging.incoming.kogito-job-service-job-request-events.value.deserializer=org.kie.kogito.jobs.service.messaging.CloudEventDeserializer
%events-support-auth.mp.messaging.incoming.kogito-job-service-job-request-events.client.id=kogito-jobs-service
%events-support-auth.mp.messaging.incoming.kogito-job-service-job-request-events.group.id=kogito-jobs-service
%events-support-auth.mp.messaging.incoming.kogito-job-service-job-request-events.enable.auto.commit=false
%events-support-auth.mp.messaging.incoming.kogito-job-service-job-request-events.auto.offset.reset=earliest
%events-support-auth.mp.messaging.incoming.kogito-job-service-job-request-events.isolation.level=read_committed

%events-support-auth.quarkus.oidc.enabled=true
%events-support-auth.quarkus.oidc.tenant-enabled=false
%events-support-auth.quarkus.oidc.auth-server-url=http://localhost:8280/auth/realms/kogito
%events-support-auth.quarkus.oidc.client-id=kogito-job-service
%events-support-auth.quarkus.oidc.credentials.secret=secret
%events-support-auth.quarkus.http.auth.policy.role-policy1.roles-allowed=confidential
%events-support-auth.quarkus.http.auth.permission.roles1.paths=/*
%events-support-auth.quarkus.http.auth.permission.roles1.policy=role-policy1


# Incoming connector to receive knative events to create/update/cancel a Job
mp.messaging.incoming.kogito-job-service-job-request-events-knative.connector=quarkus-http
mp.messaging.incoming.kogito-job-service-job-request-events-knative.path=/jobs/events
mp.messaging.incoming.kogito-job-service-job-request-events-knative.method=POST

#Knative V2
mp.messaging.incoming.kogito-job-service-job-request-events-knative-v2.connector=quarkus-http
mp.messaging.incoming.kogito-job-service-job-request-events-knative-v2.path=/v2/jobs/events
mp.messaging.incoming.kogito-job-service-job-request-events-knative-v2.method=POST

#Kafka V2
%events-support.mp.messaging.incoming.kogito-job-service-job-request-events-v2.connector=smallrye-kafka
%events-support.mp.messaging.incoming.kogito-job-service-job-request-events-v2.enabled=${kogito.jobs-service.events-api}
%events-support.mp.messaging.incoming.kogito-job-service-job-request-events-v2.topic=kogito-job-service-job-request-events-v2
%events-support.mp.messaging.incoming.kogito-job-service-job-request-events-v2.value.deserializer=org.kie.kogito.jobs.service.messaging.CloudEventDeserializer
%events-support.mp.messaging.incoming.kogito-job-service-job-request-events-v2.client.id=kogito-jobs-service-v2
%events-support.mp.messaging.incoming.kogito-job-service-job-request-events-v2.group.id=kogito-jobs-service
%events-support.mp.messaging.incoming.kogito-job-service-job-request-events-v2.enable.auto.commit=false
%events-support.mp.messaging.incoming.kogito-job-service-job-request-events-v2.auto.offset.reset=earliest
%events-support.mp.messaging.incoming.kogito-job-service-job-request-events-v2.isolation.level=read_committed

# Set to true to enable the sending of knative events to notify the Job Status changes
kogito.jobs-service.knative-events=false

# Enable the K_SINK environment variable check
quarkus.smallrye-health.check."org.kie.kogito.jobs.service.health.knative.KSinkInjectionHealthCheck".enabled=${kogito.jobs-service.knative-events:false}

# Outgoing connector to send Job status change knative events
mp.messaging.outgoing.kogito-job-service-job-status-events-knative.connector=quarkus-http
mp.messaging.outgoing.kogito-job-service-job-status-events-knative.url=${K_SINK:http://localhost:8180/jobs}
mp.messaging.outgoing.kogito-job-service-job-status-events-knative.method=POST

Loading

0 comments on commit 0d6fe8e

Please sign in to comment.