Skip to content

Commit

Permalink
[incubator-kie-issues-711] Refactroing job service to allow collocate…
Browse files Browse the repository at this point in the history
…d service for Quarkus
  • Loading branch information
elguardian committed Nov 28, 2023
1 parent d17609f commit b0fcdbc
Show file tree
Hide file tree
Showing 12 changed files with 489 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,12 @@ void startup(@Observes StartupEvent startupEvent) {
ex -> LOGGER.error("Error on heartbeat {}", currentInfo.get(), ex)))
.pause();

leader.set(true);
enableCommunication();
//initial leader check
tryBecomeLeader(currentInfo.get(), checkLeader, heartbeat)
.subscribe().with(i -> LOGGER.info("Initial leader check completed"),
ex -> LOGGER.error("Error on initial check leader", ex));
// tryBecomeLeader(currentInfo.get(), checkLeader, heartbeat)
// .subscribe().with(i -> LOGGER.info("Initial leader check completed"),
// ex -> LOGGER.error("Error on initial check leader", ex));
}

private void disableCommunication() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public static <T> T payload(Recipient<?> recipient) {
}

public static org.kie.kogito.jobs.service.model.Recipient from(Recipient<?> recipient) {
checkIsSupported(recipient);
// checkIsSupported(recipient);
return new RecipientInstance(recipient);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
quarkus.datasource.db-kind=postgresql
quarkus.datasource.username=
quarkus.datasource.password=
quarkus.datasource.reactive.url=
quarkus.datasource.jdbc.url=
quarkus.flyway.migrate-at-start=true
quarkus.datasource.health.enabled=true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>kogito-addons-jobs-service</artifactId>
<groupId>org.kie.kogito</groupId>
<version>2.0.0-SNAPSHOT</version>
</parent>


<artifactId>kogito-addons-quarkus-jobs</artifactId>

<name>Jobs Collocated Quarkus Addon - Runtime</name>
<description>Run Jobs Service embedded with the application.</description>


<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>jobs-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>jobs-service-internal-api</artifactId>
</dependency>

<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-services</artifactId>
</dependency>

<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-timer</artifactId>
</dependency>

<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>jobs-service-common</artifactId>

<exclusions>
<exclusion>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive</artifactId>
</exclusion>
<exclusion>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-jackson</artifactId>
</exclusion>
</exclusions>
</dependency>

</dependencies>


</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.jobs.embedded;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.kie.kogito.Application;
import org.kie.kogito.Model;
import org.kie.kogito.jobs.service.api.Recipient;
import org.kie.kogito.jobs.service.exception.JobExecutionException;
import org.kie.kogito.jobs.service.executor.JobExecutor;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobExecutionResponse;
import org.kie.kogito.jobs.service.model.RecipientInstance;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.Processes;
import org.kie.kogito.services.jobs.impl.TriggerJobCommand;

import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class EmbeddedJobExecutor implements JobExecutor {

@Inject
Processes processes;

@Inject
Application application;

@Override
public Uni<JobExecutionResponse> execute(JobDetails jobDetails) {

String correlationId = jobDetails.getCorrelationId();
RecipientInstance recipientModel = (RecipientInstance) jobDetails.getRecipient();
InVMRecipient recipient = (InVMRecipient) recipientModel.getRecipient();
String timerId = recipient.getPayload().getData().timerId();
String processId = recipient.getPayload().getData().processId();
Process<? extends Model> process = processes.processById(processId);
String processInstanceId = recipient.getPayload().getData().processInstanceId();
Integer limit = jobDetails.getRetries();

TriggerJobCommand command = new TriggerJobCommand(processInstanceId, correlationId, timerId, limit, process, application.unitOfWorkManager());

return Uni.createFrom().item(command::execute)
.onFailure()
.transform(
unexpected -> new JobExecutionException(jobDetails.getId(), "Unexpected error when executing Embedded request for job: " + jobDetails.getId() + ". " + unexpected.getMessage()))
.onItem()
.transform(res -> JobExecutionResponse.builder()
.message("Embedded job executed")
.code(String.valueOf(200))
.now()
.jobId(jobDetails.getId())
.build());
}

@Override
public Class<? extends Recipient> type() {
return InVMRecipient.class;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.jobs.embedded;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.kie.kogito.jobs.JobsService;
import org.kie.kogito.jobs.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.ProcessJobDescription;
import org.kie.kogito.jobs.api.JobCallbackResourceDef;
import org.kie.kogito.jobs.service.adapter.JobDetailsAdapter;
import org.kie.kogito.jobs.service.api.Job;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.scheduler.ReactiveJobScheduler;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
public class EmbeddedJobsService implements JobsService {
private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedJobsService.class);

@Inject
ReactiveJobScheduler scheduler;

public EmbeddedJobsService() {
LOGGER.info("Starting Embedded Job Service");
}

@Override
public String scheduleProcessJob(ProcessJobDescription description) {
LOGGER.debug("ScheduleProcessJob: {} not supported", description);
return null;
}

private class BlockingJobSubscriber implements Subscriber<JobDetails> {

private CountDownLatch latch;
private String outcome;
private Throwable exception;

public BlockingJobSubscriber() {
latch = new CountDownLatch(1);
}

@Override
public void onSubscribe(Subscription s) {
s.request(1L);
}

@Override
public void onNext(JobDetails t) {
LOGGER.info("BlockingJobSubscriber::onNext {}", t);
outcome = t.getId();
}

@Override
public void onError(Throwable t) {
exception = t;
LOGGER.info("BlockingJobSubscriber::onError {}", t);
latch.countDown();
}

@Override
public void onComplete() {
LOGGER.info("BlockingJobSubscriber::onComplete");
latch.countDown();
}

public String get() {
try {
latch.await();
if (exception != null) {
throw new RuntimeException(exception);
}
LOGGER.info("BlockingJobSubscriber::get {}", outcome);
return outcome;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

}

@Override
public String scheduleProcessInstanceJob(ProcessInstanceJobDescription description) {
LOGGER.info("Embedded ScheduleProcessJob: {}", description);
Job job = Job.builder()
.id(description.id())
.correlationId(description.id())
.recipient(new InVMRecipient(new InVMPayloadData(description)))
.schedule(JobCallbackResourceDef.buildSchedule(description))
.build();

JobDetails jobDetails = JobDetailsAdapter.from(job);
BlockingJobSubscriber subscriber = new BlockingJobSubscriber();
scheduler.schedule(jobDetails).subscribe(subscriber);
String outcome = subscriber.get();
LOGGER.info("Embedded ScheduleProcessJob: {} scheduled", outcome);
return outcome;
}

@Override
public boolean cancelJob(String jobId) {
try {
LOGGER.info("Embedded cancelJob: {}", jobId);
return JobStatus.CANCELED.equals(scheduler.cancel(jobId).toCompletableFuture().get().getStatus());
} catch (InterruptedException | ExecutionException e) {
return false;
}
}

}
Loading

0 comments on commit b0fcdbc

Please sign in to comment.