Skip to content

Commit

Permalink
fix in-vm events
Browse files Browse the repository at this point in the history
  • Loading branch information
elguardian committed Nov 29, 2023
1 parent 631b203 commit 22c6248
Show file tree
Hide file tree
Showing 19 changed files with 323 additions and 277 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/target/
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/target/
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.kie.kogito.jobs.service.executor.JobExecutorResolver;
import org.kie.kogito.jobs.service.model.JobDetailsContext;
import org.kie.kogito.jobs.service.model.JobExecutionResponse;
import org.kie.kogito.jobs.service.stream.JobStreams;
import org.kie.kogito.jobs.service.stream.JobEventPublisher;
import org.kie.kogito.timer.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -40,9 +40,9 @@ public class DelegateJob implements Job<JobDetailsContext> {

private final JobExecutorResolver jobExecutorResolver;

private final JobStreams jobStreams;
private final JobEventPublisher jobStreams;

public DelegateJob(JobExecutorResolver executorResolver, JobStreams jobStreams) {
public DelegateJob(JobExecutorResolver executorResolver, JobEventPublisher jobStreams) {
this.jobExecutorResolver = executorResolver;
this.jobStreams = jobStreams;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,17 @@
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.stream.JobStreams;
import org.kie.kogito.jobs.service.stream.JobEventPublisher;

import io.vertx.core.Vertx;

public abstract class BaseReactiveJobRepository implements ReactiveJobRepository {

private Vertx vertx;

private JobStreams jobStreams;
private JobEventPublisher jobStreams;

protected BaseReactiveJobRepository(Vertx vertx, JobStreams jobStreams) {
protected BaseReactiveJobRepository(Vertx vertx, JobEventPublisher jobStreams) {
this.vertx = vertx;
this.jobStreams = jobStreams;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.stream.JobStreams;
import org.kie.kogito.jobs.service.stream.JobEventPublisher;
import org.kie.kogito.jobs.service.utils.DateUtil;

import io.quarkus.arc.DefaultBean;
Expand All @@ -53,7 +53,7 @@ public InMemoryJobRepository() {
}

@Inject
public InMemoryJobRepository(Vertx vertx, JobStreams jobStreams) {
public InMemoryJobRepository(Vertx vertx, JobEventPublisher jobStreams) {
super(vertx, jobStreams);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,21 @@

import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletionStage;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.kie.kogito.jobs.service.executor.JobExecutorResolver;
import org.kie.kogito.jobs.service.job.DelegateJob;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobDetailsContext;
import org.kie.kogito.jobs.service.model.JobExecutionResponse;
import org.kie.kogito.jobs.service.model.ManageableJobHandle;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.scheduler.BaseTimerJobScheduler;
import org.kie.kogito.jobs.service.stream.AvailableStreams;
import org.kie.kogito.jobs.service.stream.JobStreams;
import org.kie.kogito.jobs.service.utils.ErrorHandling;
import org.kie.kogito.jobs.service.stream.JobEventPublisher;
import org.kie.kogito.timer.Trigger;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
Expand All @@ -58,7 +52,7 @@ public class TimerDelegateJobScheduler extends BaseTimerJobScheduler {

private VertxTimerServiceScheduler delegate;

private JobStreams jobStreams;
private JobEventPublisher jobStreams;

protected TimerDelegateJobScheduler() {
}
Expand All @@ -70,7 +64,7 @@ public TimerDelegateJobScheduler(ReactiveJobRepository jobRepository,
@ConfigProperty(name = "kogito.jobs-service.schedulerChunkInMinutes", defaultValue = "10") long schedulerChunkInMinutes,
@ConfigProperty(name = "kogito.jobs-service.forceExecuteExpiredJobs", defaultValue = "true") boolean forceExecuteExpiredJobs,
JobExecutorResolver jobExecutorResolver, VertxTimerServiceScheduler delegate,
JobStreams jobStreams) {
JobEventPublisher jobStreams) {
super(jobRepository, backoffRetryMillis, maxIntervalLimitToRetryMillis, schedulerChunkInMinutes, forceExecuteExpiredJobs);
this.jobExecutorResolver = jobExecutorResolver;
this.delegate = delegate;
Expand Down Expand Up @@ -100,33 +94,4 @@ public Publisher<ManageableJobHandle> doCancel(JobDetails scheduledJob) {
.buildRs();
}

//Stream Processors

@Incoming(AvailableStreams.JOB_ERROR_EVENTS)
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
public CompletionStage<Boolean> jobErrorProcessor(JobExecutionResponse response) {
LOGGER.warn("Error received {}", response);
return ErrorHandling.skipErrorPublisherBuilder(this::handleJobExecutionError, response)
.findFirst()
.run()
.thenApply(Optional::isPresent)
.exceptionally(e -> {
LOGGER.error("Error handling error {}", response, e);
return false;
});
}

@Incoming(AvailableStreams.JOB_SUCCESS_EVENTS)
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
public CompletionStage<Boolean> jobSuccessProcessor(JobExecutionResponse response) {
LOGGER.debug("Success received to be processed {}", response);
return ErrorHandling.skipErrorPublisherBuilder(this::handleJobExecutionSuccess, response)
.findFirst()
.run()
.thenApply(Optional::isPresent)
.exceptionally(e -> {
LOGGER.error("Error handling error {}", response, e);
return false;
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.jobs.service.stream;

import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobExecutionResponse;

public interface JobEventPublisher {

JobExecutionResponse publishJobError(JobExecutionResponse response);

JobExecutionResponse publishJobSuccess(JobExecutionResponse response);

JobDetails publishJobStatusChange(JobDetails scheduledJob);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.kie.kogito.jobs.service.stream;

import java.util.Optional;
import java.util.concurrent.CompletionStage;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

Expand All @@ -29,6 +32,8 @@
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobExecutionResponse;
import org.kie.kogito.jobs.service.scheduler.ReactiveJobScheduler;
import org.kie.kogito.jobs.service.utils.ErrorHandling;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -39,9 +44,12 @@
* received item.
*/
@ApplicationScoped
public class JobStreams {
public class JobStreamsEventPublisher implements JobEventPublisher {

private static final Logger LOGGER = LoggerFactory.getLogger(JobStreamsEventPublisher.class);

private static final Logger LOGGER = LoggerFactory.getLogger(JobStreams.class);
@Inject
ReactiveJobScheduler scheduler;

/**
* Publish on Stream of Job Error events
Expand Down Expand Up @@ -82,8 +90,36 @@ public JobDetails publishJobStatusChange(JobDetails scheduledJob) {
return scheduledJob;
}

//Broadcast Events from Emitter to Streams
//Stream Processors
@Incoming(AvailableStreams.JOB_ERROR_EVENTS)
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
public CompletionStage<Boolean> jobErrorProcessor(JobExecutionResponse response) {
LOGGER.warn("Error received {}", response);
return ErrorHandling.skipErrorPublisherBuilder(scheduler::handleJobExecutionError, response)
.findFirst()
.run()
.thenApply(Optional::isPresent)
.exceptionally(e -> {
LOGGER.error("Error handling error {}", response, e);
return false;
});
}

@Incoming(AvailableStreams.JOB_SUCCESS_EVENTS)
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
public CompletionStage<Boolean> jobSuccessProcessor(JobExecutionResponse response) {
LOGGER.debug("Success received to be processed {}", response);
return ErrorHandling.skipErrorPublisherBuilder(scheduler::handleJobExecutionSuccess, response)
.findFirst()
.run()
.thenApply(Optional::isPresent)
.exceptionally(e -> {
LOGGER.error("Error handling error {}", response, e);
return false;
});
}

// Broadcast Events from Emitter to Streams
@Incoming(AvailableStreams.JOB_ERROR)
@Outgoing(AvailableStreams.JOB_ERROR_EVENTS)
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.kie.kogito.jobs.service.model.Recipient;
import org.kie.kogito.jobs.service.model.RecipientInstance;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.stream.JobStreams;
import org.kie.kogito.jobs.service.stream.JobEventPublisher;
import org.kie.kogito.jobs.service.utils.DateUtil;
import org.kie.kogito.jobs.service.utils.FunctionsUtil;
import org.kie.kogito.timer.impl.PointInTimeTrigger;
Expand All @@ -56,8 +56,8 @@ public void setUp() throws Exception {
createAndSaveJob(ID);
}

public JobStreams mockJobStreams() {
final JobStreams mock = mock(JobStreams.class);
public JobEventPublisher mockJobStreams() {
final JobEventPublisher mock = mock(JobEventPublisher.class);
lenient().when(mock.publishJobStatusChange(any(JobDetails.class))).thenAnswer(a -> a.getArgument(0));
lenient().when(mock.publishJobSuccess(any(JobExecutionResponse.class))).thenAnswer(a -> a.getArgument(0));
lenient().when(mock.publishJobError(any(JobExecutionResponse.class))).thenAnswer(a -> a.getArgument(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.UUID;

import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -45,10 +44,8 @@

import io.smallrye.mutiny.Multi;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -98,42 +95,6 @@ void testDoCancelNullId() {
verify(timer, never()).removeJob(any(ManageableJobHandle.class));
}

@Test
void testJobSuccessProcessor() {
JobExecutionResponse response = getJobResponse();
doReturn(ReactiveStreams.of(JobDetails.builder().build()))
.when(tested).handleJobExecutionSuccess(response);
tested.jobSuccessProcessor(response).thenAccept(r -> assertThat(r).isTrue());
verify(tested).handleJobExecutionSuccess(response);
}

@Test
void testJobSuccessProcessorFail() {
JobExecutionResponse response = getJobResponse();
doReturn(ReactiveStreams.failed(new RuntimeException()))
.when(tested).handleJobExecutionSuccess(response);
tested.jobSuccessProcessor(response).thenAccept(r -> assertThat(r).isFalse());
verify(tested).handleJobExecutionSuccess(response);
}

@Test
void testJobErrorProcessor() {
JobExecutionResponse response = getJobResponse();
doReturn(ReactiveStreams.of(JobDetails.builder().build()))
.when(tested).handleJobExecutionError(response);
tested.jobErrorProcessor(response).thenAccept(r -> assertThat(r).isTrue());
verify(tested).handleJobExecutionError(response);
}

@Test
void testJobErrorProcessorFail() {
JobExecutionResponse response = getJobResponse();
doReturn(ReactiveStreams.failed(new RuntimeException()))
.when(tested).handleJobExecutionError(response);
tested.jobErrorProcessor(response).thenAccept(r -> assertThat(r).isFalse());
verify(tested).handleJobExecutionError(response);
}

private JobExecutionResponse getJobResponse() {
return JobExecutionResponse.builder()
.jobId(UUID.randomUUID().toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobDetailsContext;
import org.kie.kogito.jobs.service.model.ManageableJobHandle;
import org.kie.kogito.jobs.service.stream.JobStreams;
import org.kie.kogito.jobs.service.stream.JobEventPublisher;
import org.kie.kogito.jobs.service.utils.DateUtil;
import org.kie.kogito.timer.Job;
import org.kie.kogito.timer.JobContext;
Expand Down Expand Up @@ -62,7 +62,7 @@ class VertxTimerServiceSchedulerTest {
private JobExecutorResolver jobExecutorResolver;

@Mock
private JobStreams jobStreams;
private JobEventPublisher jobStreams;

@Captor
private ArgumentCaptor<JobDetails> jobCaptor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.repository.impl.BaseReactiveJobRepository;
import org.kie.kogito.jobs.service.stream.JobStreams;
import org.kie.kogito.jobs.service.stream.JobEventPublisher;

import io.vertx.core.Vertx;

Expand All @@ -58,7 +58,7 @@ public class InfinispanJobRepository extends BaseReactiveJobRepository implement

@Inject
public InfinispanJobRepository(Vertx vertx,
JobStreams jobStreams,
JobEventPublisher jobStreams,
RemoteCacheManager remoteCacheManager) {
super(vertx, jobStreams);
this.remoteCacheManager = remoteCacheManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.repository.impl.BaseReactiveJobRepository;
import org.kie.kogito.jobs.service.repository.marshaller.JobDetailsMarshaller;
import org.kie.kogito.jobs.service.stream.JobStreams;
import org.kie.kogito.jobs.service.stream.JobStreamsEventPublisher;

import com.mongodb.client.model.FindOneAndReplaceOptions;

Expand Down Expand Up @@ -86,7 +86,7 @@ public class MongoDBJobRepository extends BaseReactiveJobRepository implements R
}

@Inject
public MongoDBJobRepository(Vertx vertx, JobStreams jobStreams, ReactiveMongoClient mongoClient,
public MongoDBJobRepository(Vertx vertx, JobStreamsEventPublisher jobStreams, ReactiveMongoClient mongoClient,
@ConfigProperty(name = DATABASE_PROPERTY) String database,
JobDetailsMarshaller jobDetailsMarshaller) {
super(vertx, jobStreams);
Expand Down
Loading

0 comments on commit 22c6248

Please sign in to comment.