From 2aa6f6be5f10deb8f8aa192046b7335c58c7b752 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Del=C3=A8gue?= Date: Mon, 15 Jul 2024 18:30:50 +0200 Subject: [PATCH] Optimisations --- thoth-jooq-reactor/build.gradle | 1 + .../DefaultReactorAggregateStoreTest.java | 119 ++++++++++++++++++ 2 files changed, 120 insertions(+) create mode 100644 thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/DefaultReactorAggregateStoreTest.java diff --git a/thoth-jooq-reactor/build.gradle b/thoth-jooq-reactor/build.gradle index 0e795614..464a1f27 100644 --- a/thoth-jooq-reactor/build.gradle +++ b/thoth-jooq-reactor/build.gradle @@ -24,6 +24,7 @@ dependencies { testImplementation "org.testcontainers:junit-jupiter:$testContainerVersion" testImplementation "org.testcontainers:testcontainers:$testContainerVersion" testImplementation "org.testcontainers:postgresql:$testContainerVersion" + testImplementation 'org.mockito:mockito-core:5.12.0' } test { diff --git a/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/DefaultReactorAggregateStoreTest.java b/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/DefaultReactorAggregateStoreTest.java new file mode 100644 index 00000000..a0073c91 --- /dev/null +++ b/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/DefaultReactorAggregateStoreTest.java @@ -0,0 +1,119 @@ +package fr.maif.eventsourcing; + +import io.vavr.Tuple; +import io.vavr.Tuple0; +import io.vavr.collection.HashMap; +import io.vavr.collection.List; +import io.vavr.collection.Map; +import io.vavr.control.Option; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.LocalDateTime; +import java.util.function.Function; + +import static fr.maif.eventsourcing.AbstractPostgresEventStoreTest.eventEnvelope; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class DefaultReactorAggregateStoreTest { + + private ReactorEventStore eventStore; + + record CountState(String id, Long sequenceNum, Integer count) implements State { + @Override + public String entityId() { + return id; + } + + @Override + public Long sequenceNum() { + return sequenceNum; + } + + @Override + public CountState withSequenceNum(Long sequenceNum) { + return new CountState(id, sequenceNum, count); + } + } + + sealed interface CountEvent extends Event { + record AddedEvent(String id, Integer add) implements CountEvent { + @Override + public Type type() { + return Type.create(AddedEvent.class, 1L); + } + + @Override + public String entityId() { + return id; + } + } + } + + EventHandler eventHandler = (EventHandler) (state, events) -> { + if( events instanceof CountEvent.AddedEvent addedEvent) { + return state.map(s -> new CountState(s.id, s.sequenceNum, s.count + addedEvent.add)).orElse( + Option.of(new CountState(addedEvent.id(), 0L, addedEvent.add))); + } else { + return state; + } + }; + + DefaultReactorAggregateStore defaultReactorAggregateStore; + + @BeforeEach + public void setUp() { + eventStore = mock(ReactorEventStore.class); + defaultReactorAggregateStore = new DefaultReactorAggregateStore<>( + eventStore, eventHandler, new ReactorTransactionManager() { + @Override + public Mono withTransaction(Function> callBack) { + return callBack.apply(Tuple.empty()); + } + }) { + @Override + public Mono> getSnapshots(Tuple0 transactionContext, List strings) { + return Mono.just(strings.flatMap(id -> { + if (id.equals("5")) { + return Option.of(new CountState(id, 0L, 50)); + } else { + return Option.none(); + } + })); + } + }; + } + + @Test + void loadEventToAggregates() { + List idsInt = List.range(1, 10); + List ids = idsInt.map(String::valueOf); + + when(eventStore.loadEventsByQuery(any(), any())).thenReturn(Flux.fromIterable(idsInt.flatMap(id -> + List.range(0, id).map(add -> + new CountEvent.AddedEvent(String.valueOf(id), add) + ) + )).map(evt -> eventEnvelope(0L, evt, LocalDateTime.now()))); + + Map> result = defaultReactorAggregateStore.getAggregates(Tuple.empty(), ids).block(); + assertThat(result).isEqualTo(HashMap.ofEntries(List.of( + new CountState("1", 0L, 0), + new CountState("2", 0L, 1), + new CountState("3", 0L, 3), + new CountState("4", 0L, 6), + new CountState("5", 0L, 60), + new CountState("6", 0L, 15), + new CountState("7", 0L, 21), + new CountState("8", 0L, 28), + new CountState("9", 0L, 36) + ).map(s -> Tuple.of(s.id, Option.of(s))))); + + } + + +} \ No newline at end of file