Skip to content

Commit

Permalink
feat: add support for 5.15 key format and rename relationship keys (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
venikkin authored Dec 6, 2023
1 parent 898b189 commit 401c210
Show file tree
Hide file tree
Showing 11 changed files with 336 additions and 128 deletions.
77 changes: 77 additions & 0 deletions src/main/java/org/neo4j/cdc/client/model/ModelUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.collections4.MapUtils;

class ModelUtils {
Expand Down Expand Up @@ -88,4 +90,79 @@ static ZonedDateTime getZonedDateTime(Map<String, Object> map, String key) {
}
return ZonedDateTime.parse(value.toString(), DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSSX"));
}

static Map<String, List<Map<String, Object>>> getNodesKeys(Map<String, Object> cypherMap) {
var keysMap = coerseToMap(MapUtils.getObject(cypherMap, "keys"));
if (keysMap == null) {
return null;
}
if (keysMap.isEmpty()) {
return Collections.emptyMap();
}
var valueType = keysMap.values().iterator().next().getClass();

// Check if the key structure is pre Neo4j 5.15
if (Map.class.isAssignableFrom(valueType)) {
var preNode515keyMap = checkedMap(keysMap, String.class, Map.class);
return ModelUtils.transformMapValues(
preNode515keyMap, e -> List.of(ModelUtils.checkedMap(e, String.class, Object.class)));
} else {
var postNode515KeyMap = checkedMap(keysMap, String.class, List.class);
return ModelUtils.transformMapValues(
postNode515KeyMap, e -> coerceToListOfMaps(e, String.class, Object.class));
}
}

static Map<?, ?> coerseToMap(Object input) {
if (input == null) {
return null;
}
if (!(input instanceof Map)) {
throw new IllegalArgumentException(String.format(
"Unexpected type %s, expected Map", input.getClass().getSimpleName()));
}
return (Map<?, ?>) input;
}

static <K, V> List<Map<K, V>> coerceToListOfMaps(List<?> input, Class<K> keyType, Class<V> valueType) {
if (input == null) {
return null;
}
return input.stream()
.map(e -> {
if (e != null && !(e instanceof Map)) {
throw new IllegalArgumentException(
"There are elements of unsupported types in the provided list, expected Map");
}
try {
return checkedMap((Map<?, ?>) e, keyType, valueType);
} catch (RuntimeException ex) {
throw new IllegalArgumentException(
"There are elements of unsupported types in the provided list", ex);
}
})
.collect(Collectors.toList());
}

static <K, V1, V2> Map<K, V2> transformMapValues(Map<K, V1> input, Function<V1, V2> transform) {
if (input == null) {
return null;
}
return input.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> transform.apply(e.getValue())));
}

static List<Map<String, Object>> getRelationshipKeys(Map<String, Object> cypherMap) {
var keysList = ModelUtils.getList(cypherMap, "keys", Map.class);
if (keysList != null) {
return ModelUtils.coerceToListOfMaps(keysList, String.class, Object.class);
}

// Check if the key structure is pre Neo4j 5.15
var keyMap = ModelUtils.getMap(cypherMap, "key", String.class, Object.class);
if (keyMap == null) {
return null;
}
return List.of(keyMap);
}
}
15 changes: 4 additions & 11 deletions src/main/java/org/neo4j/cdc/client/model/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.collections4.MapUtils;

public class Node {

private final String elementId;
private final Map<String, Map<String, Object>> keys;
private final Map<String, List<Map<String, Object>>> keys;
private final List<String> labels;

public Node(String elementId, List<String> labels, Map<String, Map<String, Object>> keys) {
public Node(String elementId, List<String> labels, Map<String, List<Map<String, Object>>> keys) {
this.elementId = Objects.requireNonNull(elementId);
this.labels = labels;
this.keys = keys;
Expand All @@ -38,7 +37,7 @@ public String getElementId() {
return this.elementId;
}

public Map<String, Map<String, Object>> getKeys() {
public Map<String, List<Map<String, Object>>> getKeys() {
return this.keys;
}

Expand Down Expand Up @@ -76,13 +75,7 @@ public static Node fromMap(Map<?, ?> map) {

var elementId = MapUtils.getString(cypherMap, "elementId");
var labels = ModelUtils.getList(cypherMap, "labels", String.class);
var keysMap = ModelUtils.getMap(cypherMap, "keys", String.class, Map.class);
var keys = keysMap != null
? keysMap.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
e -> ModelUtils.checkedMap(e.getValue(), String.class, Object.class)))
: null;
var keys = ModelUtils.getNodesKeys(cypherMap);

return new Node(elementId, labels, keys);
}
Expand Down
15 changes: 4 additions & 11 deletions src/main/java/org/neo4j/cdc/client/model/NodeEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,18 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.collections4.MapUtils;

public class NodeEvent extends EntityEvent<NodeState> {

private final Map<String, Map<String, Object>> keys;
private final Map<String, List<Map<String, Object>>> keys;
private final List<String> labels;

public NodeEvent(
String elementId,
EntityOperation operation,
List<String> labels,
Map<String, Map<String, Object>> keys,
Map<String, List<Map<String, Object>>> keys,
NodeState before,
NodeState after) {
super(elementId, EventType.NODE, operation, before, after);
Expand All @@ -44,7 +43,7 @@ public List<String> getLabels() {
return this.labels;
}

public Map<String, Map<String, Object>> getKeys() {
public Map<String, List<Map<String, Object>>> getKeys() {
return this.keys;
}

Expand Down Expand Up @@ -81,13 +80,7 @@ public static NodeEvent fromMap(Map<?, ?> map) {
var elementId = MapUtils.getString(cypherMap, "elementId");
var operation = EntityOperation.fromShorthand(MapUtils.getString(cypherMap, "operation"));
var labels = ModelUtils.getList(cypherMap, "labels", String.class);
var keysMap = ModelUtils.getMap(cypherMap, "keys", String.class, Map.class);
var keys = keysMap != null
? keysMap.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
e -> ModelUtils.checkedMap(e.getValue(), String.class, Object.class)))
: null;
var keys = ModelUtils.getNodesKeys(cypherMap);

var state = ModelUtils.checkedMap(
Objects.requireNonNull(MapUtils.getMap(cypherMap, "state")), String.class, Object.class);
Expand Down
21 changes: 11 additions & 10 deletions src/main/java/org/neo4j/cdc/client/model/RelationshipEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.neo4j.cdc.client.model;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.commons.collections4.MapUtils;
Expand All @@ -25,14 +26,14 @@ public class RelationshipEvent extends EntityEvent<RelationshipState> {
private final Node start;
private final Node end;
private final String type;
private final Map<String, Object> key;
private final List<Map<String, Object>> keys;

public RelationshipEvent(
String elementId,
String type,
Node start,
Node end,
Map<String, Object> key,
List<Map<String, Object>> keys,
EntityOperation operation,
RelationshipState before,
RelationshipState after) {
Expand All @@ -41,7 +42,7 @@ public RelationshipEvent(
this.start = Objects.requireNonNull(start);
this.end = Objects.requireNonNull(end);
this.type = Objects.requireNonNull(type);
this.key = key;
this.keys = keys;
}

public Node getStart() {
Expand All @@ -56,8 +57,8 @@ public String getType() {
return this.type;
}

public Map<String, Object> getKey() {
return this.key;
public List<Map<String, Object>> getKeys() {
return this.keys;
}

@Override
Expand All @@ -71,7 +72,7 @@ public boolean equals(Object o) {
if (!start.equals(that.start)) return false;
if (!end.equals(that.end)) return false;
if (!type.equals(that.type)) return false;
return Objects.equals(key, that.key);
return Objects.equals(keys, that.keys);
}

@Override
Expand All @@ -80,15 +81,15 @@ public int hashCode() {
result = 31 * result + start.hashCode();
result = 31 * result + end.hashCode();
result = 31 * result + type.hashCode();
result = 31 * result + (key != null ? key.hashCode() : 0);
result = 31 * result + (keys != null ? keys.hashCode() : 0);
return result;
}

@Override
public String toString() {
return String.format(
"RelationshipEvent{elementId=%s, start=%s, end=%s, type='%s', key=%s, operation=%s, before=%s, after=%s}",
getElementId(), start, end, type, key, getOperation(), getBefore(), getAfter());
"RelationshipEvent{elementId=%s, start=%s, end=%s, type='%s', keys=%s, operation=%s, before=%s, after=%s}",
getElementId(), start, end, type, keys, getOperation(), getBefore(), getAfter());
}

public static RelationshipEvent fromMap(Map<?, ?> map) {
Expand All @@ -99,7 +100,7 @@ public static RelationshipEvent fromMap(Map<?, ?> map) {
var type = MapUtils.getString(cypherMap, "type");
var start = Node.fromMap(ModelUtils.getMap(cypherMap, "start", String.class, Object.class));
var end = Node.fromMap(ModelUtils.getMap(cypherMap, "end", String.class, Object.class));
var key = ModelUtils.getMap(cypherMap, "key", String.class, Object.class);
var key = ModelUtils.getRelationshipKeys(cypherMap);

var state = ModelUtils.checkedMap(
Objects.requireNonNull(MapUtils.getMap(cypherMap, "state")), String.class, Object.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public ChangeEvent applyProperties(ChangeEvent e) {
relationshipEvent.getType(),
relationshipEvent.getStart(),
relationshipEvent.getEnd(),
relationshipEvent.getKey(),
relationshipEvent.getKeys(),
relationshipEvent.getOperation(),
beforeState,
afterState));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ public boolean matches(ChangeEvent e) {
return false;
}

if (!key.isEmpty() && !nodeEvent.getKeys().containsValue(key)) {
if (!key.isEmpty()
&& nodeEvent.getKeys().values().stream().flatMap(List::stream).noneMatch(key::equals)) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,18 +130,22 @@ public boolean matches(ChangeEvent e) {
if (start.getLabels().stream()
.anyMatch(l -> !relationshipEvent.getStart().getLabels().contains(l))
|| (!start.getKey().isEmpty()
&& !relationshipEvent.getStart().getKeys().containsValue(start.getKey()))) {
&& relationshipEvent.getStart().getKeys().values().stream()
.flatMap(List::stream)
.noneMatch(start.getKey()::equals))) {
return false;
}

if (end.getLabels().stream()
.anyMatch(l -> !relationshipEvent.getEnd().getLabels().contains(l))
|| (!end.getKey().isEmpty()
&& !relationshipEvent.getEnd().getKeys().containsValue(end.getKey()))) {
&& relationshipEvent.getEnd().getKeys().values().stream()
.flatMap(List::stream)
.noneMatch(end.getKey()::equals))) {
return false;
}

if (!key.isEmpty() && !Objects.equals(key, relationshipEvent.getKey())) {
if (!key.isEmpty() && !relationshipEvent.getKeys().contains(key)) {
return false;
}

Expand Down
69 changes: 69 additions & 0 deletions src/test/java/org/neo4j/cdc/client/CDCClient514IT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.cdc.client;

import java.util.Collections;
import java.util.Map;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.testcontainers.containers.Neo4jContainer;
import org.testcontainers.junit.jupiter.Container;

/**
* Neo4j 5.15+ introduced a breaking change in node ande relationship keys structure. This suite verifies if
* CDC Client is backward compatible with 5.14 and earlier.
*/
public class CDCClient514IT extends CDCClientIT {

private static final String NEO4J_VERSION = "5.14";

@SuppressWarnings("resource")
@Container
private static final Neo4jContainer<?> neo4j = new Neo4jContainer<>("neo4j:" + NEO4J_VERSION + "-enterprise")
.withEnv("NEO4J_ACCEPT_LICENSE_AGREEMENT", "yes")
.withAdminPassword("passw0rd");

private static Driver driver;

@BeforeAll
static void setup() {
driver = GraphDatabase.driver(neo4j.getBoltUrl(), AuthTokens.basic("neo4j", "passw0rd"));
}

@AfterAll
static void cleanup() {
driver.close();
}

@Override
Driver driver() {
return driver;
}

@Override
Neo4jContainer<?> neo4j() {
return neo4j;
}

@Override
Map<String, Object> defaultExpectedAdditionalEntries() {
return Collections.emptyMap();
}
}
Loading

0 comments on commit 401c210

Please sign in to comment.