diff --git a/src/main/java/org/neo4j/cdc/client/model/ModelUtils.java b/src/main/java/org/neo4j/cdc/client/model/ModelUtils.java index 39c0d54..0d18ee0 100644 --- a/src/main/java/org/neo4j/cdc/client/model/ModelUtils.java +++ b/src/main/java/org/neo4j/cdc/client/model/ModelUtils.java @@ -20,6 +20,8 @@ import java.time.format.DateTimeFormatter; import java.time.temporal.TemporalAccessor; import java.util.*; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.commons.collections4.MapUtils; class ModelUtils { @@ -88,4 +90,79 @@ static ZonedDateTime getZonedDateTime(Map map, String key) { } return ZonedDateTime.parse(value.toString(), DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSSX")); } + + static Map>> getNodesKeys(Map cypherMap) { + var keysMap = coerseToMap(MapUtils.getObject(cypherMap, "keys")); + if (keysMap == null) { + return null; + } + if (keysMap.isEmpty()) { + return Collections.emptyMap(); + } + var valueType = keysMap.values().iterator().next().getClass(); + + // Check if the key structure is pre Neo4j 5.15 + if (Map.class.isAssignableFrom(valueType)) { + var preNode515keyMap = checkedMap(keysMap, String.class, Map.class); + return ModelUtils.transformMapValues( + preNode515keyMap, e -> List.of(ModelUtils.checkedMap(e, String.class, Object.class))); + } else { + var postNode515KeyMap = checkedMap(keysMap, String.class, List.class); + return ModelUtils.transformMapValues( + postNode515KeyMap, e -> coerceToListOfMaps(e, String.class, Object.class)); + } + } + + static Map coerseToMap(Object input) { + if (input == null) { + return null; + } + if (!(input instanceof Map)) { + throw new IllegalArgumentException(String.format( + "Unexpected type %s, expected Map", input.getClass().getSimpleName())); + } + return (Map) input; + } + + static List> coerceToListOfMaps(List input, Class keyType, Class valueType) { + if (input == null) { + return null; + } + return input.stream() + .map(e -> { + if (e != null && !(e instanceof Map)) { + throw new IllegalArgumentException( + "There are elements of unsupported types in the provided list, expected Map"); + } + try { + return checkedMap((Map) e, keyType, valueType); + } catch (RuntimeException ex) { + throw new IllegalArgumentException( + "There are elements of unsupported types in the provided list", ex); + } + }) + .collect(Collectors.toList()); + } + + static Map transformMapValues(Map input, Function transform) { + if (input == null) { + return null; + } + return input.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> transform.apply(e.getValue()))); + } + + static List> getRelationshipKeys(Map cypherMap) { + var keysList = ModelUtils.getList(cypherMap, "keys", Map.class); + if (keysList != null) { + return ModelUtils.coerceToListOfMaps(keysList, String.class, Object.class); + } + + // Check if the key structure is pre Neo4j 5.15 + var keyMap = ModelUtils.getMap(cypherMap, "key", String.class, Object.class); + if (keyMap == null) { + return null; + } + return List.of(keyMap); + } } diff --git a/src/main/java/org/neo4j/cdc/client/model/Node.java b/src/main/java/org/neo4j/cdc/client/model/Node.java index 83295c1..0f96fee 100644 --- a/src/main/java/org/neo4j/cdc/client/model/Node.java +++ b/src/main/java/org/neo4j/cdc/client/model/Node.java @@ -19,16 +19,15 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.stream.Collectors; import org.apache.commons.collections4.MapUtils; public class Node { private final String elementId; - private final Map> keys; + private final Map>> keys; private final List labels; - public Node(String elementId, List labels, Map> keys) { + public Node(String elementId, List labels, Map>> keys) { this.elementId = Objects.requireNonNull(elementId); this.labels = labels; this.keys = keys; @@ -38,7 +37,7 @@ public String getElementId() { return this.elementId; } - public Map> getKeys() { + public Map>> getKeys() { return this.keys; } @@ -76,13 +75,7 @@ public static Node fromMap(Map map) { var elementId = MapUtils.getString(cypherMap, "elementId"); var labels = ModelUtils.getList(cypherMap, "labels", String.class); - var keysMap = ModelUtils.getMap(cypherMap, "keys", String.class, Map.class); - var keys = keysMap != null - ? keysMap.entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - e -> ModelUtils.checkedMap(e.getValue(), String.class, Object.class))) - : null; + var keys = ModelUtils.getNodesKeys(cypherMap); return new Node(elementId, labels, keys); } diff --git a/src/main/java/org/neo4j/cdc/client/model/NodeEvent.java b/src/main/java/org/neo4j/cdc/client/model/NodeEvent.java index 421465d..1635a9d 100644 --- a/src/main/java/org/neo4j/cdc/client/model/NodeEvent.java +++ b/src/main/java/org/neo4j/cdc/client/model/NodeEvent.java @@ -19,19 +19,18 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.stream.Collectors; import org.apache.commons.collections4.MapUtils; public class NodeEvent extends EntityEvent { - private final Map> keys; + private final Map>> keys; private final List labels; public NodeEvent( String elementId, EntityOperation operation, List labels, - Map> keys, + Map>> keys, NodeState before, NodeState after) { super(elementId, EventType.NODE, operation, before, after); @@ -44,7 +43,7 @@ public List getLabels() { return this.labels; } - public Map> getKeys() { + public Map>> getKeys() { return this.keys; } @@ -81,13 +80,7 @@ public static NodeEvent fromMap(Map map) { var elementId = MapUtils.getString(cypherMap, "elementId"); var operation = EntityOperation.fromShorthand(MapUtils.getString(cypherMap, "operation")); var labels = ModelUtils.getList(cypherMap, "labels", String.class); - var keysMap = ModelUtils.getMap(cypherMap, "keys", String.class, Map.class); - var keys = keysMap != null - ? keysMap.entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - e -> ModelUtils.checkedMap(e.getValue(), String.class, Object.class))) - : null; + var keys = ModelUtils.getNodesKeys(cypherMap); var state = ModelUtils.checkedMap( Objects.requireNonNull(MapUtils.getMap(cypherMap, "state")), String.class, Object.class); diff --git a/src/main/java/org/neo4j/cdc/client/model/RelationshipEvent.java b/src/main/java/org/neo4j/cdc/client/model/RelationshipEvent.java index 4bc3ded..07ecdb8 100644 --- a/src/main/java/org/neo4j/cdc/client/model/RelationshipEvent.java +++ b/src/main/java/org/neo4j/cdc/client/model/RelationshipEvent.java @@ -16,6 +16,7 @@ */ package org.neo4j.cdc.client.model; +import java.util.List; import java.util.Map; import java.util.Objects; import org.apache.commons.collections4.MapUtils; @@ -25,14 +26,14 @@ public class RelationshipEvent extends EntityEvent { private final Node start; private final Node end; private final String type; - private final Map key; + private final List> keys; public RelationshipEvent( String elementId, String type, Node start, Node end, - Map key, + List> keys, EntityOperation operation, RelationshipState before, RelationshipState after) { @@ -41,7 +42,7 @@ public RelationshipEvent( this.start = Objects.requireNonNull(start); this.end = Objects.requireNonNull(end); this.type = Objects.requireNonNull(type); - this.key = key; + this.keys = keys; } public Node getStart() { @@ -56,8 +57,8 @@ public String getType() { return this.type; } - public Map getKey() { - return this.key; + public List> getKeys() { + return this.keys; } @Override @@ -71,7 +72,7 @@ public boolean equals(Object o) { if (!start.equals(that.start)) return false; if (!end.equals(that.end)) return false; if (!type.equals(that.type)) return false; - return Objects.equals(key, that.key); + return Objects.equals(keys, that.keys); } @Override @@ -80,15 +81,15 @@ public int hashCode() { result = 31 * result + start.hashCode(); result = 31 * result + end.hashCode(); result = 31 * result + type.hashCode(); - result = 31 * result + (key != null ? key.hashCode() : 0); + result = 31 * result + (keys != null ? keys.hashCode() : 0); return result; } @Override public String toString() { return String.format( - "RelationshipEvent{elementId=%s, start=%s, end=%s, type='%s', key=%s, operation=%s, before=%s, after=%s}", - getElementId(), start, end, type, key, getOperation(), getBefore(), getAfter()); + "RelationshipEvent{elementId=%s, start=%s, end=%s, type='%s', keys=%s, operation=%s, before=%s, after=%s}", + getElementId(), start, end, type, keys, getOperation(), getBefore(), getAfter()); } public static RelationshipEvent fromMap(Map map) { @@ -99,7 +100,7 @@ public static RelationshipEvent fromMap(Map map) { var type = MapUtils.getString(cypherMap, "type"); var start = Node.fromMap(ModelUtils.getMap(cypherMap, "start", String.class, Object.class)); var end = Node.fromMap(ModelUtils.getMap(cypherMap, "end", String.class, Object.class)); - var key = ModelUtils.getMap(cypherMap, "key", String.class, Object.class); + var key = ModelUtils.getRelationshipKeys(cypherMap); var state = ModelUtils.checkedMap( Objects.requireNonNull(MapUtils.getMap(cypherMap, "state")), String.class, Object.class); diff --git a/src/main/java/org/neo4j/cdc/client/selector/EntitySelector.java b/src/main/java/org/neo4j/cdc/client/selector/EntitySelector.java index 5b3a945..8d4c18c 100644 --- a/src/main/java/org/neo4j/cdc/client/selector/EntitySelector.java +++ b/src/main/java/org/neo4j/cdc/client/selector/EntitySelector.java @@ -229,7 +229,7 @@ public ChangeEvent applyProperties(ChangeEvent e) { relationshipEvent.getType(), relationshipEvent.getStart(), relationshipEvent.getEnd(), - relationshipEvent.getKey(), + relationshipEvent.getKeys(), relationshipEvent.getOperation(), beforeState, afterState)); diff --git a/src/main/java/org/neo4j/cdc/client/selector/NodeSelector.java b/src/main/java/org/neo4j/cdc/client/selector/NodeSelector.java index 8754406..29bf414 100644 --- a/src/main/java/org/neo4j/cdc/client/selector/NodeSelector.java +++ b/src/main/java/org/neo4j/cdc/client/selector/NodeSelector.java @@ -105,7 +105,8 @@ public boolean matches(ChangeEvent e) { return false; } - if (!key.isEmpty() && !nodeEvent.getKeys().containsValue(key)) { + if (!key.isEmpty() + && nodeEvent.getKeys().values().stream().flatMap(List::stream).noneMatch(key::equals)) { return false; } diff --git a/src/main/java/org/neo4j/cdc/client/selector/RelationshipSelector.java b/src/main/java/org/neo4j/cdc/client/selector/RelationshipSelector.java index 0d1199a..0187857 100644 --- a/src/main/java/org/neo4j/cdc/client/selector/RelationshipSelector.java +++ b/src/main/java/org/neo4j/cdc/client/selector/RelationshipSelector.java @@ -130,18 +130,22 @@ public boolean matches(ChangeEvent e) { if (start.getLabels().stream() .anyMatch(l -> !relationshipEvent.getStart().getLabels().contains(l)) || (!start.getKey().isEmpty() - && !relationshipEvent.getStart().getKeys().containsValue(start.getKey()))) { + && relationshipEvent.getStart().getKeys().values().stream() + .flatMap(List::stream) + .noneMatch(start.getKey()::equals))) { return false; } if (end.getLabels().stream() .anyMatch(l -> !relationshipEvent.getEnd().getLabels().contains(l)) || (!end.getKey().isEmpty() - && !relationshipEvent.getEnd().getKeys().containsValue(end.getKey()))) { + && relationshipEvent.getEnd().getKeys().values().stream() + .flatMap(List::stream) + .noneMatch(end.getKey()::equals))) { return false; } - if (!key.isEmpty() && !Objects.equals(key, relationshipEvent.getKey())) { + if (!key.isEmpty() && !relationshipEvent.getKeys().contains(key)) { return false; } diff --git a/src/test/java/org/neo4j/cdc/client/CDCClient514IT.java b/src/test/java/org/neo4j/cdc/client/CDCClient514IT.java new file mode 100644 index 0000000..e5729df --- /dev/null +++ b/src/test/java/org/neo4j/cdc/client/CDCClient514IT.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.cdc.client; + +import java.util.Collections; +import java.util.Map; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.neo4j.driver.AuthTokens; +import org.neo4j.driver.Driver; +import org.neo4j.driver.GraphDatabase; +import org.testcontainers.containers.Neo4jContainer; +import org.testcontainers.junit.jupiter.Container; + +/** + * Neo4j 5.15+ introduced a breaking change in node ande relationship keys structure. This suite verifies if + * CDC Client is backward compatible with 5.14 and earlier. + */ +public class CDCClient514IT extends CDCClientIT { + + private static final String NEO4J_VERSION = "5.14"; + + @SuppressWarnings("resource") + @Container + private static final Neo4jContainer neo4j = new Neo4jContainer<>("neo4j:" + NEO4J_VERSION + "-enterprise") + .withEnv("NEO4J_ACCEPT_LICENSE_AGREEMENT", "yes") + .withAdminPassword("passw0rd"); + + private static Driver driver; + + @BeforeAll + static void setup() { + driver = GraphDatabase.driver(neo4j.getBoltUrl(), AuthTokens.basic("neo4j", "passw0rd")); + } + + @AfterAll + static void cleanup() { + driver.close(); + } + + @Override + Driver driver() { + return driver; + } + + @Override + Neo4jContainer neo4j() { + return neo4j; + } + + @Override + Map defaultExpectedAdditionalEntries() { + return Collections.emptyMap(); + } +} diff --git a/src/test/java/org/neo4j/cdc/client/CDCClientIT.java b/src/test/java/org/neo4j/cdc/client/CDCClientIT.java index 8e4e097..de27eb1 100644 --- a/src/test/java/org/neo4j/cdc/client/CDCClientIT.java +++ b/src/test/java/org/neo4j/cdc/client/CDCClientIT.java @@ -23,8 +23,6 @@ import java.time.*; import java.util.*; import org.assertj.core.api.InstanceOfAssertFactories; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.neo4j.cdc.client.model.*; @@ -35,7 +33,6 @@ import org.neo4j.driver.*; import org.neo4j.driver.exceptions.FatalDiscoveryException; import org.testcontainers.containers.Neo4jContainer; -import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import reactor.test.StepVerifier; @@ -43,32 +40,19 @@ * @author Gerrit Meier */ @Testcontainers -public class CDCClientIT { +public abstract class CDCClientIT { - private static final String NEO4J_VERSION = "5"; + abstract Driver driver(); - @SuppressWarnings("resource") - @Container - private static final Neo4jContainer neo4j = new Neo4jContainer<>("neo4j:" + NEO4J_VERSION + "-enterprise") - .withEnv("NEO4J_ACCEPT_LICENSE_AGREEMENT", "yes") - .withAdminPassword("passw0rd"); + abstract Neo4jContainer neo4j(); - private static Driver driver; - private static ChangeIdentifier current; + abstract Map defaultExpectedAdditionalEntries(); - @BeforeAll - static void setup() { - driver = GraphDatabase.driver(neo4j.getBoltUrl(), AuthTokens.basic("neo4j", "passw0rd")); - } - - @AfterAll - static void cleanup() { - driver.close(); - } + private ChangeIdentifier current; @BeforeEach void reset() { - try (var session = driver.session()) { + try (var session = driver().session()) { session.run( "CREATE OR REPLACE DATABASE $db OPTIONS {txLogEnrichment: $mode} WAIT", Map.of("db", "neo4j", "mode", "FULL")) @@ -85,7 +69,7 @@ private static ChangeIdentifier currentChangeId(Session session) { @Test void earliest() { - var client = new CDCClient(driver, Duration.ZERO); + var client = new CDCClient(driver(), Duration.ZERO); StepVerifier.create(client.earliest()) .assertNext(cv -> assertNotNull(cv.getId())) @@ -94,7 +78,7 @@ void earliest() { @Test void current() { - var client = new CDCClient(driver, Duration.ZERO); + var client = new CDCClient(driver(), Duration.ZERO); StepVerifier.create(client.current()) .assertNext(cv -> assertNotNull(cv.getId())) @@ -103,9 +87,9 @@ void current() { @Test void changesCanBeQueried() { - var client = new CDCClient(driver, Duration.ZERO); + var client = new CDCClient(driver(), Duration.ZERO); - try (Session session = driver.session()) { + try (Session session = driver().session()) { session.run("CREATE ()").consume(); } @@ -117,7 +101,7 @@ void changesCanBeQueried() { @Test void respectsSessionConfigSupplier() { var client = new CDCClient( - driver, + driver(), () -> SessionConfig.builder().withDatabase("unknownDatabase").build()); StepVerifier.create(client.current()) .expectError(FatalDiscoveryException.class) @@ -126,7 +110,7 @@ void respectsSessionConfigSupplier() { @Test void shouldReturnCypherTypesWithoutConversion() { - var client = new CDCClient(driver, Duration.ZERO); + var client = new CDCClient(driver(), Duration.ZERO); var props = new HashMap(); props.put("bool", true); @@ -143,7 +127,7 @@ void shouldReturnCypherTypesWithoutConversion() { props.put("zoned_datetime", ZonedDateTime.of(1990, 5, 1, 23, 59, 59, 0, ZoneId.of("UTC"))); props.put("zoned_time", OffsetTime.of(23, 59, 59, 0, ZoneOffset.ofHours(1))); - try (Session session = driver.session()) { + try (Session session = driver().session()) { session.run("CREATE (a) SET a = $props", Map.of("props", props)).consume(); } @@ -162,9 +146,9 @@ void shouldReturnCypherTypesWithoutConversion() { @Test void nodeChangesCanBeQueried() { - CDCClient client = new CDCClient(driver, Duration.ZERO); + CDCClient client = new CDCClient(driver(), Duration.ZERO); - try (var session = driver.session()) { + try (var session = driver().session()) { session.run("CREATE CONSTRAINT FOR (p:Person) REQUIRE (p.first_name, p.last_name) IS NODE KEY") .consume(); @@ -188,8 +172,8 @@ void nodeChangesCanBeQueried() { assertThat(c.getSeq()).isNotNull(); }) .satisfies(e -> assertThat(e.getMetadata()) - .satisfies(m -> - assertThat(m.getAdditionalEntries()).isEmpty()) + .satisfies(m -> assertThat(m.getAdditionalEntries()) + .isEqualTo(defaultExpectedAdditionalEntries())) .satisfies(m -> assertThat(m.getAuthenticatedUser()).isEqualTo("neo4j")) .satisfies(m -> @@ -215,7 +199,8 @@ void nodeChangesCanBeQueried() { .hasFieldOrPropertyWithValue("elementId", elementId) .hasFieldOrPropertyWithValue("labels", List.of("Person", "Employee")) .hasFieldOrPropertyWithValue( - "keys", Map.of("Person", Map.of("first_name", "john", "last_name", "doe"))) + "keys", + Map.of("Person", List.of(Map.of("first_name", "john", "last_name", "doe")))) .hasFieldOrPropertyWithValue("before", null) .hasFieldOrPropertyWithValue( "after", @@ -234,9 +219,9 @@ void nodeChangesCanBeQueried() { @Test void relationshipChangesCanBeQueried() { - var client = new CDCClient(driver, Duration.ZERO); + var client = new CDCClient(driver(), Duration.ZERO); - try (var session = driver.session()) { + try (var session = driver().session()) { session.run("CREATE CONSTRAINT FOR (p:Person) REQUIRE (p.id) IS NODE KEY") .consume(); session.run("CREATE CONSTRAINT FOR (p:Place) REQUIRE (p.id) IS NODE KEY") @@ -268,8 +253,8 @@ void relationshipChangesCanBeQueried() { assertThat(c.getSeq()).isNotNull(); }) .satisfies(e -> assertThat(e.getMetadata()) - .satisfies(m -> - assertThat(m.getAdditionalEntries()).isEmpty()) + .satisfies(m -> assertThat(m.getAdditionalEntries()) + .isEqualTo(defaultExpectedAdditionalEntries())) .satisfies(m -> assertThat(m.getAuthenticatedUser()).isEqualTo("neo4j")) .satisfies(m -> @@ -299,12 +284,15 @@ void relationshipChangesCanBeQueried() { new Node( startElementId, List.of("Person"), - Map.of("Person", Map.of("id", 1L)))) + Map.of("Person", List.of(Map.of("id", 1L))))) .hasFieldOrPropertyWithValue( "end", new Node( - endElementId, List.of("Place"), Map.of("Place", Map.of("id", 48L)))) - .hasFieldOrPropertyWithValue("key", Map.of("on", LocalDate.of(1990, 5, 1))) + endElementId, + List.of("Place"), + Map.of("Place", List.of(Map.of("id", 48L))))) + .hasFieldOrPropertyWithValue( + "keys", List.of(Map.of("on", LocalDate.of(1990, 5, 1)))) .hasFieldOrPropertyWithValue("before", null) .hasFieldOrPropertyWithValue( "after", new RelationshipState(Map.of("on", LocalDate.of(1990, 5, 1)))))) @@ -315,7 +303,7 @@ void relationshipChangesCanBeQueried() { @Test void selectorsArePassedToServer() { - try (var session = driver.session()) { + try (var session = driver().session()) { session.run("CREATE CONSTRAINT FOR (p:Person) REQUIRE (p.id) IS NODE KEY") .consume(); session.run("CREATE CONSTRAINT FOR (p:Place) REQUIRE (p.id) IS NODE KEY") @@ -347,7 +335,7 @@ void selectorsArePassedToServer() { .get(0) .asString(); - StepVerifier.create(new CDCClient(driver, Duration.ZERO).query(current)) + StepVerifier.create(new CDCClient(driver(), Duration.ZERO).query(current)) .assertNext(n -> assertThat(n).extracting("event.elementId").isEqualTo(person1)) .assertNext(n -> assertThat(n).extracting("event.elementId").isEqualTo(person2)) .assertNext(n -> assertThat(n).extracting("event.elementId").isEqualTo(place)) @@ -355,7 +343,7 @@ void selectorsArePassedToServer() { .verifyComplete(); StepVerifier.create(new CDCClient( - driver, + driver(), Duration.ZERO, new NodeSelector(EntityOperation.CREATE, emptySet(), Set.of("Place"))) .query(current)) @@ -363,7 +351,7 @@ void selectorsArePassedToServer() { .verifyComplete(); StepVerifier.create(new CDCClient( - driver, + driver(), Duration.ZERO, new NodeSelector(EntityOperation.CREATE, emptySet(), Set.of("Place")), new NodeSelector(null, emptySet(), Set.of("Person"))) @@ -373,14 +361,14 @@ void selectorsArePassedToServer() { .assertNext(n -> assertThat(n).extracting("event.elementId").isEqualTo(place)) .verifyComplete(); - StepVerifier.create( - new CDCClient(driver, Duration.ZERO, new RelationshipSelector(null, emptySet(), "BORN_IN")) - .query(current)) + StepVerifier.create(new CDCClient( + driver(), Duration.ZERO, new RelationshipSelector(null, emptySet(), "BORN_IN")) + .query(current)) .assertNext(n -> assertThat(n).extracting("event.elementId").isEqualTo(bornIn)) .verifyComplete(); StepVerifier.create(new CDCClient( - driver, + driver(), Duration.ZERO, new NodeSelector(null, emptySet(), Set.of("Place")), new RelationshipSelector(null, emptySet(), "BORN_IN")) @@ -394,7 +382,7 @@ void selectorsArePassedToServer() { @Test void selectorsDoFilteringCorrectly() { - try (var session = driver.session()) { + try (var session = driver().session()) { session.run("CREATE CONSTRAINT FOR (p:Person) REQUIRE (p.id) IS NODE KEY") .consume(); session.run("CREATE CONSTRAINT FOR (p:Place) REQUIRE (p.id) IS NODE KEY") @@ -464,7 +452,7 @@ void selectorsDoFilteringCorrectly() { Map.of("bornIn", bornIn, "hospital", "state hospital", "doctor", "doctor who")) .consume(); - StepVerifier.create(new CDCClient(driver, Duration.ZERO).query(current)) + StepVerifier.create(new CDCClient(driver(), Duration.ZERO).query(current)) .assertNext(n -> assertThat(n).extracting("event.elementId").isEqualTo(person1)) .assertNext(n -> assertThat(n).extracting("event.elementId").isEqualTo(person2)) .assertNext(n -> assertThat(n).extracting("event.elementId").isEqualTo(place)) @@ -473,7 +461,7 @@ void selectorsDoFilteringCorrectly() { .verifyComplete(); StepVerifier.create(new CDCClient( - driver, + driver(), Duration.ZERO, new NodeSelector( EntityOperation.CREATE, @@ -494,7 +482,7 @@ void selectorsDoFilteringCorrectly() { .verifyComplete(); StepVerifier.create(new CDCClient( - driver, + driver(), Duration.ZERO, new NodeSelector( EntityOperation.CREATE, @@ -515,7 +503,7 @@ void selectorsDoFilteringCorrectly() { .verifyComplete(); StepVerifier.create(new CDCClient( - driver, + driver(), Duration.ZERO, new NodeSelector( EntityOperation.CREATE, @@ -536,7 +524,7 @@ void selectorsDoFilteringCorrectly() { .verifyComplete(); StepVerifier.create(new CDCClient( - driver, + driver(), Duration.ZERO, new NodeSelector( null, @@ -580,7 +568,7 @@ void selectorsDoFilteringCorrectly() { .verifyComplete(); StepVerifier.create(new CDCClient( - driver, + driver(), Duration.ZERO, new NodeSelector( null, @@ -616,7 +604,7 @@ void selectorsDoFilteringCorrectly() { .verifyComplete(); StepVerifier.create(new CDCClient( - driver, + driver(), Duration.ZERO, new RelationshipSelector( null, @@ -653,7 +641,7 @@ void selectorsDoFilteringCorrectly() { .verifyComplete(); StepVerifier.create(new CDCClient( - driver, + driver(), Duration.ZERO, new RelationshipSelector( null, @@ -691,7 +679,7 @@ void selectorsDoFilteringCorrectly() { // first matching selector wins StepVerifier.create(new CDCClient( - driver, + driver(), Duration.ZERO, new RelationshipSelector( EntityOperation.CREATE, @@ -729,7 +717,7 @@ void selectorsDoFilteringCorrectly() { @Test void userSelectorsFilterCorrectly() { // prepare - try (var session = driver.session()) { + try (var session = driver().session()) { session.run( "CREATE OR REPLACE USER $user SET PLAINTEXT PASSWORD $pwd CHANGE NOT REQUIRED", Map.of("user", "test", "pwd", "passw0rd")) @@ -742,7 +730,7 @@ void userSelectorsFilterCorrectly() { } // make changes with test user - try (var driver = GraphDatabase.driver(neo4j.getBoltUrl(), AuthTokens.basic("test", "passw0rd")); + try (var driver = GraphDatabase.driver(neo4j().getBoltUrl(), AuthTokens.basic("test", "passw0rd")); var session = driver.session(); var impersonatedSession = driver.session( SessionConfig.builder().withImpersonatedUser("neo4j").build())) { @@ -755,13 +743,13 @@ void userSelectorsFilterCorrectly() { } // make changes with neo4j user - try (var session = driver.session()) { + try (var session = driver().session()) { session.run("UNWIND range(1, 100) AS n CREATE (:Neo4j {id: n})").consume(); } // verify authenticatedUser = test StepVerifier.create(new CDCClient( - driver, + driver(), new EntitySelector( null, emptySet(), @@ -778,7 +766,7 @@ void userSelectorsFilterCorrectly() { // verify authenticatedUser = neo4j StepVerifier.create(new CDCClient( - driver, + driver(), new EntitySelector( null, emptySet(), @@ -795,7 +783,7 @@ void userSelectorsFilterCorrectly() { // verify executingUser = neo4j StepVerifier.create(new CDCClient( - driver, + driver(), new EntitySelector( null, emptySet(), Map.of(EntitySelector.METADATA_KEY_EXECUTING_USER, "neo4j"))) .query(current)) @@ -810,7 +798,7 @@ null, emptySet(), Map.of(EntitySelector.METADATA_KEY_EXECUTING_USER, "neo4j"))) // verify executingUser = test StepVerifier.create(new CDCClient( - driver, + driver(), new EntitySelector( null, emptySet(), Map.of(EntitySelector.METADATA_KEY_EXECUTING_USER, "test"))) .query(current)) @@ -825,7 +813,7 @@ null, emptySet(), Map.of(EntitySelector.METADATA_KEY_EXECUTING_USER, "test"))) // verify authenticatedUser = test, executingUser = neo4j StepVerifier.create(new CDCClient( - driver, + driver(), new EntitySelector( null, emptySet(), @@ -849,7 +837,7 @@ null, emptySet(), Map.of(EntitySelector.METADATA_KEY_EXECUTING_USER, "test"))) @Test void txMetadataSelectorFiltersCorrectly() { - try (var session = driver.session()) { + try (var session = driver().session()) { session.run( "UNWIND range(1, 100) AS n CREATE (:Test {id: n})", TransactionConfig.builder() @@ -858,7 +846,7 @@ void txMetadataSelectorFiltersCorrectly() { .consume(); } - try (var session = driver.session()) { + try (var session = driver().session()) { session.run( "UNWIND range(1, 100) AS n CREATE (:Other {id: n})", TransactionConfig.builder() @@ -867,7 +855,7 @@ void txMetadataSelectorFiltersCorrectly() { .consume(); } - try (var session = driver.session()) { + try (var session = driver().session()) { session.run( "UNWIND range(1, 100) AS n CREATE (:Another {id: n})", TransactionConfig.builder() @@ -877,7 +865,7 @@ void txMetadataSelectorFiltersCorrectly() { } StepVerifier.create(new CDCClient( - driver, + driver(), new EntitySelector( null, emptySet(), @@ -893,7 +881,7 @@ void txMetadataSelectorFiltersCorrectly() { .verifyComplete(); StepVerifier.create(new CDCClient( - driver, + driver(), new EntitySelector( null, emptySet(), @@ -910,7 +898,7 @@ void txMetadataSelectorFiltersCorrectly() { .verifyComplete(); StepVerifier.create(new CDCClient( - driver, + driver(), new EntitySelector( null, emptySet(), diff --git a/src/test/java/org/neo4j/cdc/client/CDCClientLatestIT.java b/src/test/java/org/neo4j/cdc/client/CDCClientLatestIT.java new file mode 100644 index 0000000..9a8c617 --- /dev/null +++ b/src/test/java/org/neo4j/cdc/client/CDCClientLatestIT.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.cdc.client; + +import java.util.Map; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.neo4j.driver.AuthTokens; +import org.neo4j.driver.Driver; +import org.neo4j.driver.GraphDatabase; +import org.testcontainers.containers.Neo4jContainer; +import org.testcontainers.junit.jupiter.Container; + +/** + * TODO At the moment of writing Neo4j 5.15 hasn't publicly releases yet. This means that these tests are identical to + * {@link CDCClient514IT Integration tests for Neo4j 5.14}. When 5.15 is released, we need to enabled these tests. + */ +@Disabled +public class CDCClientLatestIT extends CDCClientIT { + + private static final String NEO4J_VERSION = "5"; + + @SuppressWarnings("resource") + @Container + private static final Neo4jContainer neo4j = new Neo4jContainer<>("neo4j:" + NEO4J_VERSION + "-enterprise") + .withEnv("NEO4J_ACCEPT_LICENSE_AGREEMENT", "yes") + .withAdminPassword("passw0rd"); + + private static Driver driver; + + @BeforeAll + static void setup() { + driver = GraphDatabase.driver(neo4j.getBoltUrl(), AuthTokens.basic("neo4j", "passw0rd")); + } + + @AfterAll + static void cleanup() { + driver.close(); + } + + @Override + Driver driver() { + return driver; + } + + @Override + Neo4jContainer neo4j() { + return neo4j; + } + + @Override + Map defaultExpectedAdditionalEntries() { + return Map.of("databaseName", "neo4j"); + } +} diff --git a/src/test/java/org/neo4j/cdc/client/selector/SelectorTest.java b/src/test/java/org/neo4j/cdc/client/selector/SelectorTest.java index c609929..d6d54b0 100644 --- a/src/test/java/org/neo4j/cdc/client/selector/SelectorTest.java +++ b/src/test/java/org/neo4j/cdc/client/selector/SelectorTest.java @@ -170,29 +170,29 @@ void nodeSelectorMatchesNodeEvents() { assertThat(new NodeSelector(null, emptySet(), emptySet(), Map.of("id", 1L, "dob", "1990"), emptyMap()) .matches(event)) .isFalse(); - assertThat(new NodeSelector(null, emptySet(), emptySet(), Map.of("id", 5L, "role", "manager"), emptyMap()) + assertThat(new NodeSelector(null, emptySet(), emptySet(), Map.of("id", 1L, "role", "manager"), emptyMap()) .matches(event)) .isTrue(); assertThat(new NodeSelector( - null, emptySet(), Set.of("Employee"), Map.of("id", 5L, "role", "manager"), emptyMap()) + null, emptySet(), Set.of("Employee"), Map.of("id", 1L, "role", "manager"), emptyMap()) .matches(event)) .isTrue(); assertThat(new NodeSelector( null, emptySet(), Set.of("Employee", "Person"), - Map.of("id", 5L, "role", "manager"), + Map.of("id", 1L, "role", "manager"), emptyMap()) .matches(event)) .isTrue(); - assertThat(new NodeSelector(null, emptySet(), Set.of("Person"), Map.of("id", 5L, "role", "manager"), emptyMap()) + assertThat(new NodeSelector(null, emptySet(), Set.of("Person"), Map.of("id", 1L, "role", "manager"), emptyMap()) .matches(event)) .isTrue(); assertThat(new NodeSelector( null, emptySet(), Set.of("Person", "Manager"), - Map.of("id", 5L, "role", "manager"), + Map.of("id", 1L, "role", "manager"), emptyMap()) .matches(event)) .isFalse(); @@ -200,7 +200,7 @@ null, emptySet(), Set.of("Employee"), Map.of("id", 5L, "role", "manager"), empty null, emptySet(), emptySet(), - Map.of("id", 5L, "name", "acme corp", "prop", false), + Map.of("id", 1L, "name", "acme corp", "prop", false), emptyMap()) .matches(event)) .isFalse(); @@ -584,7 +584,11 @@ private static ChangeEvent nodeCreateEvent() { "db:1", EntityOperation.CREATE, List.of("Person", "Employee"), - Map.of("Person", Map.of("id", 1L), "Employee", Map.of("id", 5L, "role", "manager")), + Map.of( + "Person", + List.of(Map.of("id", 1L), Map.of("name", "John")), + "Employee", + List.of(Map.of("id", 1L, "role", "manager"))), null, new NodeState( List.of("Person", "Employee"), Map.of("id", 1L, "name", "John", "surname", "Doe")))); @@ -612,7 +616,11 @@ private static ChangeEvent nodeDeleteEvent() { "db:1", EntityOperation.DELETE, List.of("Person", "Employee"), - Map.of("Person", Map.of("id", 1L), "Employee", Map.of("id", 5L, "role", "manager")), + Map.of( + "Person", + List.of(Map.of("id", 1L), Map.of("name", "John")), + "Employee", + List.of(Map.of("id", 1L, "role", "manager"))), new NodeState( List.of("Person", "Employee"), Map.of("id", 1L, "name", "John", "surname", "Doe")), null)); @@ -640,7 +648,11 @@ private static ChangeEvent nodeUpdateEvent() { "db:1", EntityOperation.UPDATE, List.of("Person", "Employee"), - Map.of("Person", Map.of("id", 1L), "Employee", Map.of("id", 5L, "role", "manager")), + Map.of( + "Person", + List.of(Map.of("id", 1L), Map.of("name", "John")), + "Employee", + List.of(Map.of("id", 1L, "role", "manager"))), new NodeState( List.of("Person", "Employee"), Map.of("id", 1L, "name", "John", "surname", "Doe")), new NodeState( @@ -669,9 +681,9 @@ private static ChangeEvent relationshipCreateEvent() { new RelationshipEvent( "db:2", "WORKS_FOR", - new Node("db:1", List.of("Person"), Map.of("Person", Map.of("id", 1L))), - new Node("db:2", List.of("Company"), Map.of("Company", Map.of("id", 5L))), - Map.of("year", 1990L), + new Node("db:1", List.of("Person"), Map.of("Person", List.of(Map.of("id", 1L)))), + new Node("db:2", List.of("Company"), Map.of("Company", List.of(Map.of("id", 5L)))), + List.of(Map.of("year", 1990L), Map.of("name", "John")), EntityOperation.CREATE, null, new RelationshipState(Map.of("id", 1L, "name", "John", "surname", "Doe")))); @@ -698,9 +710,9 @@ private static ChangeEvent relationshipDeleteEvent() { new RelationshipEvent( "db:2", "WORKS_FOR", - new Node("db:1", List.of("Person"), Map.of("Person", Map.of("id", 1L))), - new Node("db:2", List.of("Company"), Map.of("Company", Map.of("id", 5L))), - Map.of("year", 1990L), + new Node("db:1", List.of("Person"), Map.of("Person", List.of(Map.of("id", 1L)))), + new Node("db:2", List.of("Company"), Map.of("Company", List.of(Map.of("id", 5L)))), + List.of(Map.of("year", 1990L), Map.of("name", "John")), EntityOperation.DELETE, new RelationshipState(Map.of("id", 1L, "name", "John", "surname", "Doe")), null)); @@ -727,9 +739,9 @@ private static ChangeEvent relationshipUpdateEvent() { new RelationshipEvent( "db:2", "WORKS_FOR", - new Node("db:1", List.of("Person"), Map.of("Person", Map.of("id", 1L))), - new Node("db:2", List.of("Company"), Map.of("Company", Map.of("id", 5L))), - Map.of("year", 1990L), + new Node("db:1", List.of("Person"), Map.of("Person", List.of(Map.of("id", 1L)))), + new Node("db:2", List.of("Company"), Map.of("Company", List.of(Map.of("id", 5L)))), + List.of(Map.of("year", 1990L), Map.of("name", "John")), EntityOperation.UPDATE, new RelationshipState(Map.of("id", 1L, "name", "John", "surname", "Doe")), new RelationshipState(Map.of("id", 1L, "name", "Jack", "dob", LocalDate.of(1990, 1, 1)))));