diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/AggregateStore.java b/thoth-core/src/main/java/fr/maif/eventsourcing/AggregateStore.java index ad3aec43..eb52cc14 100644 --- a/thoth-core/src/main/java/fr/maif/eventsourcing/AggregateStore.java +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/AggregateStore.java @@ -1,8 +1,10 @@ package fr.maif.eventsourcing; +import fr.maif.concurrent.CompletionStages; import io.vavr.Tuple; import io.vavr.Tuple0; import io.vavr.collection.List; +import io.vavr.collection.Map; import io.vavr.concurrent.Future; import io.vavr.control.Option; @@ -13,6 +15,8 @@ public interface AggregateStore, Id, TxCtx> { CompletionStage> getAggregate(Id entityId); + CompletionStage>> getAggregates(TxCtx ctx, List entityIds); + CompletionStage> getAggregate(TxCtx ctx, Id entityId); default CompletionStage storeSnapshot(TxCtx transactionContext, Id id, Option state) { @@ -23,6 +27,10 @@ default CompletionStage> getSnapshot(TxCtx transactionContext, Id id) return CompletableFuture.completedStage(Option.none()); } + default CompletionStage> getSnapshots(TxCtx transactionContext, List ids) { + return CompletableFuture.completedStage(List.empty()); + } + default CompletionStage> buildAggregateAndStoreSnapshot(TxCtx ctx, EventHandler eventHandler, Option state, Id id, List events, Option lastSequenceNum) { Option newState = eventHandler.deriveState(state, events.filter(event -> event.entityId().equals(id))); diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/EventProcessorImpl.java b/thoth-core/src/main/java/fr/maif/eventsourcing/EventProcessorImpl.java index d0eacc6a..d6b9459c 100644 --- a/thoth-core/src/main/java/fr/maif/eventsourcing/EventProcessorImpl.java +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/EventProcessorImpl.java @@ -9,6 +9,7 @@ import io.vavr.Tuple3; import io.vavr.Value; import io.vavr.collection.List; +import io.vavr.collection.Map; import io.vavr.collection.Seq; import io.vavr.control.Either; import io.vavr.control.Option; @@ -20,12 +21,14 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.BiFunction; +import java.util.function.Function; import java.util.function.Supplier; import static io.vavr.API.List; import static io.vavr.API.None; import static io.vavr.API.Tuple; import static fr.maif.concurrent.CompletionStages.traverse; +import static java.util.function.Function.identity; public class EventProcessorImpl, C extends Command, E extends Event, TxCtx, Message, Meta, Context> implements EventProcessor { @@ -68,90 +71,92 @@ public CompletionStage>>>> batchProcessCommand(TxCtx ctx, List commands) { // Collect all states from db - return traverseCommands(commands, (c, events) -> - this.getCurrentState(ctx, c, events).thenCompose(mayBeState -> - //handle command with state to get events - handleCommand(ctx, mayBeState, c) - // Return command + state + (error or events) - .thenApply(r -> Tuple(c, mayBeState, r)) - ) - ) - .thenCompose(commandsAndResults -> { - // Extract errors from command handling - List>> errors = commandsAndResults - .map(Tuple3::_3) - .filter(Either::isLeft) - .map(e -> Either.left(e.swap().get())); - - // Extract success and generate envelopes for each result - CompletionStage> success = traverse(commandsAndResults.filter(t -> t._3.isRight()), t -> { - C command = t._1; - Option mayBeState = t._2; - List events = t._3.get().events.toList(); - return buildEnvelopes(ctx, command, events).thenApply(eventEnvelopes -> { - Option mayBeLastSeqNum = eventEnvelopes.lastOption().map(evl -> evl.sequenceNum); - return new CommandStateAndEvent(command, mayBeState, eventEnvelopes, events, t._3.get().message, mayBeLastSeqNum); - }); - }); - - return success.thenApply(s -> Tuple(s.toList(), errors)); - }) - .thenCompose(successAndErrors -> { - - List>> errors = successAndErrors._2; - List success = successAndErrors._1; - - // Get all envelopes - List> envelopes = success.flatMap(CommandStateAndEvent::getEventEnvelopes); - - CompletionStage>> stored = eventStore - // Persist all envelopes - .persist(ctx, envelopes) - .thenCompose(__ -> - // Persist states - traverse(success, s -> { - LOGGER.debug("Storing state {} to DB", s); - List sequences = envelopes.filter(env -> env.entityId.equals(s.command.entityId().get())).map(env -> env.sequenceNum); - return aggregateStore - .buildAggregateAndStoreSnapshot( - ctx, - eventHandler, - s.getState(), - s.getCommand().entityId().get(), - s.getEvents(), - sequences.max() - ) - .thenApply(mayBeNextState -> - new ProcessingSuccess<>(s.state, mayBeNextState, s.getEventEnvelopes(), s.getMessage()) - ); - }) - ) - .thenCompose(mayBeNextState -> - // Apply events to projections - traverse(projections, p -> { - LOGGER.debug("Applying envelopes {} to projection", envelopes); - return p.storeProjection(ctx, envelopes); - }) - .thenApply(__ -> mayBeNextState) - ); - return stored.thenApply(results -> - errors.appendAll(results.map(Either::right)) - ); - }) - .thenApply(results -> { - Supplier> postTransactionProcess = () -> { - List> envelopes = results.flatMap(Value::toList).flatMap(ProcessingSuccess::getEvents); - LOGGER.debug("Publishing events {} to kafka", envelopes); - return eventStore.publish(envelopes) - .thenApply(__ -> Tuple.empty()) - .exceptionally(e -> Tuple.empty()); - }; - var inTransactionResult = new InTransactionResult<>( - results, - postTransactionProcess - ); - return inTransactionResult; - }); + return aggregateStore.getAggregates(ctx, commands.filter(c -> c.hasId()).map(c -> c.entityId().get())) + .thenCompose(states -> + traverseCommands(commands, (c, events) -> { + //handle command with state to get events + Option mayBeState = this.getCurrentState(ctx, states, c, events); + return handleCommand(ctx, mayBeState, c) + // Return command + state + (error or events) + .thenApply(r -> Tuple(c, mayBeState, r)); + }) + .thenCompose(commandsAndResults -> { + // Extract errors from command handling + List>> errors = commandsAndResults + .map(Tuple3::_3) + .filter(Either::isLeft) + .map(e -> Either.left(e.swap().get())); + + // Extract success and generate envelopes for each result + CompletionStage> success = traverse(commandsAndResults.filter(t -> t._3.isRight()), t -> { + C command = t._1; + Option mayBeState = t._2; + List events = t._3.get().events.toList(); + return buildEnvelopes(ctx, command, events).thenApply(eventEnvelopes -> { + Option mayBeLastSeqNum = eventEnvelopes.lastOption().map(evl -> evl.sequenceNum); + return new CommandStateAndEvent(command, mayBeState, eventEnvelopes, events, t._3.get().message, mayBeLastSeqNum); + }); + }); + + return success.thenApply(s -> Tuple(s.toList(), errors)); + }) + .thenCompose(successAndErrors -> { + + List>> errors = successAndErrors._2; + List success = successAndErrors._1; + + // Get all envelopes + List> envelopes = success.flatMap(CommandStateAndEvent::getEventEnvelopes); + + CompletionStage>> stored = eventStore + // Persist all envelopes + .persist(ctx, envelopes) + .thenCompose(__ -> + // Persist states + traverse(success, s -> { + LOGGER.debug("Storing state {} to DB", s); + List sequences = envelopes.filter(env -> env.entityId.equals(s.command.entityId().get())).map(env -> env.sequenceNum); + return aggregateStore + .buildAggregateAndStoreSnapshot( + ctx, + eventHandler, + s.getState(), + s.getCommand().entityId().get(), + s.getEvents(), + sequences.max() + ) + .thenApply(mayBeNextState -> + new ProcessingSuccess<>(s.state, mayBeNextState, s.getEventEnvelopes(), s.getMessage()) + ); + }) + ) + .thenCompose(mayBeNextState -> + // Apply events to projections + traverse(projections, p -> { + LOGGER.debug("Applying envelopes {} to projection", envelopes); + return p.storeProjection(ctx, envelopes); + }) + .thenApply(__ -> mayBeNextState) + ); + return stored.thenApply(results -> + errors.appendAll(results.map(Either::right)) + ); + }) + .thenApply(results -> { + Supplier> postTransactionProcess = () -> { + List> envelopes = results.flatMap(Value::toList).flatMap(ProcessingSuccess::getEvents); + LOGGER.debug("Publishing events {} to kafka", envelopes); + return eventStore.publish(envelopes) + .thenApply(__ -> Tuple.empty()) + .exceptionally(e -> Tuple.empty()); + }; + var inTransactionResult = new InTransactionResult<>( + results, + postTransactionProcess + ); + return inTransactionResult; + }) + ); } public CompletionStage, Either>>>> traverseCommands(List elements, BiFunction, CompletionStage, Either>>>> handler) { @@ -160,9 +165,9 @@ public CompletionStage, Either fResult.thenCompose(listResult -> handler.apply(elt, listResult._2.flatMap(e -> e.events)) .thenApply(r -> - Tuple( - listResult._1.append(r), - listResult._2.append(r._3.getOrElse(Events.empty()))) + Tuple( + listResult._1.append(r), + listResult._2.append(r._3.getOrElse(Events.empty()))) )) ).thenApply(t -> t._1); } @@ -179,15 +184,12 @@ private CompletionStage>> handleCommand(TxCtx t return commandHandler.handleCommand(txCtx, state, command); } - private CompletionStage> getCurrentState(TxCtx ctx, C command, List previousEvent) { + private Option getCurrentState(TxCtx ctx, Map> states, C command, List previousEvent) { if (command.hasId()) { String entityId = command.entityId().get(); - return aggregateStore.getAggregate(ctx, entityId) - .thenApply(state -> - eventHandler.deriveState(state, previousEvent.filter(e -> e.entityId().equals(entityId))) - ); + return eventHandler.deriveState(states.get(entityId).flatMap(identity()), previousEvent.filter(e -> e.entityId().equals(entityId))); } else { - return CompletionStages.successful(None()); + return None(); } } diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java b/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java index 93499169..31df40a0 100644 --- a/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java @@ -2,6 +2,7 @@ import fr.maif.concurrent.CompletionStages; import io.vavr.Tuple0; +import io.vavr.Tuple2; import io.vavr.Value; import io.vavr.collection.List; import io.vavr.control.Option; @@ -76,6 +77,7 @@ class Query { public final Long sequenceFrom; public final Long sequenceTo; public final Boolean published; + public final List> idsAndSequences; private Query(Query.Builder builder) { this.dateFrom = builder.dateFrom; @@ -87,6 +89,7 @@ private Query(Query.Builder builder) { this.published = builder.published; this.sequenceFrom = builder.sequenceFrom; this.sequenceTo = builder.sequenceTo; + this.idsAndSequences = Objects.requireNonNullElse(builder.idsAndSequences, List.empty()); } public static Builder builder() { @@ -125,6 +128,10 @@ public Option sequenceTo() { return Option.of(sequenceTo); } + public List> idsAndSequences() { + return idsAndSequences; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -156,6 +163,7 @@ public static class Builder { Boolean published; Long sequenceFrom; Long sequenceTo; + List> idsAndSequences; public Builder withDateFrom(LocalDateTime dateFrom) { this.dateFrom = dateFrom; @@ -201,6 +209,10 @@ public Builder withSequenceTo(Long sequenceTo) { this.sequenceTo = sequenceTo; return this; } + public Builder withIdsAndSequences(List> idsAndSequences) { + this.idsAndSequences = idsAndSequences; + return this; + } public Query build() { return new Query(this); diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/State.java b/thoth-core/src/main/java/fr/maif/eventsourcing/State.java index 41d8bc66..f3b3be45 100644 --- a/thoth-core/src/main/java/fr/maif/eventsourcing/State.java +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/State.java @@ -2,6 +2,8 @@ public interface State { + String entityId(); + Long sequenceNum(); S withSequenceNum(Long sequenceNum); diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/impl/AbstractDefaultAggregateStore.java b/thoth-core/src/main/java/fr/maif/eventsourcing/impl/AbstractDefaultAggregateStore.java index 3f019f1a..3ec0020c 100644 --- a/thoth-core/src/main/java/fr/maif/eventsourcing/impl/AbstractDefaultAggregateStore.java +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/impl/AbstractDefaultAggregateStore.java @@ -7,12 +7,20 @@ import fr.maif.eventsourcing.EventStore; import fr.maif.eventsourcing.State; import fr.maif.eventsourcing.TransactionManager; +import io.vavr.Tuple; +import io.vavr.Tuple2; +import io.vavr.collection.HashMap; +import io.vavr.collection.List; +import io.vavr.collection.Map; +import io.vavr.collection.Traversable; import io.vavr.control.Option; import org.reactivestreams.Publisher; import java.util.concurrent.CompletionStage; import java.util.function.BiFunction; +import static java.util.function.Function.identity; + public abstract class AbstractDefaultAggregateStore, E extends Event, Meta, Context, TxCtx> implements AggregateStore { private final EventStore eventStore; @@ -31,6 +39,29 @@ public CompletionStage> getAggregate(String entityId) { } @Override + public CompletionStage>> getAggregates(TxCtx ctx, List entityIds) { + return this.getSnapshots(ctx, entityIds) + .thenCompose(snapshots -> { + Map indexed = snapshots.groupBy(State::entityId).mapValues(Traversable::head); + List> idsAndSeqNums = entityIds.map(id -> Tuple.of(id, indexed.get(id).map(s -> s.sequenceNum()).getOrElse(0L))); + Map> empty = HashMap.ofEntries(entityIds.map(id -> Tuple.of(id, indexed.get(id)))); + EventStore.Query query = EventStore.Query.builder().withIdsAndSequences(idsAndSeqNums).build(); + Publisher> events = this.eventStore.loadEventsByQuery(ctx, query); + return fold(events, + empty, + (Map> states, EventEnvelope event) -> { + Option mayBeCurrentState = states.get(event.entityId).flatMap(identity()); + return states.put( + event.entityId, + this.eventEventHandler + .applyEvent(mayBeCurrentState, event.event) + .map((S state) -> (S) state.withSequenceNum(event.sequenceNum)) + ); + } + ); + }); + } + public CompletionStage> getAggregate(TxCtx ctx, String entityId) { return this.getSnapshot(ctx, entityId) @@ -43,7 +74,8 @@ public CompletionStage> getAggregate(TxCtx ctx, String entityId) { s -> EventStore.Query.builder().withSequenceFrom(s.sequenceNum()).withEntityId(entityId).build() ); - return fold(this.eventStore.loadEventsByQuery(ctx, query), + Publisher> events = this.eventStore.loadEventsByQuery(ctx, query); + return fold(events, mayBeSnapshot, (Option mayBeState, EventEnvelope event) -> this.eventEventHandler.applyEvent(mayBeState, event.event)