Skip to content

Commit

Permalink
Batch load of aggregates (#67)
Browse files Browse the repository at this point in the history
* Optimisations
  • Loading branch information
larousso authored Jul 15, 2024
1 parent df3131a commit 3039c1a
Show file tree
Hide file tree
Showing 27 changed files with 525 additions and 140 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ subprojects {
reactorKafkaVersion = "1.3.22"
reactorVersion = "3.5.7"
vertxSqlVersion = "4.3.3"
testContainerVersion = "1.19.8"
}

test {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ public Long sequenceNum() {
return sequenceNum;
}

@Override
public String entityId() {
return id;
}

@Override
public Account withSequenceNum(Long sequenceNum) {
this.sequenceNum = sequenceNum;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ public String getId() {
return id;
}

@Override
public String entityId() {
return id;
}

public BigDecimal getBalance() {
return balance;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ public Long sequenceNum() {
return sequenceNum;
}

@Override
public String entityId() {
return id;
}

@Override
public Account withSequenceNum(Long sequenceNum) {
this.sequenceNum = sequenceNum;
Expand Down
5 changes: 5 additions & 0 deletions sample/src/main/java/fr/maif/thoth/sample/state/Account.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ public class Account extends AbstractState<Account> {
public BigDecimal balance;
public long sequenceNum;

@Override
public String entityId() {
return id;
}

public static class AccountBuilder{
String id;
BigDecimal balance;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ public CompletionStage<Long> nextSequence(Tuple0 tx) {
return CompletionStages.completedStage(sequence_num.incrementAndGet());
}

@Override
public CompletionStage<List<Long>> nextSequences(Tuple0 tx, Integer count) {
return CompletionStages.completedStage(List.range(0, count).map(any -> sequence_num.incrementAndGet()));
}

@Override
public CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> events) {
events.forEach(queue::offer);
Expand Down
5 changes: 5 additions & 0 deletions thoth-core-akka/src/test/java/fr/maif/Helpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,11 @@ public Viking(String id, String name, Long sequenceNum) {
this.sequenceNum = sequenceNum;
}

@Override
public String entityId() {
return id;
}

@Override
public Long sequenceNum() {
return sequenceNum;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ public CompletionStage<Long> nextSequence(InMemoryEventStore.Transaction<E, Meta
return CompletionStages.completedStage(sequenceNums.accumulateAndGet(value, Math::max));
}

@Override
public CompletionStage<List<Long>> nextSequences(InMemoryEventStore.Transaction<E, Meta, Context> tx, Integer count) {
return CompletionStages.traverse(List.range(0, count), c -> nextSequence(tx));
}

@Override
public CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> events) {
events.forEach(e -> store.put(e.sequenceNum, e));
Expand Down
5 changes: 5 additions & 0 deletions thoth-core-reactor/src/test/java/fr/maif/Helpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,11 @@ public Viking(String id, String name, Integer age, Long sequenceNum) {
this.sequenceNum = sequenceNum;
}

@Override
public String entityId() {
return id;
}

@Override
public Long sequenceNum() {
return sequenceNum;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package fr.maif.eventsourcing;

public class AbstractState<T> implements State<T> {
public abstract class AbstractState<T> implements State<T> {
protected long sequenceNum;

@Override
Expand Down
11 changes: 11 additions & 0 deletions thoth-core/src/main/java/fr/maif/eventsourcing/AggregateStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import fr.maif.concurrent.CompletionStages;
import io.vavr.Tuple;
import io.vavr.Tuple0;
import io.vavr.collection.HashMap;
import io.vavr.collection.List;
import io.vavr.collection.Map;
import io.vavr.control.Option;

import java.util.concurrent.CompletionStage;
Expand All @@ -12,6 +14,11 @@ public interface AggregateStore<S extends State<S>, Id, TxCtx> {

CompletionStage<Option<S>> getAggregate(Id entityId);

default CompletionStage<Map<Id, Option<S>>> getAggregates(TxCtx ctx, List<Id> entityIds) {
return CompletionStages.traverse(entityIds, id -> getAggregate(ctx, id).thenApply(agg -> Tuple.of(id, agg)))
.thenApply(HashMap::ofEntries);
}

CompletionStage<Option<S>> getAggregate(TxCtx ctx, Id entityId);

default CompletionStage<Tuple0> storeSnapshot(TxCtx transactionContext, Id id, Option<S> state) {
Expand All @@ -22,6 +29,10 @@ default CompletionStage<Option<S>> getSnapshot(TxCtx transactionContext, Id id)
return CompletionStages.completedStage(Option.none());
}

default CompletionStage<List<S>> getSnapshots(TxCtx transactionContext, List<Id> ids) {
return CompletionStages.completedStage(List.empty());
}

default <E extends Event> CompletionStage<Option<S>> buildAggregateAndStoreSnapshot(TxCtx ctx, EventHandler<S, E> eventHandler, Option<S> state, Id id, List<E> events, Option<Long> lastSequenceNum) {

Option<S> newState = eventHandler.deriveState(state, events.filter(event -> event.entityId().equals(id)));
Expand Down
Loading

0 comments on commit 3039c1a

Please sign in to comment.