diff --git a/.sdkmanrc b/.sdkmanrc new file mode 100644 index 00000000..49f2c8cd --- /dev/null +++ b/.sdkmanrc @@ -0,0 +1,3 @@ +# Enable auto-env through the sdkman_auto_env config +# Add key=value pairs of SDKs to use below +java=17.0.7-tem diff --git a/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/InMemoryEventStore.java b/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/InMemoryEventStore.java index 0b65efe3..b171df5c 100644 --- a/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/InMemoryEventStore.java +++ b/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/InMemoryEventStore.java @@ -8,6 +8,7 @@ import akka.stream.javadsl.*; import fr.maif.eventsourcing.Event; import fr.maif.eventsourcing.EventEnvelope; +import fr.maif.eventsourcing.EventPublisher; import fr.maif.eventsourcing.EventStore; import io.vavr.Tuple; import io.vavr.Tuple0; @@ -15,6 +16,7 @@ import io.vavr.control.Option; import org.reactivestreams.Publisher; +import java.io.IOException; import java.util.ArrayList; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -121,4 +123,20 @@ public CompletionStage persist(Tuple0 transactionContext, List eventPublisher() { + var _this = this; + return new EventPublisher() { + @Override + public CompletionStage publish(List> events) { + return _this.publish(events); + } + + @Override + public void close() throws IOException { + + } + }; + } } diff --git a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java index ec64b873..f170b986 100644 --- a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java +++ b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java @@ -3,6 +3,7 @@ import fr.maif.concurrent.CompletionStages; import fr.maif.eventsourcing.Event; import fr.maif.eventsourcing.EventEnvelope; +import fr.maif.eventsourcing.EventPublisher; import fr.maif.eventsourcing.EventStore; import io.vavr.Tuple; import io.vavr.Tuple0; @@ -12,6 +13,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; +import java.io.IOException; import java.util.ArrayList; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -103,4 +105,20 @@ public CompletionStage persist(Tuple0 transactionContext, List eventPublisher() { + var _this = this; + return new EventPublisher() { + @Override + public CompletionStage publish(List> events) { + return _this.publish(events); + } + + @Override + public void close() throws IOException { + + } + }; + } } diff --git a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java index e7ae34f5..dba145e0 100644 --- a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java +++ b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java @@ -13,6 +13,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Disposable; +import reactor.core.Scannable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; @@ -57,8 +58,8 @@ public ReactorKafkaEventPublisher(SenderOptions> senderOptions, String topic, Integer queueBufferSize, Duration restartInterval, Duration maxRestartInterval) { this.topic = topic; int queueBufferSize1 = queueBufferSize == null ? 10000 : queueBufferSize; - this.restartInterval = restartInterval == null ? Duration.of(10, ChronoUnit.SECONDS) : restartInterval; - this.maxRestartInterval = maxRestartInterval == null ? Duration.of(30, ChronoUnit.MINUTES) : maxRestartInterval; + this.restartInterval = restartInterval == null ? Duration.of(1, ChronoUnit.SECONDS) : restartInterval; + this.maxRestartInterval = maxRestartInterval == null ? Duration.of(1, ChronoUnit.MINUTES) : maxRestartInterval; EventEnvelope e = EventEnvelope.builder().build(); @@ -105,18 +106,26 @@ public void start(EventStore eventStore, Concur .doOnComplete(() -> LOGGER.info("Closing publishing to {}", topic)) .retryWhen(Retry.backoff(Long.MAX_VALUE, restartInterval) .transientErrors(true) - .maxBackoff(maxRestartInterval)) + .maxBackoff(maxRestartInterval) + .doBeforeRetry(ctx -> { + LOGGER.error("Error handling events for topic %s retrying for the %s time".formatted(topic, ctx.totalRetries()), ctx.failure()); + }) + ) .subscribe(); } private Function>, Flux>> publishToKafka(EventStore eventStore, Option tx, Function>>, Flux>>>> groupFlow) { - Function, EventEnvelope>>, Flux>>> publishToKafkaFlow = it -> kafkaSender.send(it); + Function, EventEnvelope>>, Flux>>> publishToKafkaFlow = it -> + kafkaSender.send(it) + .doOnError(e -> { + LOGGER.error("Error publishing to kafka ", e); + }); return it -> it .map(this::toKafkaMessage) .transform(publishToKafkaFlow) .transform(groupFlow) - .flatMap(m -> + .concatMap(m -> tx.fold( () -> Mono.fromCompletionStage(() -> eventStore.markAsPublished(m.map(SenderResult::correlationMetadata))), txCtx -> Mono.fromCompletionStage(() -> eventStore.markAsPublished(txCtx, m.map(SenderResult::correlationMetadata))) @@ -130,7 +139,23 @@ public CompletionStage publish(List> eve LOGGER.debug("Publishing event in memory : \n{} ", events); return Flux .fromIterable(events) - .map(queue::tryEmitNext) + .concatMap(t -> + Mono.defer(() -> { + Sinks.EmitResult emitResult = queue.tryEmitNext(t); + if (emitResult.isFailure()) { + return Mono.error(new RuntimeException("Error publishing to queue for %s : %s".formatted(topic, emitResult))); + } else { + return Mono.just(""); + } + }) + .retryWhen(Retry + .backoff(5, Duration.ofMillis(500)) + .doBeforeRetry(ctx -> { + LOGGER.error("Error publishing to queue %s retrying for the %s time".formatted(topic, ctx.totalRetries()), ctx.failure()); + }) + ) + .onErrorReturn("") + ) .collectList() .thenReturn(Tuple.empty()) .toFuture(); @@ -139,7 +164,11 @@ public CompletionStage publish(List> eve @Override public void close() throws IOException { if (Objects.nonNull(killSwitch)) { - this.killSwitch.dispose(); + try { + this.killSwitch.dispose(); + } catch (UnsupportedOperationException e) { + LOGGER.error("Error closing Publisher", e); + } } this.kafkaSender.close(); } @@ -168,4 +197,8 @@ private Flux logProgress(Flux logProgress, int every) { }); } + public Integer getBufferedElementCount() { + return this.queue.scan(Scannable.Attr.BUFFERED); + } + } diff --git a/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/KafkaEventPublisherTest.java b/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/KafkaEventPublisherTest.java index 91b37d30..96944eb0 100644 --- a/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/KafkaEventPublisherTest.java +++ b/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/KafkaEventPublisherTest.java @@ -30,12 +30,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.*; import org.mockito.Mockito; import org.reactivestreams.Publisher; import reactor.kafka.sender.SenderOptions; @@ -44,15 +39,10 @@ import java.time.Duration; import java.time.LocalDateTime; import java.time.temporal.ChronoUnit; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.StringJoiner; -import java.util.UUID; +import java.util.*; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -60,14 +50,7 @@ import static fr.maif.eventsourcing.EventStore.ConcurrentReplayStrategy.SKIP; import static io.vavr.API.println; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.atMost; -import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class KafkaEventPublisherTest extends BaseKafkaTest { @@ -80,7 +63,7 @@ public class KafkaEventPublisherTest extends BaseKafkaTest { } @BeforeEach - void cleanUpInit() throws ExecutionException, InterruptedException, TimeoutException { + void cleanUpInit() { setUpAdminClient(); try { Set topics = adminClient().listTopics().names().get(5, TimeUnit.SECONDS); diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java b/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java index 0a46d39e..b66df349 100644 --- a/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java @@ -49,6 +49,8 @@ default CompletionStage>> markAsPublished(L CompletionStage commitOrRollback(Option of, TxCtx tx); + EventPublisher eventPublisher(); + /** * Strategy to choose when replaying journal in case of crash when there is two or more nodes that want to replay concurrently. *
    diff --git a/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java b/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java index aa410d9e..f6dd2794 100644 --- a/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java +++ b/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java @@ -390,4 +390,8 @@ public Class toType() { } } + @Override + public EventPublisher eventPublisher() { + return eventPublisher; + } } diff --git a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java index ce038118..5173af1a 100644 --- a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java +++ b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java @@ -411,4 +411,8 @@ public Class toType() { } } + @Override + public EventPublisher eventPublisher() { + return eventPublisher; + } } diff --git a/thoth-jooq/src/main/java/fr/maif/eventsourcing/impl/PostgresEventStore.java b/thoth-jooq/src/main/java/fr/maif/eventsourcing/impl/PostgresEventStore.java index 28462d10..8eac89a4 100644 --- a/thoth-jooq/src/main/java/fr/maif/eventsourcing/impl/PostgresEventStore.java +++ b/thoth-jooq/src/main/java/fr/maif/eventsourcing/impl/PostgresEventStore.java @@ -372,4 +372,8 @@ private Option readValue(String value) { .flatMap(str -> Try.of(() -> objectMapper.readTree(str)).toOption()); } + @Override + public EventPublisher eventPublisher() { + return eventPublisher; + } }