diff --git a/src/main/java/org/neo4j/cdc/client/CDCClient.java b/src/main/java/org/neo4j/cdc/client/CDCClient.java index 90bac80..2d7e7a1 100644 --- a/src/main/java/org/neo4j/cdc/client/CDCClient.java +++ b/src/main/java/org/neo4j/cdc/client/CDCClient.java @@ -27,6 +27,7 @@ import org.neo4j.cdc.client.model.ChangeIdentifier; import org.neo4j.cdc.client.selector.Selector; import org.neo4j.driver.Driver; +import org.neo4j.driver.SessionConfig; import org.neo4j.driver.reactive.RxResult; import org.neo4j.driver.reactive.RxSession; import org.neo4j.driver.types.MapAccessor; @@ -46,6 +47,7 @@ public class CDCClient implements CDCService { private static final String CDC_QUERY_STATEMENT = "call cdc.query($from, $selectors)"; private final Driver driver; private final List selectors; + private final SessionConfigSupplier sessionConfigSupplier; private final Duration streamingPollInterval; public CDCClient(Driver driver, Selector... selectors) { @@ -54,6 +56,22 @@ public CDCClient(Driver driver, Selector... selectors) { public CDCClient(Driver driver, Duration streamingPollInterval, Selector... selectors) { this.driver = Objects.requireNonNull(driver); + this.sessionConfigSupplier = () -> SessionConfig.builder().build(); + this.streamingPollInterval = Objects.requireNonNull(streamingPollInterval); + this.selectors = selectors == null ? List.of() : Arrays.asList(selectors); + } + + public CDCClient(Driver driver, SessionConfigSupplier sessionConfigSupplier, Selector... selectors) { + this(driver, sessionConfigSupplier, Duration.ofSeconds(1), selectors); + } + + public CDCClient( + Driver driver, + SessionConfigSupplier sessionConfigSupplier, + Duration streamingPollInterval, + Selector... selectors) { + this.driver = Objects.requireNonNull(driver); + this.sessionConfigSupplier = sessionConfigSupplier; this.streamingPollInterval = Objects.requireNonNull(streamingPollInterval); this.selectors = selectors == null ? List.of() : Arrays.asList(selectors); } @@ -71,7 +89,7 @@ public Mono current() { @Override public Flux query(ChangeIdentifier from) { return Flux.usingWhen( - Mono.fromSupplier(driver::rxSession), + Mono.fromSupplier(() -> driver.rxSession(sessionConfigSupplier.sessionConfig())), (RxSession session) -> Flux.from(session.readTransaction(tx -> { var params = Map.of( "from", @@ -97,7 +115,7 @@ public Flux stream(ChangeIdentifier from) { var cursor = new AtomicReference<>(from); var query = Flux.usingWhen( - Mono.fromSupplier(driver::rxSession), + Mono.fromSupplier(() -> driver.rxSession(sessionConfigSupplier.sessionConfig())), (RxSession session) -> Flux.from(session.readTransaction(tx -> { var params = Map.of( "from", @@ -136,7 +154,7 @@ private ChangeEvent applyPropertyFilters(ChangeEvent original) { private Mono queryForChangeIdentifier(String query, String description) { return Mono.usingWhen( - Mono.fromSupplier(driver::rxSession), + Mono.fromSupplier(() -> driver.rxSession(sessionConfigSupplier.sessionConfig())), (RxSession session) -> Mono.from(session.readTransaction(tx -> { RxResult result = tx.run(query); return Mono.from(result.records()) diff --git a/src/main/java/org/neo4j/cdc/client/SessionConfigSupplier.java b/src/main/java/org/neo4j/cdc/client/SessionConfigSupplier.java new file mode 100644 index 0000000..6bd9131 --- /dev/null +++ b/src/main/java/org/neo4j/cdc/client/SessionConfigSupplier.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.cdc.client; + +import org.neo4j.driver.SessionConfig; + +/** + * The implementation can provide a session config that + * will be called each time before a session gets created. + * + * @author Gerrit Meier + */ +@FunctionalInterface +public interface SessionConfigSupplier { + + /** + * {@link SessionConfig} to be used with the current session. + * + * @return sessionConfig object. + */ + SessionConfig sessionConfig(); +} diff --git a/src/test/java/org/neo4j/cdc/client/CDCClientIT.java b/src/test/java/org/neo4j/cdc/client/CDCClientIT.java index 84200ca..8dd9da6 100644 --- a/src/test/java/org/neo4j/cdc/client/CDCClientIT.java +++ b/src/test/java/org/neo4j/cdc/client/CDCClientIT.java @@ -33,6 +33,7 @@ import org.neo4j.cdc.client.selector.RelationshipNodeSelector; import org.neo4j.cdc.client.selector.RelationshipSelector; import org.neo4j.driver.*; +import org.neo4j.driver.exceptions.FatalDiscoveryException; import org.testcontainers.containers.Neo4jContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -114,6 +115,16 @@ void changesCanBeQueried() { .verifyComplete(); } + @Test + void respectsSessionConfigSupplier() { + var client = new CDCClient( + driver, + () -> SessionConfig.builder().withDatabase("unknownDatabase").build()); + StepVerifier.create(client.current()) + .expectError(FatalDiscoveryException.class) + .verify(); + } + @Test void shouldReturnCypherTypesWithoutConversion() { var client = new CDCClient(driver, Duration.ZERO);