diff --git a/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/InMemoryEventStore.java b/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/InMemoryEventStore.java index 6f1978b6..e363e307 100644 --- a/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/InMemoryEventStore.java +++ b/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/InMemoryEventStore.java @@ -6,6 +6,7 @@ import akka.stream.Materializer; import akka.stream.OverflowStrategy; import akka.stream.javadsl.*; +import fr.maif.concurrent.CompletionStages; import fr.maif.eventsourcing.Event; import fr.maif.eventsourcing.EventEnvelope; import fr.maif.eventsourcing.EventPublisher; @@ -58,7 +59,7 @@ public static InMemoryEventStore lastPublishedSequence() { - return CompletableFuture.completedStage(eventStore.stream().filter(e -> e.published).map(e -> e.sequenceNum) + return CompletionStages.completedStage(eventStore.stream().filter(e -> e.published).map(e -> e.sequenceNum) .max(Comparator.comparingLong(e -> e)) .orElse(0L)); } @@ -75,7 +76,7 @@ public CompletionStage> markAsPublished(Tuple0 t @Override public CompletionStage openTransaction() { - return CompletableFuture.completedStage(Tuple.empty()); + return CompletionStages.empty(); } @Override @@ -85,14 +86,14 @@ public CompletionStage commitOrRollback(Option of, Tuple0 tx) @Override public CompletionStage> markAsPublished(EventEnvelope eventEnvelope) { - return CompletableFuture.completedStage( + return CompletionStages.completedStage( eventEnvelope.copy().withPublished(true).build() ); } @Override public CompletionStage nextSequence(Tuple0 tx) { - return CompletableFuture.completedStage(sequence_num.incrementAndGet()); + return CompletionStages.completedStage(sequence_num.incrementAndGet()); } @Override diff --git a/thoth-core-akka/src/test/java/fr/maif/Helpers.java b/thoth-core-akka/src/test/java/fr/maif/Helpers.java index bdb65927..aefbc268 100644 --- a/thoth-core-akka/src/test/java/fr/maif/Helpers.java +++ b/thoth-core-akka/src/test/java/fr/maif/Helpers.java @@ -254,7 +254,7 @@ public static class VikingCommandHandler implements CommandHandler>> handleCommand(Tuple0 unit, Option state, VikingCommand vikingCommand) { - return CompletableFuture.completedStage( + return CompletionStages.completedStage( Match(vikingCommand).of( Case(VikingCommand.CreateVikingV1.pattern(), e -> events("C", new VikingEvent.VikingCreated(e.id, e.name))), Case(VikingCommand.UpdateVikingV1.pattern(), e -> events("U", new VikingEvent.VikingUpdated(e.id, e.name))), @@ -307,12 +307,12 @@ public static class VikingSnapshot implements SnapshotStore> getSnapshot(String entityId) { - return CompletableFuture.completedStage(Option.of(data.get(entityId))); + return CompletionStages.completedStage(Option.of(data.get(entityId))); } @Override public CompletionStage> getSnapshot(Tuple0 transactionContext, String entityId) { - return CompletableFuture.completedStage(Option.of(data.get(entityId))); + return CompletionStages.completedStage(Option.of(data.get(entityId))); } @Override @@ -321,7 +321,7 @@ public CompletionStage persist(Tuple0 transactionContext, String id, Opt Case($Some($()), s -> data.put(s.id, s)), Case($None(), () -> data.remove(id)) ); - return CompletableFuture.completedStage(Tuple.empty()); + return CompletionStages.completedStage(Tuple.empty()); } } diff --git a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java index 272df479..fa4f6b27 100644 --- a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java +++ b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java @@ -32,7 +32,7 @@ public class InMemoryEventStore implements Event private final Supplier> markAsPublishedTx; private final Supplier> markAsPublished; - private final static Supplier> NOOP = () -> CompletableFuture.completedStage(Tuple.empty()); + private final static Supplier> NOOP = () -> CompletionStages.completedStage(Tuple.empty()); public InMemoryEventStore(Supplier> markAsPublishedTx, Supplier> markAsPublished, @@ -84,26 +84,26 @@ public CompletionStage lastPublishedSequence() { max.accumulateAndGet(k.sequenceNum, Math::max); } }); - return CompletableFuture.completedStage(max.get()); + return CompletionStages.completedStage(max.get()); } @Override public CompletionStage> openTransaction() { - return CompletableFuture.completedStage(new Transaction<>()); + return CompletionStages.completedStage(new Transaction<>()); } @Override public CompletionStage> markAsPublished(Transaction tx, EventEnvelope eventEnvelope) { return markAsPublishedTx.get().thenCompose(any -> { tx.toPublish().add(eventEnvelope); - return CompletableFuture.completedStage(eventEnvelope); + return CompletionStages.completedStage(eventEnvelope); }); } @Override public CompletionStage> markAsPublished(EventEnvelope eventEnvelope) { return markAsPublished.get().thenCompose(any -> - CompletableFuture.completedStage(store.compute(eventEnvelope.sequenceNum, (k, event) -> { + CompletionStages.completedStage(store.compute(eventEnvelope.sequenceNum, (k, event) -> { if (event == null) { return eventEnvelope.copy().withPublished(true).build(); } else { @@ -115,7 +115,7 @@ public CompletionStage> markAsPublished(EventEnv @Override public CompletionStage persist(Transaction transactionContext, List> events) { - return CompletableFuture.completedStage(transactionContext.addAll(events.toJavaList())); + return CompletionStages.completedStage(transactionContext.addAll(events.toJavaList())); } @Override @@ -130,20 +130,20 @@ public CompletionStage commitOrRollback(Option of, Transactio } tx.events.clear(); tx.toPublish.clear(); - return CompletableFuture.completedStage(API.Tuple()); + return CompletionStages.completedStage(API.Tuple()); } @Override public CompletionStage nextSequence(InMemoryEventStore.Transaction tx) { long value = store.values().stream().map(e -> e.sequenceNum).max(Comparator.comparingLong(e -> e)).orElse(0L) + 1; sequenceNums.incrementAndGet(); - return CompletableFuture.completedStage(sequenceNums.accumulateAndGet(value, Math::max)); + return CompletionStages.completedStage(sequenceNums.accumulateAndGet(value, Math::max)); } @Override public CompletionStage publish(List> events) { events.forEach(e -> store.put(e.sequenceNum, e)); - return CompletableFuture.completedStage(API.Tuple()); + return CompletionStages.completedStage(API.Tuple()); } @Override diff --git a/thoth-core-reactor/src/test/java/fr/maif/Helpers.java b/thoth-core-reactor/src/test/java/fr/maif/Helpers.java index 8f6e59fa..5a6b8120 100644 --- a/thoth-core-reactor/src/test/java/fr/maif/Helpers.java +++ b/thoth-core-reactor/src/test/java/fr/maif/Helpers.java @@ -307,7 +307,7 @@ public static class VikingCommandHandler implements CommandHandler>> handleCommand(Transaction unit, Option state, VikingCommand vikingCommand) { - return CompletableFuture.completedStage( + return CompletionStages.completedStage( Match(vikingCommand).of( Case(VikingCommand.CreateVikingV1.pattern(), e -> events("C", new VikingEvent.VikingCreated(e.id, e.name, e.age))), Case(VikingCommand.UpdateVikingV1.pattern(), e -> events("U", new VikingEvent.VikingUpdated(e.id, e.name, e.age))), @@ -364,12 +364,12 @@ public static class VikingSnapshot implements SnapshotStore> getSnapshot(String entityId) { - return CompletableFuture.completedStage(Option.of(data.get(entityId))); + return CompletionStages.completedStage(Option.of(data.get(entityId))); } @Override public CompletionStage> getSnapshot(Transaction transactionContext, String entityId) { - return CompletableFuture.completedStage(Option.of(data.get(entityId))); + return CompletionStages.completedStage(Option.of(data.get(entityId))); } @Override @@ -378,7 +378,7 @@ public CompletionStage persist(Transaction Case($Some($()), s -> data.put(s.id, s)), Case($None(), () -> data.remove(id)) ); - return CompletableFuture.completedStage(Tuple.empty()); + return CompletionStages.completedStage(Tuple.empty()); } } diff --git a/thoth-core/src/main/java/fr/maif/concurrent/CompletionStages.java b/thoth-core/src/main/java/fr/maif/concurrent/CompletionStages.java index f44df6f0..9d2e542b 100644 --- a/thoth-core/src/main/java/fr/maif/concurrent/CompletionStages.java +++ b/thoth-core/src/main/java/fr/maif/concurrent/CompletionStages.java @@ -15,9 +15,20 @@ public class CompletionStages { + public static CompletionStage completedStage(U value) { + CompletableFuture completableFuture = new CompletableFuture<>(); + completableFuture.complete(value); + return completableFuture; + } + public static CompletionStage failedStage(Throwable e) { + CompletableFuture completableFuture = new CompletableFuture<>(); + completableFuture.completeExceptionally(e); + return completableFuture; + } + public static CompletionStage> traverse(List elements, Function> handler) { return elements.foldLeft( - CompletableFuture.completedStage(List.empty()), + completedStage(List.empty()), (fResult, elt) -> fResult.thenCompose(listResult -> handler.apply(elt).thenApply(listResult::append)) ); @@ -25,44 +36,44 @@ public static CompletionStage> traverse(List elements, Functio public static CompletionStage fromTry(Supplier> tryValue, Executor executor) { return CompletableFuture.supplyAsync(() -> tryValue.get().fold( - CompletableFuture::failedStage, - CompletableFuture::completedStage + CompletionStages::failedStage, + CompletionStages::completedStage ), executor).thenCompose(identity()); } public static CompletionStage fromTry(Supplier> tryValue) { return CompletableFuture.supplyAsync(() -> tryValue.get().fold( - CompletableFuture::failedStage, - CompletableFuture::completedStage + CompletionStages::failedStage, + CompletionStages::completedStage )).thenCompose(identity()); } public static CompletionStage of(Supplier tryValue, Executor executor) { return CompletableFuture.supplyAsync(() -> { try { - return CompletableFuture.completedStage(tryValue.get()); + return completedStage(tryValue.get()); } catch (Exception e) { - return CompletableFuture.failedStage(e); + return CompletionStages.failedStage(e); } }, executor).thenCompose(identity()); } public static CompletionStage of(Supplier tryValue) { return CompletableFuture.supplyAsync(() -> { try { - return CompletableFuture.completedStage(tryValue.get()); + return completedStage(tryValue.get()); } catch (Exception e) { - return CompletableFuture.failedStage(e); + return CompletionStages.failedStage(e); } }).thenCompose(identity()); } public static CompletionStage successful(S value) { - return CompletableFuture.completedStage(value); + return completedStage(value); } public static CompletionStage failed(Throwable e) { - return CompletableFuture.failedStage(e); + return CompletionStages.failedStage(e); } public static CompletionStage empty() { - return CompletableFuture.completedStage(Tuple.empty()); + return completedStage(Tuple.empty()); } } diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/AggregateStore.java b/thoth-core/src/main/java/fr/maif/eventsourcing/AggregateStore.java index ad3aec43..cfcf7045 100644 --- a/thoth-core/src/main/java/fr/maif/eventsourcing/AggregateStore.java +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/AggregateStore.java @@ -1,12 +1,11 @@ package fr.maif.eventsourcing; +import fr.maif.concurrent.CompletionStages; import io.vavr.Tuple; import io.vavr.Tuple0; import io.vavr.collection.List; -import io.vavr.concurrent.Future; import io.vavr.control.Option; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; public interface AggregateStore, Id, TxCtx> { @@ -16,11 +15,11 @@ public interface AggregateStore, Id, TxCtx> { CompletionStage> getAggregate(TxCtx ctx, Id entityId); default CompletionStage storeSnapshot(TxCtx transactionContext, Id id, Option state) { - return CompletableFuture.completedStage(Tuple.empty()); + return CompletionStages.completedStage(Tuple.empty()); } default CompletionStage> getSnapshot(TxCtx transactionContext, Id id) { - return CompletableFuture.completedStage(Option.none()); + return CompletionStages.completedStage(Option.none()); } default CompletionStage> buildAggregateAndStoreSnapshot(TxCtx ctx, EventHandler eventHandler, Option state, Id id, List events, Option lastSequenceNum) { diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/CommandHandler.java b/thoth-core/src/main/java/fr/maif/eventsourcing/CommandHandler.java index 4c01144f..742bee88 100644 --- a/thoth-core/src/main/java/fr/maif/eventsourcing/CommandHandler.java +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/CommandHandler.java @@ -1,13 +1,11 @@ package fr.maif.eventsourcing; +import fr.maif.concurrent.CompletionStages; import io.vavr.Tuple0; -import io.vavr.Tuple2; import io.vavr.collection.List; -import io.vavr.concurrent.Future; import io.vavr.control.Either; import io.vavr.control.Option; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; /** @@ -33,11 +31,11 @@ public interface CommandHandler>> handleCommand(TxCtx ctx, Option state, Command command); default CompletionStage>> eventsAsync(E... events) { - return CompletableFuture.completedStage(Either.right(Events.events(List.of(events)))); + return CompletionStages.completedStage(Either.right(Events.events(List.of(events)))); } default CompletionStage>> eventsAsync(Message message, E... events) { - return CompletableFuture.completedStage(Either.right(Events.events(message, List.of(events)))); + return CompletionStages.completedStage(Either.right(Events.events(message, List.of(events)))); } default Either> events(E... events) { @@ -53,7 +51,7 @@ default Either> fail(Error error) { } default CompletionStage>> failAsync(Error error) { - return CompletableFuture.completedStage(Either.left(error)); + return CompletionStages.completedStage(Either.left(error)); } } diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/EventProcessorImpl.java b/thoth-core/src/main/java/fr/maif/eventsourcing/EventProcessorImpl.java index d0eacc6a..7611d075 100644 --- a/thoth-core/src/main/java/fr/maif/eventsourcing/EventProcessorImpl.java +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/EventProcessorImpl.java @@ -156,7 +156,7 @@ public CompletionStage, Either>>>> traverseCommands(List elements, BiFunction, CompletionStage, Either>>>> handler) { return elements.foldLeft( - CompletableFuture.completedStage(Tuple(List., Either>>>empty(), List.>empty())), + CompletionStages.completedStage(Tuple(List., Either>>>empty(), List.>empty())), (fResult, elt) -> fResult.thenCompose(listResult -> handler.apply(elt, listResult._2.flatMap(e -> e.events)) .thenApply(r -> diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/impl/JdbcTransactionManager.java b/thoth-core/src/main/java/fr/maif/eventsourcing/impl/JdbcTransactionManager.java index 1bcb3b76..ed1666bf 100644 --- a/thoth-core/src/main/java/fr/maif/eventsourcing/impl/JdbcTransactionManager.java +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/impl/JdbcTransactionManager.java @@ -49,12 +49,12 @@ public CompletionStage withTransaction(Function commit(connection).thenApply(__ -> r)) .exceptionallyCompose(e -> { LOGGER.error("Error, rollbacking, {}", e); - return rollback(connection).thenCompose(__ -> CompletableFuture.failedStage(e)); + return rollback(connection).thenCompose(__ -> CompletionStages.failedStage(e)); }) .thenCompose(r -> closeConnection(connection).thenApply(__ -> r)) .exceptionallyCompose(e -> { LOGGER.error("Error, closing connection, {}", e); - return closeConnection(connection).thenCompose(__ -> CompletableFuture.failedStage(e)); + return closeConnection(connection).thenCompose(__ -> CompletionStages.failedStage(e)); }) ); } diff --git a/thoth-core/src/test/java/fr/maif/Helpers.java b/thoth-core/src/test/java/fr/maif/Helpers.java index 20f1e5ed..5b6eb76c 100644 --- a/thoth-core/src/test/java/fr/maif/Helpers.java +++ b/thoth-core/src/test/java/fr/maif/Helpers.java @@ -240,7 +240,7 @@ public static class VikingCommandHandler implements CommandHandler>> handleCommand(Tuple0 unit, Option state, VikingCommand vikingCommand) { - return CompletableFuture.completedStage( + return CompletionStages.completedStage( Match(vikingCommand).of( Case(VikingCommand.CreateVikingV1.pattern(), e -> events("C", new VikingEvent.VikingCreated(e.id, e.name))), Case(VikingCommand.UpdateVikingV1.pattern(), e -> events("U", new VikingEvent.VikingUpdated(e.id, e.name))), @@ -293,12 +293,12 @@ public static class VikingSnapshot implements SnapshotStore> getSnapshot(String entityId) { - return CompletableFuture.completedStage(Option.of(data.get(entityId))); + return CompletionStages.completedStage(Option.of(data.get(entityId))); } @Override public CompletionStage> getSnapshot(Tuple0 transactionContext, String entityId) { - return CompletableFuture.completedStage(Option.of(data.get(entityId))); + return CompletionStages.completedStage(Option.of(data.get(entityId))); } @Override @@ -307,7 +307,7 @@ public CompletionStage persist(Tuple0 transactionContext, String id, Opt Case($Some($()), s -> data.put(s.id, s)), Case($None(), () -> data.remove(id)) ); - return CompletableFuture.completedStage(Tuple.empty()); + return CompletionStages.completedStage(Tuple.empty()); } } diff --git a/thoth-kafka-consumer-akka/src/main/java/fr/maif/kafka/consumer/ResilientKafkaConsumer.java b/thoth-kafka-consumer-akka/src/main/java/fr/maif/kafka/consumer/ResilientKafkaConsumer.java index 6be6c203..4d41ce8a 100644 --- a/thoth-kafka-consumer-akka/src/main/java/fr/maif/kafka/consumer/ResilientKafkaConsumer.java +++ b/thoth-kafka-consumer-akka/src/main/java/fr/maif/kafka/consumer/ResilientKafkaConsumer.java @@ -254,14 +254,27 @@ public ResilientKafkaConsumer(ActorSystem actorSystem, Config config) { this.consumerSettings = config.consumerSettings .withGroupId(config.groupId) .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - this.onStarted = defaultIfNull(config.onStarted, (__, ___) -> CompletableFuture.completedFuture(done())); - this.onStarting = defaultIfNull(config.onStarting, () -> CompletableFuture.completedFuture(done())); - this.onStopped = defaultIfNull(config.onStopped, () -> CompletableFuture.completedFuture(done())); - this.onStopping = defaultIfNull(config.onStopping, (__) -> CompletableFuture.completedFuture(done())); - this.onFailed = defaultIfNull(config.onFailed, (__) -> CompletableFuture.completedFuture(done())); + this.onStarted = defaultIfNull(config.onStarted, (__, ___) -> completedStage(done())); + this.onStarting = defaultIfNull(config.onStarting, () -> completedStage(done())); + this.onStopped = defaultIfNull(config.onStopped, () -> completedStage(done())); + this.onStopping = defaultIfNull(config.onStopping, (__) -> completedStage(done())); + this.onFailed = defaultIfNull(config.onFailed, (__) -> completedStage(done())); this.start(); } + + public static CompletionStage completedStage(U value) { + CompletableFuture completableFuture = new CompletableFuture<>(); + completableFuture.complete(value); + return completableFuture; + } + + public static CompletionStage failedStage(Throwable e) { + CompletableFuture completableFuture = new CompletableFuture<>(); + completableFuture.completeExceptionally(e); + return completableFuture; + } + public static ResilientKafkaConsumer createFromFlow(ActorSystem actorSystem, String name, Config config, @@ -487,7 +500,7 @@ protected CompletionStage stopConsumingKafka() { .thenCompose(___ -> control.isShutdown()) .thenApply(__ -> done()); } else { - return CompletableFuture.completedFuture(done()); + return completedStage(done()); } } } diff --git a/thoth-kafka-consumer-akka/src/test/java/fr/maif/kafka/consumer/ResilientKafkaConsumerTest.java b/thoth-kafka-consumer-akka/src/test/java/fr/maif/kafka/consumer/ResilientKafkaConsumerTest.java index 9318bbcf..469fd4ec 100644 --- a/thoth-kafka-consumer-akka/src/test/java/fr/maif/kafka/consumer/ResilientKafkaConsumerTest.java +++ b/thoth-kafka-consumer-akka/src/test/java/fr/maif/kafka/consumer/ResilientKafkaConsumerTest.java @@ -146,7 +146,7 @@ void crash() throws Exception { return completableFuture; } else { names.set(names.get() + " " + messageAndIndex.first().record().value()); - return CompletableFuture.completedFuture(messageAndIndex.first().committableOffset()); + return CompletableFuture.completedStage(messageAndIndex.first().committableOffset()); } }) );