Skip to content

Commit

Permalink
Optimisations
Browse files Browse the repository at this point in the history
  • Loading branch information
larousso committed Jul 12, 2024
1 parent 2414065 commit ed043a5
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 19 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ subprojects {
reactorKafkaVersion = "1.3.22"
reactorVersion = "3.5.7"
vertxSqlVersion = "4.3.3"
testContainerVersion = "1.19.8"
}

test {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public Publisher<EventEnvelope<E, Meta, Context>> loadEventsByQuery(PgAsyncTrans
query.sequenceTo().map(SEQUENCE_NUM::le),
query.sequenceFrom().map(SEQUENCE_NUM::ge),
Option.of(query.idsAndSequences()).filter(Traversable::nonEmpty).map(l ->
l.map(t -> SEQUENCE_NUM.ge(t._2).and(ENTITY_ID.eq(t._1))).reduce(Condition::or)
l.map(t -> SEQUENCE_NUM.gt(t._2).and(ENTITY_ID.eq(t._1))).reduce(Condition::or)
)
).flatMap(identity());

Expand Down
3 changes: 3 additions & 0 deletions thoth-jooq-reactor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ dependencies {
testImplementation("org.mockito:mockito-all:1.10.19")
testImplementation("org.junit.jupiter:junit-jupiter:5.9.3")
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
testImplementation "org.testcontainers:junit-jupiter:$testContainerVersion"
testImplementation "org.testcontainers:testcontainers:$testContainerVersion"
testImplementation "org.testcontainers:postgresql:$testContainerVersion"
}

test {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public Publisher<EventEnvelope<E, Meta, Context>> loadEventsByQuery(Tx tx, Query
query.sequenceTo().map(SEQUENCE_NUM::le),
query.sequenceFrom().map(SEQUENCE_NUM::ge),
Option.of(query.idsAndSequences()).filter(Traversable::nonEmpty).map(l ->
l.map(t -> SEQUENCE_NUM.ge(t._2).and(ENTITY_ID.eq(t._1))).reduce(Condition::or)
l.map(t -> SEQUENCE_NUM.gt(t._2).and(ENTITY_ID.eq(t._1))).reduce(Condition::or)
)
).flatMap(identity());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
import org.jooq.SQLDialect;
import org.jooq.Table;
import org.jooq.impl.DSL;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.postgresql.ds.PGSimpleDataSource;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.utility.DockerImageName;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -28,7 +31,10 @@
import java.sql.Statement;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.*;
import java.util.Date;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

Expand All @@ -39,13 +45,53 @@
import static org.jooq.impl.DSL.table;
import static org.mockito.Mockito.mock;


public abstract class AbstractPostgresEventStoreTest {

protected Integer port = 5557;
protected String host = "localhost";
protected String database = "eventsourcing";
protected String user = "eventsourcing";
protected String password = "eventsourcing";
private static final PostgreSQLContainer<?> postgreSQLContainer = new PostgreSQLContainer<>(DockerImageName.parse("postgres:14"))
.withUsername("eventsourcing")
.withPassword("eventsourcing")
.withDatabaseName("eventsourcing");

static {
if(!isCi()) {
postgreSQLContainer.start();
}
}

@AfterAll
public static void stopDb() {
if(!isCi()) {
postgreSQLContainer.stop();
}
}

protected static boolean isCi() {
return "true".equals(System.getenv("CI"));
}

protected static Integer port() {
if (isCi()) {
return 5557;
} else {
return postgreSQLContainer.getFirstMappedPort();
}
}

protected static String host() {
return "localhost";
}
protected static String database() {
return "eventsourcing";
}
protected static String user() {
return "eventsourcing";
}
protected static String password() {
return "eventsourcing";
}



private ReactivePostgresEventStore<PgAsyncTransaction, VikingEvent, Void, Void> postgresEventStore;
private PgAsyncPool pgAsyncPool;
Expand Down Expand Up @@ -169,6 +215,18 @@ public void queryingByDate() {
assertThat(events).containsExactlyInAnyOrder(event1, event2, event3);
}

@Test
public void queryingBySeqAndEntityId() {
initDatas();
List<EventEnvelope<VikingEvent, Void, Void>> events = getFromQuery(EventStore.Query.builder()
.withIdsAndSequences(List.of(
Tuple("[email protected]", 0L),
Tuple("[email protected]", 5L)
))
.build());
assertThat(events).containsExactlyInAnyOrder(event1, event2, event3, event6);
}


@Test
public void queryingByPublished() {
Expand Down Expand Up @@ -299,9 +357,9 @@ public void setUp() {
this.pgAsyncPool = init();

PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource();
pgSimpleDataSource.setUrl("jdbc:postgresql://"+host+":"+port+"/"+database);
pgSimpleDataSource.setUser(user);
pgSimpleDataSource.setPassword(password);
pgSimpleDataSource.setUrl("jdbc:postgresql://"+host()+":"+port()+"/"+database());
pgSimpleDataSource.setUser(user());
pgSimpleDataSource.setPassword(password());
this.dslContext = DSL.using(pgSimpleDataSource, SQLDialect.POSTGRES);
Try.of(() -> {
this.dslContext.deleteFrom(vikings_journal).execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ protected PgAsyncPool init() {
jooqConfig.setSQLDialect(SQLDialect.POSTGRES);

PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource();
pgSimpleDataSource.setUrl("jdbc:postgresql://"+host+":"+port+"/"+database);
pgSimpleDataSource.setUser(user);
pgSimpleDataSource.setPassword(password);
pgSimpleDataSource.setUrl("jdbc:postgresql://"+host()+":"+port()+"/"+database());
pgSimpleDataSource.setUser(user());
pgSimpleDataSource.setPassword(password());

return new JdbcPgAsyncPool(SQLDialect.POSTGRES, pgSimpleDataSource, Executors.newFixedThreadPool(3));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ protected PgAsyncPool init() {

PoolOptions poolOptions = new PoolOptions().setMaxSize(30);
PgConnectOptions options = new PgConnectOptions()
.setPort(port)
.setPort(port())
.setHost("localhost")
.setDatabase(database)
.setUser(user)
.setPassword(password);
.setDatabase(database())
.setUser(user())
.setPassword(password());
PgPool client = PgPool.pool(Vertx.vertx(), options, poolOptions);
return new ReactivePgAsyncPool(client, jooqConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ public Publisher<EventEnvelope<E, Meta, Context>> loadEventsByQueryWithOptions(C
query.sequenceTo().map(d -> field(" sequence_num").lessOrEqual(d)),
query.sequenceFrom().map(d -> field(" sequence_num").greaterOrEqual(d)),
Option.of(query.idsAndSequences()).filter(Traversable::nonEmpty).map(l ->
l.map(t -> field(" sequence_num").greaterOrEqual(t._2).and(field(" entity_id").eq(t._1))).reduce(Condition::or)
l.map(t -> field(" sequence_num").greaterThan(t._2).and(field(" entity_id").eq(t._1))).reduce(Condition::or)
)
).flatMap(identity());

Expand Down

0 comments on commit ed043a5

Please sign in to comment.