From ed043a5ad96b1a2fd97bcff2727a925856b2e35b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Del=C3=A8gue?= Date: Fri, 12 Jul 2024 11:43:03 +0200 Subject: [PATCH] Optimisations --- build.gradle | 1 + .../ReactivePostgresEventStore.java | 2 +- thoth-jooq-reactor/build.gradle | 3 + .../ReactivePostgresEventStore.java | 2 +- .../AbstractPostgresEventStoreTest.java | 76 ++++++++++++++++--- .../JdbcPostgresEventStoreTest.java | 6 +- .../ReactivePostgresEventStoreTest.java | 8 +- .../impl/PostgresEventStore.java | 2 +- 8 files changed, 81 insertions(+), 19 deletions(-) diff --git a/build.gradle b/build.gradle index 034539ab..e1beb569 100644 --- a/build.gradle +++ b/build.gradle @@ -39,6 +39,7 @@ subprojects { reactorKafkaVersion = "1.3.22" reactorVersion = "3.5.7" vertxSqlVersion = "4.3.3" + testContainerVersion = "1.19.8" } test { diff --git a/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java b/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java index e207c80e..dd18dd84 100644 --- a/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java +++ b/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java @@ -231,7 +231,7 @@ public Publisher> loadEventsByQuery(PgAsyncTrans query.sequenceTo().map(SEQUENCE_NUM::le), query.sequenceFrom().map(SEQUENCE_NUM::ge), Option.of(query.idsAndSequences()).filter(Traversable::nonEmpty).map(l -> - l.map(t -> SEQUENCE_NUM.ge(t._2).and(ENTITY_ID.eq(t._1))).reduce(Condition::or) + l.map(t -> SEQUENCE_NUM.gt(t._2).and(ENTITY_ID.eq(t._1))).reduce(Condition::or) ) ).flatMap(identity()); diff --git a/thoth-jooq-reactor/build.gradle b/thoth-jooq-reactor/build.gradle index e48cd4d9..0e795614 100644 --- a/thoth-jooq-reactor/build.gradle +++ b/thoth-jooq-reactor/build.gradle @@ -21,6 +21,9 @@ dependencies { testImplementation("org.mockito:mockito-all:1.10.19") testImplementation("org.junit.jupiter:junit-jupiter:5.9.3") testRuntimeOnly 'org.junit.platform:junit-platform-launcher' + testImplementation "org.testcontainers:junit-jupiter:$testContainerVersion" + testImplementation "org.testcontainers:testcontainers:$testContainerVersion" + testImplementation "org.testcontainers:postgresql:$testContainerVersion" } test { diff --git a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java index 6d30dd15..4594869f 100644 --- a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java +++ b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java @@ -263,7 +263,7 @@ public Publisher> loadEventsByQuery(Tx tx, Query query.sequenceTo().map(SEQUENCE_NUM::le), query.sequenceFrom().map(SEQUENCE_NUM::ge), Option.of(query.idsAndSequences()).filter(Traversable::nonEmpty).map(l -> - l.map(t -> SEQUENCE_NUM.ge(t._2).and(ENTITY_ID.eq(t._1))).reduce(Condition::or) + l.map(t -> SEQUENCE_NUM.gt(t._2).and(ENTITY_ID.eq(t._1))).reduce(Condition::or) ) ).flatMap(identity()); diff --git a/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/AbstractPostgresEventStoreTest.java b/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/AbstractPostgresEventStoreTest.java index bbd8fd40..a1b1de69 100644 --- a/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/AbstractPostgresEventStoreTest.java +++ b/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/AbstractPostgresEventStoreTest.java @@ -16,10 +16,13 @@ import org.jooq.SQLDialect; import org.jooq.Table; import org.jooq.impl.DSL; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.postgresql.ds.PGSimpleDataSource; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.utility.DockerImageName; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -28,7 +31,10 @@ import java.sql.Statement; import java.time.Duration; import java.time.LocalDateTime; -import java.util.*; +import java.util.Date; +import java.util.Objects; +import java.util.StringJoiner; +import java.util.UUID; import java.util.concurrent.CompletionStage; import java.util.function.Function; @@ -39,13 +45,53 @@ import static org.jooq.impl.DSL.table; import static org.mockito.Mockito.mock; + public abstract class AbstractPostgresEventStoreTest { - protected Integer port = 5557; - protected String host = "localhost"; - protected String database = "eventsourcing"; - protected String user = "eventsourcing"; - protected String password = "eventsourcing"; + private static final PostgreSQLContainer postgreSQLContainer = new PostgreSQLContainer<>(DockerImageName.parse("postgres:14")) + .withUsername("eventsourcing") + .withPassword("eventsourcing") + .withDatabaseName("eventsourcing"); + + static { + if(!isCi()) { + postgreSQLContainer.start(); + } + } + + @AfterAll + public static void stopDb() { + if(!isCi()) { + postgreSQLContainer.stop(); + } + } + + protected static boolean isCi() { + return "true".equals(System.getenv("CI")); + } + + protected static Integer port() { + if (isCi()) { + return 5557; + } else { + return postgreSQLContainer.getFirstMappedPort(); + } + } + + protected static String host() { + return "localhost"; + } + protected static String database() { + return "eventsourcing"; + } + protected static String user() { + return "eventsourcing"; + } + protected static String password() { + return "eventsourcing"; + } + + private ReactivePostgresEventStore postgresEventStore; private PgAsyncPool pgAsyncPool; @@ -169,6 +215,18 @@ public void queryingByDate() { assertThat(events).containsExactlyInAnyOrder(event1, event2, event3); } + @Test + public void queryingBySeqAndEntityId() { + initDatas(); + List> events = getFromQuery(EventStore.Query.builder() + .withIdsAndSequences(List.of( + Tuple("bjorn@gmail.com", 0L), + Tuple("ragnard@gmail.com", 5L) + )) + .build()); + assertThat(events).containsExactlyInAnyOrder(event1, event2, event3, event6); + } + @Test public void queryingByPublished() { @@ -299,9 +357,9 @@ public void setUp() { this.pgAsyncPool = init(); PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource(); - pgSimpleDataSource.setUrl("jdbc:postgresql://"+host+":"+port+"/"+database); - pgSimpleDataSource.setUser(user); - pgSimpleDataSource.setPassword(password); + pgSimpleDataSource.setUrl("jdbc:postgresql://"+host()+":"+port()+"/"+database()); + pgSimpleDataSource.setUser(user()); + pgSimpleDataSource.setPassword(password()); this.dslContext = DSL.using(pgSimpleDataSource, SQLDialect.POSTGRES); Try.of(() -> { this.dslContext.deleteFrom(vikings_journal).execute(); diff --git a/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/JdbcPostgresEventStoreTest.java b/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/JdbcPostgresEventStoreTest.java index c2e9a89b..a9d60735 100644 --- a/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/JdbcPostgresEventStoreTest.java +++ b/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/JdbcPostgresEventStoreTest.java @@ -16,9 +16,9 @@ protected PgAsyncPool init() { jooqConfig.setSQLDialect(SQLDialect.POSTGRES); PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource(); - pgSimpleDataSource.setUrl("jdbc:postgresql://"+host+":"+port+"/"+database); - pgSimpleDataSource.setUser(user); - pgSimpleDataSource.setPassword(password); + pgSimpleDataSource.setUrl("jdbc:postgresql://"+host()+":"+port()+"/"+database()); + pgSimpleDataSource.setUser(user()); + pgSimpleDataSource.setPassword(password()); return new JdbcPgAsyncPool(SQLDialect.POSTGRES, pgSimpleDataSource, Executors.newFixedThreadPool(3)); } diff --git a/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/ReactivePostgresEventStoreTest.java b/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/ReactivePostgresEventStoreTest.java index 6bb674ae..4096ea41 100644 --- a/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/ReactivePostgresEventStoreTest.java +++ b/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/ReactivePostgresEventStoreTest.java @@ -21,11 +21,11 @@ protected PgAsyncPool init() { PoolOptions poolOptions = new PoolOptions().setMaxSize(30); PgConnectOptions options = new PgConnectOptions() - .setPort(port) + .setPort(port()) .setHost("localhost") - .setDatabase(database) - .setUser(user) - .setPassword(password); + .setDatabase(database()) + .setUser(user()) + .setPassword(password()); PgPool client = PgPool.pool(Vertx.vertx(), options, poolOptions); return new ReactivePgAsyncPool(client, jooqConfig); } diff --git a/thoth-jooq/src/main/java/fr/maif/eventsourcing/impl/PostgresEventStore.java b/thoth-jooq/src/main/java/fr/maif/eventsourcing/impl/PostgresEventStore.java index 5835e4de..559bb50f 100644 --- a/thoth-jooq/src/main/java/fr/maif/eventsourcing/impl/PostgresEventStore.java +++ b/thoth-jooq/src/main/java/fr/maif/eventsourcing/impl/PostgresEventStore.java @@ -310,7 +310,7 @@ public Publisher> loadEventsByQueryWithOptions(C query.sequenceTo().map(d -> field(" sequence_num").lessOrEqual(d)), query.sequenceFrom().map(d -> field(" sequence_num").greaterOrEqual(d)), Option.of(query.idsAndSequences()).filter(Traversable::nonEmpty).map(l -> - l.map(t -> field(" sequence_num").greaterOrEqual(t._2).and(field(" entity_id").eq(t._1))).reduce(Condition::or) + l.map(t -> field(" sequence_num").greaterThan(t._2).and(field(" entity_id").eq(t._1))).reduce(Condition::or) ) ).flatMap(identity());