From 59915d8a4b3d02a8fa688f979427962e3272f775 Mon Sep 17 00:00:00 2001 From: Florent Biville <445792+fbiville@users.noreply.github.com> Date: Fri, 22 Nov 2024 17:51:45 +0100 Subject: [PATCH] feat: introduce pipeline API (#169) First, `ImportSpecification` is now devoid of any "business" logic. It has instead become a simple target for JSON/YAML deserialization. This commit introduces `ImportPipeline`, which takes care of A LOT of boilerplate when writing an import-spec backend. It wraps sources, actions and targets as steps and sorts them accordingly. The step classes provide helper functions to resolve key/non-key properties and start/end nodes. BREAKING CHANGE: source names must now be globally unique, since this simplifies the pipeline construction logic quite a bit. --- .teamcity/builds/Build.kt | 2 +- .teamcity/builds/Common.kt | 2 +- README.md | 21 +- .../org/neo4j/importer/BeamExampleIT.java | 373 +++++++---------- .../neo4j/importer/Neo4jAdminExampleIT.java | 49 ++- pom.xml | 31 ++ .../importer/v1/ImportSpecification.java | 9 +- .../v1/graph/{Graph.java => Graphs.java} | 9 +- .../importer/v1/pipeline/ActionStep.java | 63 +++ .../v1/pipeline/CustomQueryTargetStep.java | 53 +++ .../v1/pipeline/EntityTargetStep.java | 43 ++ .../importer/v1/pipeline/ImportPipeline.java | 331 +++++++++++++++ .../importer/v1/pipeline/ImportStep.java | 28 ++ .../importer/v1/pipeline/NodeTargetStep.java | 126 ++++++ .../v1/pipeline/RelationshipTargetStep.java | 150 +++++++ .../SourceStep.java} | 33 +- .../importer/v1/pipeline/TargetStep.java | 57 +++ .../importer/v1/targets/EntityTarget.java | 18 +- .../neo4j/importer/v1/targets/NodeTarget.java | 43 -- .../v1/targets/RelationshipTarget.java | 59 +-- .../org/neo4j/importer/v1/targets/Target.java | 13 +- .../validation/SpecificationValidators.java | 4 +- .../plugin/NoDependencyCycleValidator.java | 6 +- ...or.java => NoDuplicatedNameValidator.java} | 10 +- .../v1/pipeline/EntityTargetStep.java | 43 ++ 
.../importer/v1/pipeline/ImportStep.java | 28 ++ .../importer/v1/pipeline/TargetStep.java | 57 +++ ...orter.v1.validation.SpecificationValidator | 3 +- ...cationDeserializerExtraValidationTest.java | 36 +- .../org/neo4j/importer/v1/e2e/BeamIT.java | 385 +++++++----------- .../graph/{GraphTest.java => GraphsTest.java} | 18 +- .../v1/pipeline/NodeTargetTaskTest.java | 160 ++++++++ .../pipeline/RelationshipTargetTaskTest.java | 199 +++++++++ .../importer/v1/targets/EntityTargetTest.java | 11 +- .../importer/v1/targets/NodeTargetTest.java | 109 ----- .../v1/targets/RelationshipTargetTest.java | 148 ------- .../neo4j/importer/v1/targets/TargetTest.java | 116 ------ 37 files changed, 1774 insertions(+), 1072 deletions(-) rename src/main/java/org/neo4j/importer/v1/graph/{Graph.java => Graphs.java} (97%) create mode 100644 src/main/java/org/neo4j/importer/v1/pipeline/ActionStep.java create mode 100644 src/main/java/org/neo4j/importer/v1/pipeline/CustomQueryTargetStep.java create mode 100644 src/main/java/org/neo4j/importer/v1/pipeline/EntityTargetStep.java create mode 100644 src/main/java/org/neo4j/importer/v1/pipeline/ImportPipeline.java create mode 100644 src/main/java/org/neo4j/importer/v1/pipeline/ImportStep.java create mode 100644 src/main/java/org/neo4j/importer/v1/pipeline/NodeTargetStep.java create mode 100644 src/main/java/org/neo4j/importer/v1/pipeline/RelationshipTargetStep.java rename src/main/java/org/neo4j/importer/v1/{validation/plugin/NoDuplicatedSourceNameValidator.java => pipeline/SourceStep.java} (50%) create mode 100644 src/main/java/org/neo4j/importer/v1/pipeline/TargetStep.java rename src/main/java/org/neo4j/importer/v1/validation/plugin/{NoDuplicatedTargetActionNameValidator.java => NoDuplicatedNameValidator.java} (91%) create mode 100644 src/main/java17/org/neo4j/importer/v1/pipeline/EntityTargetStep.java create mode 100644 src/main/java17/org/neo4j/importer/v1/pipeline/ImportStep.java create mode 100644 
src/main/java17/org/neo4j/importer/v1/pipeline/TargetStep.java rename src/test/java/org/neo4j/importer/v1/graph/{GraphTest.java => GraphsTest.java} (91%) create mode 100644 src/test/java/org/neo4j/importer/v1/pipeline/NodeTargetTaskTest.java create mode 100644 src/test/java/org/neo4j/importer/v1/pipeline/RelationshipTargetTaskTest.java delete mode 100644 src/test/java/org/neo4j/importer/v1/targets/NodeTargetTest.java delete mode 100644 src/test/java/org/neo4j/importer/v1/targets/RelationshipTargetTest.java diff --git a/.teamcity/builds/Build.kt b/.teamcity/builds/Build.kt index f728ce7..1e298e5 100644 --- a/.teamcity/builds/Build.kt +++ b/.teamcity/builds/Build.kt @@ -22,7 +22,7 @@ class Build( buildType(WhiteListCheck("${name}-whitelist-check", "white-list check")) if (forPullRequests) dependentBuildType(PRCheck("${name}-pr-check", "pr check")) parallel { - listOf("11", "17", "21").forEach { java -> + listOf("17", "21").forEach { java -> dependentBuildType( Maven( "${name}-build-${java}", diff --git a/.teamcity/builds/Common.kt b/.teamcity/builds/Common.kt index cbfe7bb..2017eeb 100644 --- a/.teamcity/builds/Common.kt +++ b/.teamcity/builds/Common.kt @@ -13,7 +13,7 @@ const val GITHUB_OWNER = "neo4j" const val GITHUB_REPOSITORY = "import-spec" const val MAVEN_DEFAULT_ARGS = "--no-transfer-progress --batch-mode --show-version" -const val DEFAULT_JAVA_VERSION = "11" +const val DEFAULT_JAVA_VERSION = "17" const val LTS_JAVA_VERSION = "21" enum class LinuxSize(val value: String) { diff --git a/README.md b/README.md index 7dc23e4..f415316 100644 --- a/README.md +++ b/README.md @@ -68,16 +68,15 @@ class GettingStarted { public static void main(String... 
args) { - try (Reader reader = createReader("spec.json")) { - var importSpecification = ImportSpecificationDeserializer.deserialize(reader); - - var config = importSpecification.getConfiguration(); - var sources = importSpecification.getSources(); - var actions = importSpecification.getActions(); - var targets = importSpecification.getTargets(); - var nodeTargets = targets.getNodes(); - var relationshipTargets = targets.getRelationships(); - var customQueryTargets = targets.getCustomQueries(); + try (var reader = new InputStreamReader(createReaderFor("/import/spec.yaml"))) { + var pipeline = ImportPipeline.of(ImportSpecificationDeserializer.deserialize(reader)); + pipeline.forEach(step -> { + switch (step) { + case SourceStep source -> handleSource(source); + case ActionStep action -> handleAction(action); + case TargetStep target -> handleTarget(target); + } + }); } } } @@ -86,4 +85,4 @@ class GettingStarted { ## Prerequisites - Maven -- JDK 21 (21 is used for tests, 11 for production sources) +- JDK 21 (21 is used for tests, 11 and 17 for production sources) diff --git a/examples/apache-beam/src/test/java/org/neo4j/importer/BeamExampleIT.java b/examples/apache-beam/src/test/java/org/neo4j/importer/BeamExampleIT.java index e820cc4..7d7bd45 100644 --- a/examples/apache-beam/src/test/java/org/neo4j/importer/BeamExampleIT.java +++ b/examples/apache-beam/src/test/java/org/neo4j/importer/BeamExampleIT.java @@ -31,15 +31,15 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; import java.util.stream.StreamSupport; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; @@ -70,7 
+70,6 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.checkerframework.checker.nullness.qual.NonNull; -import org.jetbrains.annotations.NotNull; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; @@ -88,21 +87,20 @@ import org.neo4j.driver.Session; import org.neo4j.driver.summary.ResultSummary; import org.neo4j.driver.summary.SummaryCounters; -import org.neo4j.importer.v1.ImportSpecification; import org.neo4j.importer.v1.ImportSpecificationDeserializer; -import org.neo4j.importer.v1.actions.Action; -import org.neo4j.importer.v1.actions.ActionStage; import org.neo4j.importer.v1.actions.plugin.CypherAction; import org.neo4j.importer.v1.actions.plugin.CypherExecutionMode; -import org.neo4j.importer.v1.graph.Graph; +import org.neo4j.importer.v1.pipeline.ActionStep; +import org.neo4j.importer.v1.pipeline.EntityTargetStep; +import org.neo4j.importer.v1.pipeline.ImportPipeline; +import org.neo4j.importer.v1.pipeline.ImportStep; +import org.neo4j.importer.v1.pipeline.NodeTargetStep; +import org.neo4j.importer.v1.pipeline.RelationshipTargetStep; +import org.neo4j.importer.v1.pipeline.SourceStep; +import org.neo4j.importer.v1.pipeline.TargetStep; import org.neo4j.importer.v1.sources.Source; import org.neo4j.importer.v1.sources.SourceProvider; -import org.neo4j.importer.v1.targets.EntityTarget; -import org.neo4j.importer.v1.targets.NodeTarget; -import org.neo4j.importer.v1.targets.PropertyMapping; import org.neo4j.importer.v1.targets.PropertyType; -import org.neo4j.importer.v1.targets.RelationshipTarget; -import org.neo4j.importer.v1.targets.Target; import org.testcontainers.containers.Neo4jContainer; import org.testcontainers.utility.DockerImageName; @@ -122,66 +120,14 @@ public void imports_dvd_rental_data_set() throws Exception { assertThat(stream).isNotNull(); try (var reader = new InputStreamReader(stream)) { - var specification = ImportSpecificationDeserializer.deserialize(reader); - var 
sourceOutputs = new HashMap>( - specification.getSources().size()); - var targetOutputs = new HashMap>(); - var sortedTargets = sortTargets(specification.getTargets().getAll()); - sortedTargets.forEach(target -> { - assertThat(target).isInstanceOf(EntityTarget.class); - var schemaInitOutput = pipeline.apply( - "[target %s] Create single input".formatted(target.getName()), Create.of(1)) - .setCoder(VarIntCoder.of()) - .apply( - "[target %s] Init schema".formatted(target.getName()), - TargetSchemaIO.initSchema( - NEO4J.getBoltUrl(), NEO4J.getAdminPassword(), (EntityTarget) target)); - - var source = specification.findSourceByName(target.getSource()); - assertThat(source).isInstanceOf(ParquetSource.class); - var sourceRecords = sourceOutputs.computeIfAbsent(source, (src) -> { - var parquetSource = (ParquetSource) src; - return pipeline.apply( - "[source %s] Read records".formatted(parquetSource.getName()), - ParquetIO.parseGenericRecords( - (SerializableFunction) record -> record) - .withCoder(GenericRecordCoder.create()) - .from(parquetSource.uri())); - }); - var targetOutput = sourceRecords - .apply( - "[target %s] Wait for implicit dependencies".formatted(target.getName()), - Wait.on(dependenciesOf(target, targetOutputs, schemaInitOutput))) - .setCoder(sourceRecords.getCoder()) - .apply( - "[target %s] Assign keys to records".formatted(target.getName()), - WithKeys.of((SerializableFunction) - input -> ThreadLocalRandom.current() - .nextInt( - Runtime.getRuntime().availableProcessors()))) - .setCoder(KvCoder.of(VarIntCoder.of(), sourceRecords.getCoder())) - .apply( - "[target %s] Group records into batches".formatted(target.getName()), - GroupIntoBatches.ofSize(50)) - .apply( - "[target %s] Write record batches to Neo4j".formatted(target.getName()), - TargetIO.writeAll( - NEO4J.getBoltUrl(), NEO4J.getAdminPassword(), specification, target)); - - targetOutputs.put(target.getName(), targetOutput); - }); - - specification.getActions().forEach(action -> { - 
assertThat(action).isInstanceOf(CypherAction.class); - pipeline.apply("[action %s] Create single input".formatted(action.getName()), Create.of(1)) - .apply( - "[action %s] Wait for dependencies inferred from stage".formatted(action.getName()), - Wait.on(stageDependenciesOf(action, targetOutputs))) - .setCoder(VarIntCoder.of()) - .apply( - "[action %s] Run".formatted(action.getName()), - CypherActionIO.run( - (CypherAction) action, NEO4J.getBoltUrl(), NEO4J.getAdminPassword())); + var outputs = new HashMap>(); + var importPipeline = ImportPipeline.of(ImportSpecificationDeserializer.deserialize(reader)); + importPipeline.forEach(step -> { + switch (step) { + case SourceStep source -> handleSource(source, outputs); + case ActionStep action -> handleAction(action, outputs); + case TargetStep target -> handleTarget(target, outputs); + } }); } } @@ -204,47 +150,72 @@ public void imports_dvd_rental_data_set() throws Exception { } } - private @NotNull List> dependenciesOf( - Target target, - Map> targetOutputs, - PCollection schemaInitOutput) { - List> implicitDependencies = implicitDependenciesOf(target, targetOutputs); - var dependencies = new ArrayList>(1 + implicitDependencies.size()); - dependencies.add(schemaInitOutput); - dependencies.addAll(implicitDependencies); - return dependencies; + private void handleSource(SourceStep step, Map> outputs) { + var name = step.name(); + var source = step.source(); + assertThat(source).isInstanceOf(ParquetSource.class); + var parquetSource = (ParquetSource) source; + var output = pipeline.apply( + "[source %s] Read records".formatted(name), + ParquetIO.parseGenericRecords((SerializableFunction) record -> record) + .withCoder(GenericRecordCoder.create()) + .from(parquetSource.uri())); + outputs.put(source.getName(), output); } - private static List sortTargets(List targets) { - Map> dependencyGraph = new HashMap<>(); - targets.forEach(target -> { - if (target instanceof RelationshipTarget relationshipTarget) { - 
dependencyGraph.put( - relationshipTarget, - Set.of( - findTargetByName(targets, relationshipTarget.getStartNodeReference()), - findTargetByName(targets, relationshipTarget.getEndNodeReference()))); - } else { - dependencyGraph.put(target, Set.of()); - } - }); - return Graph.runTopologicalSort(dependencyGraph); + private void handleAction(ActionStep step, Map> outputs) { + var actionName = step.name(); + var action = step.action(); + assertThat(action).isInstanceOf(CypherAction.class); + var cypherAction = (CypherAction) action; + PCollection output = pipeline.apply( + "[action %s] Create single input".formatted(actionName), Create.of(1)) + .apply( + "[action %s] Wait for dependencies inferred from stage".formatted(actionName), + Wait.on(stepsToOutputs(step.dependencies(), outputs))) + .setCoder(VarIntCoder.of()) + .apply( + "[action %s] Run".formatted(actionName), + CypherActionIO.run(cypherAction, NEO4J.getBoltUrl(), NEO4J.getAdminPassword())); + outputs.put(actionName, output); } - private List> implicitDependenciesOf( - Target target, Map> targetOutputs) { - if (target instanceof RelationshipTarget relationshipTarget) { - return List.of( - targetOutputs.get(relationshipTarget.getStartNodeReference()), - targetOutputs.get(relationshipTarget.getEndNodeReference())); - } - return List.of(); + @SuppressWarnings("unchecked") + private void handleTarget(TargetStep step, Map> outputs) { + var stepName = step.name(); + assertThat(step).isInstanceOf(EntityTargetStep.class); + var entityTargetStep = (EntityTargetStep) step; + var schemaInitOutput = pipeline.apply("[target %s] Create single input".formatted(stepName), Create.of(1)) + .setCoder(VarIntCoder.of()) + .apply( + "[target %s] Init schema".formatted(stepName), + TargetSchemaIO.initSchema(NEO4J.getBoltUrl(), NEO4J.getAdminPassword(), entityTargetStep)); + var sourceRecords = (PCollection) outputs.get(step.sourceName()); + var sourceCoder = sourceRecords.getCoder(); + var output = sourceRecords + .apply( + "[target 
%s] Wait for implicit dependencies".formatted(stepName), + Wait.on(stepsToOutputs(step.dependencies(), outputs, schemaInitOutput))) + .setCoder(sourceCoder) + .apply( + "[target %s] Assign keys to records".formatted(stepName), + WithKeys.of((SerializableFunction) input -> ThreadLocalRandom.current() + .nextInt(Runtime.getRuntime().availableProcessors()))) + .setCoder(KvCoder.of(VarIntCoder.of(), sourceCoder)) + .apply("[target %s] Group records into batches".formatted(stepName), GroupIntoBatches.ofSize(50)) + .apply( + "[target %s] Write record batches to Neo4j".formatted(stepName), + TargetIO.writeAll(NEO4J.getBoltUrl(), NEO4J.getAdminPassword(), entityTargetStep)); + outputs.put(stepName, output); } - private List<@NonNull PCollection> stageDependenciesOf( - Action action, Map> targetOutputs) { - assertThat(action.getStage()).isEqualTo(ActionStage.END); - return new ArrayList<>(targetOutputs.values()); + private static List> stepsToOutputs( + List dependencies, Map> outputs, PCollection... 
extras) { + var result = dependencies.stream() + .map(step -> outputs.get(step.name())) + .collect(Collectors.toCollection((Supplier>>) ArrayList::new)); + Collections.addAll(result, extras); + return result; } private static void assertSchema(Driver driver) { @@ -385,16 +356,16 @@ private static class TargetSchemaIO private final String password; - private final EntityTarget target; + private final EntityTargetStep target; - private TargetSchemaIO(String url, String password, EntityTarget target) { + private TargetSchemaIO(String url, String password, EntityTargetStep target) { this.url = url; this.password = password; this.target = target; } public static PTransform<@NonNull PCollection, @NonNull PCollection> initSchema( - String url, String password, EntityTarget target) { + String url, String password, EntityTargetStep target) { return new TargetSchemaIO(url, password, target); } @@ -409,17 +380,17 @@ private static class TargetSchemaWriteFn extends DoFn { private final String password; - private final EntityTarget target; + private final EntityTargetStep target; private transient Driver driver; - public TargetSchemaWriteFn(String url, String password, EntityTarget target) { + public TargetSchemaWriteFn(String url, String password, EntityTargetStep target) { this.url = url; this.password = password; this.target = target; } - public static DoFn of(String url, String password, EntityTarget target) { + public static DoFn of(String url, String password, EntityTargetStep target) { return new TargetSchemaWriteFn(url, password, target); } @@ -441,11 +412,9 @@ public void tearDown() { public void processElement(ProcessContext context) { var schemaStatements = switch (target) { - case NodeTarget nodeTarget -> generateNodeSchemaStatements(nodeTarget); - case RelationshipTarget relationshipTarget -> generateRelationshipSchemaStatements( + case NodeTargetStep nodeTarget -> generateNodeSchemaStatements(nodeTarget); + case RelationshipTargetStep relationshipTarget -> 
generateRelationshipSchemaStatements( relationshipTarget); - default -> throw new RuntimeException( - "unsupported target type: %s".formatted(target.getClass())); }; if (schemaStatements.isEmpty()) { @@ -459,8 +428,8 @@ public void processElement(ProcessContext context) { } } - private List generateNodeSchemaStatements(NodeTarget nodeTarget) { - var schema = nodeTarget.getSchema(); + private List generateNodeSchemaStatements(NodeTargetStep step) { + var schema = step.schema(); if (schema == null) { return Collections.emptyList(); } @@ -469,7 +438,10 @@ private List generateNodeSchemaStatements(NodeTarget nodeTarget) { .map(constraint -> "CREATE CONSTRAINT %s FOR (n:%s) REQUIRE (%s) IS NODE KEY" .formatted( generateName( - nodeTarget, "key", constraint.getLabel(), constraint.getProperties()), + step, + "key", + constraint.getLabel(), + constraint.getProperties()), sanitize(constraint.getLabel()), constraint.getProperties().stream() .map(TargetSchemaWriteFn::sanitize) @@ -480,7 +452,7 @@ private List generateNodeSchemaStatements(NodeTarget nodeTarget) { .map(constraint -> "CREATE CONSTRAINT %s FOR (n:%s) REQUIRE (%s) IS UNIQUE" .formatted( generateName( - nodeTarget, + step, "unique", constraint.getLabel(), constraint.getProperties()), @@ -490,24 +462,24 @@ private List generateNodeSchemaStatements(NodeTarget nodeTarget) { .map(prop -> propertyOf("n", prop)) .collect(Collectors.joining(",")))) .toList()); + Map propertyTypes = step.propertyTypes(); statements.addAll(schema.getTypeConstraints().stream() .map(constraint -> "CREATE CONSTRAINT %s FOR (n:%s) REQUIRE n.%s IS :: %s" .formatted( generateName( - nodeTarget, + step, "type", constraint.getLabel(), List.of(constraint.getProperty())), sanitize(constraint.getLabel()), sanitize(constraint.getProperty()), - propertyType(findPropertyType( - nodeTarget.getProperties(), constraint.getProperty())))) + propertyType(propertyTypes.get(constraint.getProperty())))) .toList()); return statements; } - private List 
generateRelationshipSchemaStatements(RelationshipTarget relationshipTarget) { - var schema = relationshipTarget.getSchema(); + private List generateRelationshipSchemaStatements(RelationshipTargetStep step) { + var schema = step.schema(); if (schema == null) { return Collections.emptyList(); } @@ -516,11 +488,11 @@ private List generateRelationshipSchemaStatements(RelationshipTarget rel .map(constraint -> "CREATE CONSTRAINT %s FOR ()-[r:%s]-() REQUIRE (%s) IS RELATIONSHIP KEY" .formatted( generateName( - relationshipTarget, + step, "key", - relationshipTarget.getType(), + step.type(), constraint.getProperties()), - sanitize(relationshipTarget.getType()), + sanitize(step.type()), constraint.getProperties().stream() .map(TargetSchemaWriteFn::sanitize) .map(prop -> propertyOf("r", prop)) @@ -530,44 +502,35 @@ private List generateRelationshipSchemaStatements(RelationshipTarget rel .map(constraint -> "CREATE CONSTRAINT %s FOR ()-[r:%s]-() REQUIRE (%s) IS UNIQUE" .formatted( generateName( - relationshipTarget, + step, "unique", - relationshipTarget.getType(), + step.type(), constraint.getProperties()), - sanitize(relationshipTarget.getType()), + sanitize(step.type()), constraint.getProperties().stream() .map(TargetSchemaWriteFn::sanitize) .map(prop -> propertyOf("r", prop)) .collect(Collectors.joining(",")))) .toList()); + Map propertyTypes = step.propertyTypes(); statements.addAll(schema.getTypeConstraints().stream() .map(constraint -> "CREATE CONSTRAINT %s FOR ()-[r:%s]-() REQUIRE r.%s IS :: %s" .formatted( generateName( - relationshipTarget, + step, "type", - relationshipTarget.getType(), + step.type(), List.of(constraint.getProperty())), - sanitize(relationshipTarget.getType()), + sanitize(step.type()), sanitize(constraint.getProperty()), - propertyType(findPropertyType( - relationshipTarget.getProperties(), constraint.getProperty())))) + propertyType(propertyTypes.get(constraint.getProperty())))) .toList()); return statements; } - private static PropertyType 
findPropertyType(List mappings, String property) { - var result = mappings.stream() - .filter(mapping -> mapping.getTargetProperty().equals(property)) - .map(PropertyMapping::getTargetPropertyType) - .toList(); - assertThat(result).hasSize(1); - return result.getFirst(); - } - private static String generateName( - EntityTarget target, String type, String label, List properties) { - return sanitize("%s_%s_%s_%s".formatted(target.getName(), type, label, String.join("-", properties))); + EntityTargetStep target, String type, String label, List properties) { + return sanitize("%s_%s_%s_%s".formatted(target.name(), type, label, String.join("-", properties))); } private static String propertyOf(String container, String property) { @@ -619,26 +582,23 @@ private static class TargetIO private final String password; - private final ImportSpecification spec; - - private final Target target; + private final EntityTargetStep target; - private TargetIO(String url, String password, ImportSpecification spec, Target target) { + private TargetIO(String url, String password, EntityTargetStep target) { this.url = url; this.password = password; - this.spec = spec; this.target = target; } public static PTransform< @NonNull PCollection>>, @NonNull PCollection> - writeAll(String boltUrl, String adminPassword, ImportSpecification spec, Target target) { - return new TargetIO(boltUrl, adminPassword, spec, target); + writeAll(String boltUrl, String adminPassword, EntityTargetStep target) { + return new TargetIO(boltUrl, adminPassword, target); } @Override public @NonNull PCollection expand(PCollection>> input) { - return input.apply(ParDo.of(TargetWriteFn.of(url, password, spec, target))); + return input.apply(ParDo.of(TargetWriteFn.of(url, password, target))); } private static class TargetWriteFn extends DoFn>, WriteCounters> { @@ -647,22 +607,19 @@ private static class TargetWriteFn extends DoFn>, WriteCounters> of( - String url, String password, ImportSpecification spec, Target target) { - 
return new TargetWriteFn(url, password, spec, target); + String url, String password, EntityTargetStep target) { + return new TargetWriteFn(url, password, target); } @Setup @@ -691,17 +648,9 @@ public void processElement(ProcessContext context) { assertThat(records).isNotNull(); var statement = switch (target) { - case NodeTarget nodeTarget -> buildNodeImportQuery(nodeTarget, unwindRows, row); - case RelationshipTarget relationshipTarget -> { - var startNodeTarget = findTargetByName( - spec.getTargets().getNodes(), relationshipTarget.getStartNodeReference()); - var endNodeTarget = findTargetByName( - spec.getTargets().getNodes(), relationshipTarget.getEndNodeReference()); - yield buildRelationshipImportQuery( - relationshipTarget, startNodeTarget, endNodeTarget, unwindRows, row); - } - default -> throw new RuntimeException( - "unsupported target type: %s".formatted(target.getClass())); + case NodeTargetStep nodeTarget -> buildNodeImportQuery(nodeTarget, unwindRows, row); + case RelationshipTargetStep relationshipTarget -> buildRelationshipImportQuery( + relationshipTarget, unwindRows, row); }; var summary = WriteCounters.of(driver.executableQuery(statement.getCypher()) @@ -712,11 +661,11 @@ yield buildRelationshipImportQuery( } private static Statement buildNodeImportQuery( - NodeTarget nodeTarget, OngoingReading unwindRows, SymbolicName row) { + NodeTargetStep nodeTarget, OngoingReading unwindRows, SymbolicName row) { var node = cypherNode(nodeTarget, row); var nonKeyProps = nonKeyPropertiesOf(nodeTarget, node.getRequiredSymbolicName(), row); var query = - switch (nodeTarget.getWriteMode()) { + switch (nodeTarget.writeMode()) { case CREATE -> { var create = unwindRows.create(node); if (nonKeyProps.isEmpty()) { @@ -736,26 +685,22 @@ private static Statement buildNodeImportQuery( } private Statement buildRelationshipImportQuery( - RelationshipTarget relationshipTarget, - NodeTarget startNodeTarget, - NodeTarget endNodeTarget, - OngoingReading unwindRows, - 
SymbolicName row) { - var startNode = cypherNode(startNodeTarget, row, "start"); - var endNode = cypherNode(endNodeTarget, row, "end"); + RelationshipTargetStep relationshipTarget, OngoingReading unwindRows, SymbolicName row) { + var startNode = cypherNode(relationshipTarget.startNode(), row, "start"); + var endNode = cypherNode(relationshipTarget.endNode(), row, "end"); var relationship = startNode - .relationshipTo(endNode, relationshipTarget.getType()) + .relationshipTo(endNode, relationshipTarget.type()) .named("r") .withProperties(keyPropertiesOf(relationshipTarget, row)); var nonKeyProps = nonKeyPropertiesOf(relationshipTarget, relationship.getRequiredSymbolicName(), row); var queryStart = - switch (relationshipTarget.getNodeMatchMode()) { + switch (relationshipTarget.nodeMatchMode()) { case MATCH -> unwindRows.match(startNode).match(endNode); case MERGE -> unwindRows.merge(startNode).merge(endNode); }; var query = - switch (relationshipTarget.getWriteMode()) { + switch (relationshipTarget.writeMode()) { case CREATE -> { var create = queryStart.create(relationship); if (nonKeyProps.isEmpty()) { @@ -774,54 +719,32 @@ private Statement buildRelationshipImportQuery( return query.build(); } - private static Node cypherNode(NodeTarget nodeTarget, SymbolicName row) { + private static Node cypherNode(NodeTargetStep nodeTarget, SymbolicName row) { return cypherNode(nodeTarget, row, "n"); } - private static Node cypherNode(NodeTarget nodeTarget, SymbolicName row, String variableName) { - List labels = nodeTarget.getLabels(); + private static Node cypherNode(NodeTargetStep nodeTarget, SymbolicName row, String variableName) { + List labels = nodeTarget.labels(); return Cypher.node(labels.getFirst(), labels.subList(1, labels.size())) .named(variableName) .withProperties(keyPropertiesOf(nodeTarget, row)); } - private static MapExpression keyPropertiesOf(EntityTarget nodeTarget, SymbolicName rowVariable) { - var keyProperties = nodeTarget.getKeyProperties(); - var 
expressions = new ArrayList<>(keyProperties.size() * 2); - keyProperties.forEach(property -> { - expressions.add(property); - expressions.add(Cypher.property(rowVariable, sourceFieldFor(nodeTarget, property))); - }); - return Cypher.mapOf(expressions.toArray()); + private static MapExpression keyPropertiesOf(EntityTargetStep target, SymbolicName rowVariable) { + return Cypher.mapOf(target.keyProperties().stream() + .flatMap(mapping -> Stream.of( + mapping.getTargetProperty(), Cypher.property(rowVariable, mapping.getSourceField()))) + .toArray()); } private static Collection nonKeyPropertiesOf( - EntityTarget target, SymbolicName entityVariable, SymbolicName rowVariable) { - - Set nonKeyProperties = getNonKeyProperties(target); - List expressions = new ArrayList<>(nonKeyProperties.size() * 2); - nonKeyProperties.forEach(property -> { - expressions.add(Cypher.property(entityVariable, property)); - expressions.add(Cypher.property(rowVariable, sourceFieldFor(target, property))); - }); - return expressions; - } + EntityTargetStep target, SymbolicName entityVariable, SymbolicName rowVariable) { - private static Set getNonKeyProperties(EntityTarget nodeTarget) { - Set properties = - new HashSet<>(nodeTarget.getAllProperties().size()); - properties.addAll(nodeTarget.getAllProperties()); - properties.removeAll(new HashSet<>(nodeTarget.getKeyProperties())); - return properties; - } - - private static String sourceFieldFor(EntityTarget target, String property) { - var sourceField = target.getProperties().stream() - .filter(mapping -> mapping.getTargetProperty().equals(property)) - .map(PropertyMapping::getSourceField) - .findFirst(); - assertThat(sourceField).isPresent(); - return sourceField.get(); + return target.nonKeyProperties().stream() + .flatMap(mapping -> Stream.of( + Cypher.property(entityVariable, mapping.getTargetProperty()), + Cypher.property(rowVariable, mapping.getSourceField()))) + .toList(); } private List> parameters(Iterable records) { @@ -1104,12 +1027,6 
@@ public String toString() { } } - private static T findTargetByName(List targets, String name) { - var results = targets.stream().filter(t -> t.getName().equals(name)).toList(); - assertThat(results).hasSize(1); - return results.getFirst(); - } - private static class CypherActionIO extends PTransform<@NonNull PCollection, @NonNull PCollection> { diff --git a/examples/neo4j-admin/src/test/java/org/neo4j/importer/Neo4jAdminExampleIT.java b/examples/neo4j-admin/src/test/java/org/neo4j/importer/Neo4jAdminExampleIT.java index 0b3bc01..2937460 100644 --- a/examples/neo4j-admin/src/test/java/org/neo4j/importer/Neo4jAdminExampleIT.java +++ b/examples/neo4j-admin/src/test/java/org/neo4j/importer/Neo4jAdminExampleIT.java @@ -24,6 +24,7 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import org.junit.jupiter.api.AfterEach; @@ -333,17 +334,22 @@ private void migrateTemporalProperties() { } public void copyFiles(ImportSpecification specification) throws Exception { + Map indexedSources = + specification.getSources().stream().collect(Collectors.toMap(Source::getName, Function.identity())); + Map indexedNodes = specification.getTargets().getNodes().stream() + .collect(Collectors.toMap(Target::getName, Function.identity())); for (Target target : specification.getTargets().getAll()) { switch (target) { - case NodeTarget nodeTarget -> copyFile(specification, nodeTarget); - case RelationshipTarget relationshipTarget -> copyFile(specification, relationshipTarget); + case NodeTarget nodeTarget -> copyFile(indexedSources, nodeTarget); + case RelationshipTarget relationshipTarget -> copyFile( + indexedSources, indexedNodes, relationshipTarget); default -> throw new RuntimeException("unsupported target type: %s".formatted(target.getClass())); } } } - private void copyFile(ImportSpecification specification, NodeTarget nodeTarget) throws Exception { 
- var source = specification.findSourceByName(nodeTarget.getSource()); + private void copyFile(Map sources, NodeTarget nodeTarget) throws Exception { + var source = sources.get(nodeTarget.getSource()); assertThat(source).isInstanceOf(ParquetSource.class); File parquetFile = new File(sharedFolder, fileName(nodeTarget)); List fields = readFieldNames(source); @@ -352,23 +358,16 @@ private void copyFile(ImportSpecification specification, NodeTarget nodeTarget) copyParquetSource((ParquetSource) source, parquetFile, fieldMappings); } - private void copyFile(ImportSpecification specification, RelationshipTarget relationshipTarget) + private void copyFile( + Map sources, Map nodes, RelationshipTarget relationshipTarget) throws Exception { File parquetFile = new File(sharedFolder, fileName(relationshipTarget)); - var source = specification.findSourceByName(relationshipTarget.getSource()); + var source = sources.get(relationshipTarget.getSource()); assertThat(source).isInstanceOf(ParquetSource.class); - var startNodeTarget = specification.getTargets().getNodes().stream() - .filter(t -> t.getName().equals(relationshipTarget.getStartNodeReference())) - .findFirst() - .orElseThrow(); - - var endNodeTarget = specification.getTargets().getNodes().stream() - .filter(t -> t.getName().equals(relationshipTarget.getEndNodeReference())) - .findFirst() - .orElseThrow(); - + var startNodeTarget = nodes.get(relationshipTarget.getStartNodeReference()); + var endNodeTarget = nodes.get(relationshipTarget.getEndNodeReference()); List fields = readFieldNames(source); Map fieldMappings = computeFieldMappings(fields, relationshipTarget, startNodeTarget, endNodeTarget); @@ -420,7 +419,7 @@ private static String[] importCommand(ImportSpecification specification, String private static Map computeFieldMappings(List fields, NodeTarget nodeTarget) { var mappings = indexByField(nodeTarget.getProperties()); - var keyProperties = new HashSet<>(nodeTarget.getKeyProperties()); + var keyProperties = new 
HashSet<>(getKeyProperties(nodeTarget)); for (String field : fields) { var property = mappings.get(field); @@ -443,19 +442,31 @@ private static Map computeFieldMappings( NodeTarget endNodeTarget) { var mappings = indexByField(relationshipTarget.getProperties()); + var startNodeKeys = getKeyProperties(startNodeTarget); startNodeTarget.getProperties().stream() .filter(m -> fields.contains(m.getSourceField())) - .filter(m -> startNodeTarget.getKeyProperties().contains(m.getTargetProperty())) + .filter(m -> startNodeKeys.contains(m.getTargetProperty())) .forEach(m -> mappings.put(m.getSourceField(), startIdSpaceFor(startNodeTarget))); + var endNodeKeys = getKeyProperties(endNodeTarget); endNodeTarget.getProperties().stream() .filter(m -> fields.contains(m.getSourceField())) - .filter(m -> endNodeTarget.getKeyProperties().contains(m.getTargetProperty())) + .filter(m -> endNodeKeys.contains(m.getTargetProperty())) .forEach(m -> mappings.put(m.getSourceField(), endIdSpaceFor(endNodeTarget))); return mappings; } + private static Set getKeyProperties(NodeTarget nodeTarget) { + var schema = nodeTarget.getSchema(); + if (schema == null) { + return Set.of(); + } + return schema.getKeyConstraints().stream() + .flatMap(constraint -> constraint.getProperties().stream()) + .collect(Collectors.toSet()); + } + // 🐤 private static List readFieldNames(Source source) throws Exception { try (var connection = DriverManager.getConnection("jdbc:duckdb:"); diff --git a/pom.xml b/pom.xml index d9569ab..dc65890 100644 --- a/pom.xml +++ b/pom.xml @@ -197,6 +197,26 @@ + + org.apache.maven.plugins + maven-compiler-plugin + + + compile-java-17 + + compile + + compile + + 17 + + ${project.basedir}/src/main/java17 + + true + + + + org.apache.maven.plugins maven-failsafe-plugin @@ -294,6 +314,17 @@ + + org.apache.maven.plugins + maven-jar-plugin + + + + true + + + + diff --git a/src/main/java/org/neo4j/importer/v1/ImportSpecification.java 
b/src/main/java/org/neo4j/importer/v1/ImportSpecification.java index b5c26d6..256469e 100644 --- a/src/main/java/org/neo4j/importer/v1/ImportSpecification.java +++ b/src/main/java/org/neo4j/importer/v1/ImportSpecification.java @@ -30,6 +30,7 @@ public class ImportSpecification implements Serializable { private final String version; + private final Configuration configuration; private final List sources; @@ -73,14 +74,6 @@ public List getActions() { return actions != null ? actions : Collections.emptyList(); } - public Source findSourceByName(String source) { - return sources.stream() - .filter((src) -> src.getName().equals(source)) - .findFirst() - .orElseThrow(() -> - new IllegalArgumentException(String.format("Could not find any source named %s", source))); - } - @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/src/main/java/org/neo4j/importer/v1/graph/Graph.java b/src/main/java/org/neo4j/importer/v1/graph/Graphs.java similarity index 97% rename from src/main/java/org/neo4j/importer/v1/graph/Graph.java rename to src/main/java/org/neo4j/importer/v1/graph/Graphs.java index b66fef5..0b67050 100644 --- a/src/main/java/org/neo4j/importer/v1/graph/Graph.java +++ b/src/main/java/org/neo4j/importer/v1/graph/Graphs.java @@ -29,7 +29,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -public class Graph { +public class Graphs { /** * Returns a topologically-sorted list from a given dependency graph @@ -73,10 +73,9 @@ public static List runTopologicalSort(Map> graph) { // K: depen /** * Detects whether the given dependency graph encoded as a map contains any cycles. - * The Map encodes dependent-to-dependencies mappings. - * - Map.of("a", Set.of("b", "c")) means item a depends on both item b and c - * - Map.of("b", Set.of()) means item b does not have any dependencies - * + * The Map encodes dependent-to-dependencies mappings.
+ * - Map.of("a", Set.of("b", "c")) means item a depends on both item b and c
+ * - Map.of("b", Set.of()) means item b does not have any dependencies
* @param graph the dependency graph * @param the element type * @return the list of paths with cycles, an empty list means there is no cycle diff --git a/src/main/java/org/neo4j/importer/v1/pipeline/ActionStep.java b/src/main/java/org/neo4j/importer/v1/pipeline/ActionStep.java new file mode 100644 index 0000000..f9f1dae --- /dev/null +++ b/src/main/java/org/neo4j/importer/v1/pipeline/ActionStep.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.neo4j.importer.v1.pipeline; + +import java.util.List; +import java.util.Objects; +import org.neo4j.importer.v1.actions.Action; +import org.neo4j.importer.v1.actions.ActionStage; + +public class ActionStep implements ImportStep { + + private final Action action; + private final List dependencies; + + ActionStep(Action action, List dependencies) { + this.action = action; + this.dependencies = dependencies; + } + + @Override + public String name() { + return action.getName(); + } + + public ActionStage stage() { + return action().getStage(); + } + + public Action action() { + return action; + } + + @Override + public List dependencies() { + return dependencies; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof ActionStep)) return false; + ActionStep that = (ActionStep) o; + return Objects.equals(action, that.action) && Objects.equals(dependencies, that.dependencies); + } + + @Override + public int hashCode() { + return Objects.hash(action, dependencies); + } +} diff --git a/src/main/java/org/neo4j/importer/v1/pipeline/CustomQueryTargetStep.java b/src/main/java/org/neo4j/importer/v1/pipeline/CustomQueryTargetStep.java new file mode 100644 index 0000000..924eabe --- /dev/null +++ b/src/main/java/org/neo4j/importer/v1/pipeline/CustomQueryTargetStep.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.neo4j.importer.v1.pipeline; + +import java.util.List; +import java.util.Objects; +import org.neo4j.importer.v1.targets.CustomQueryTarget; + +public class CustomQueryTargetStep extends TargetStep { + + private final CustomQueryTarget target; + + CustomQueryTargetStep(CustomQueryTarget target, List dependencies) { + super(dependencies); + this.target = target; + } + + public String query() { + return target.getQuery(); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof CustomQueryTargetStep)) return false; + if (!super.equals(o)) return false; + CustomQueryTargetStep that = (CustomQueryTargetStep) o; + return Objects.equals(target, that.target); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), target); + } + + @Override + protected CustomQueryTarget target() { + return target; + } +} diff --git a/src/main/java/org/neo4j/importer/v1/pipeline/EntityTargetStep.java b/src/main/java/org/neo4j/importer/v1/pipeline/EntityTargetStep.java new file mode 100644 index 0000000..ffc9c70 --- /dev/null +++ b/src/main/java/org/neo4j/importer/v1/pipeline/EntityTargetStep.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.neo4j.importer.v1.pipeline; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.neo4j.importer.v1.targets.EntityTarget; +import org.neo4j.importer.v1.targets.PropertyMapping; +import org.neo4j.importer.v1.targets.PropertyType; + +public abstract class EntityTargetStep extends TargetStep { + + protected EntityTargetStep(List dependencies) { + super(dependencies); + } + + public Map propertyTypes() { + return target().getProperties().stream() + .filter(mapping -> mapping.getTargetPropertyType() != null) + .collect(Collectors.toMap(PropertyMapping::getTargetProperty, PropertyMapping::getTargetPropertyType)); + } + + public abstract List keyProperties(); + + public abstract List nonKeyProperties(); + + protected abstract EntityTarget target(); +} diff --git a/src/main/java/org/neo4j/importer/v1/pipeline/ImportPipeline.java b/src/main/java/org/neo4j/importer/v1/pipeline/ImportPipeline.java new file mode 100644 index 0000000..9d8eea2 --- /dev/null +++ b/src/main/java/org/neo4j/importer/v1/pipeline/ImportPipeline.java @@ -0,0 +1,331 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.neo4j.importer.v1.pipeline; + +import java.io.Reader; +import java.io.Serializable; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.neo4j.importer.v1.ImportSpecification; +import org.neo4j.importer.v1.actions.Action; +import org.neo4j.importer.v1.actions.ActionStage; +import org.neo4j.importer.v1.graph.Graphs; +import org.neo4j.importer.v1.sources.Source; +import org.neo4j.importer.v1.targets.CustomQueryTarget; +import org.neo4j.importer.v1.targets.NodeTarget; +import org.neo4j.importer.v1.targets.RelationshipTarget; +import org.neo4j.importer.v1.targets.Target; +import org.neo4j.importer.v1.targets.Targets; + +/** + * {@link ImportPipeline} exposes a topologically-ordered set of {@link ImportStep}, + * based on the provided {@link ImportSpecification}, usually created with + * {@link org.neo4j.importer.v1.ImportSpecificationDeserializer#deserialize(Reader)} or its variants. + *

+ *
+ *     var specification = org.neo4j.importer.v1.ImportSpecificationDeserializer.deserialize(aReader);
+ *     var pipeline = ImportPipeline.of(specification);
+ *     pipeline.forEach((step) -> {
+ *          if (step instanceof SourceStep) {
+ *              handleSource((SourceStep) step);
+ *          }
+ *          else if (step instanceof ActionStep) {
+ *              handleAction((ActionStep) step);
+ *          }
+ *          else if (step instanceof TargetStep) {
+ *              handleTarget((TargetStep) step);
+ *          }
+ *     });
+ * 
+ *

+ * Since an {@link ImportStep} may have dependencies, which are either:

+ * - implicit like a {@link TargetStep} depending on a {@link SourceStep}, + * a {@link RelationshipTargetStep} depending on start/end {@link NodeTargetStep}s
+ * - and/or explicit (via {@link ImportStep#dependencies()})

+ * ... the pipeline guarantees that dependencies are *always* returned before their dependents.
+ * In particular, the dependencies of each {@link ActionStep} are resolved at pipeline construction, based on the + * provided import specification and the corresponding {@link Action}'s {@link ActionStage}. + */ +public class ImportPipeline implements Iterable, Serializable { + + private final List tasks; + + public static ImportPipeline of(ImportSpecification importSpecification) { + var dependencyGraph = buildDependencyNameGraph(importSpecification); + var tasks = collectTasks(importSpecification, dependencyGraph); + return new ImportPipeline(tasks); + } + + private ImportPipeline(List tasks) { + this.tasks = tasks; + } + + @Override + public Iterator iterator() { + return tasks.iterator(); + } + + private static Map> buildDependencyNameGraph( + ImportSpecification importSpecification) { + Map> dependencyGraph = new HashMap<>(); + Targets targets = importSpecification.getTargets(); + var activeTargets = targets.getAll().stream().filter(Target::isActive).collect(Collectors.toList()); + var activeSources = + activeTargets.stream().map(Target::getSource).distinct().collect(Collectors.toList()); + var sources = new LinkedHashSet(activeSources.size()); + activeSources.forEach(source -> { + var qualifiedName = QualifiedName.ofSource(source); + dependencyGraph.put(qualifiedName, Set.of()); + sources.add(qualifiedName); + }); + Map> indexedActions = importSpecification.getActions().stream() + .filter(Action::isActive) + .collect(Collectors.groupingBy( + Action::getStage, + Collectors.mapping( + action -> QualifiedName.ofAction(action.getName()), + Collectors.toCollection(LinkedHashSet::new)))); + var indexedTargets = activeTargets.stream().collect(Collectors.toMap(Target::getName, Function.identity())); + var startActions = indexedActions.getOrDefault(ActionStage.START, Set.of()); + var preNodeActions = indexedActions.getOrDefault(ActionStage.PRE_NODES, Set.of()); + var nodeTargets = new LinkedHashSet(targets.getNodes().size()); + var preRelationshipActions = 
indexedActions.getOrDefault(ActionStage.PRE_RELATIONSHIPS, Set.of()); + var relationshipTargets = + new LinkedHashSet(targets.getRelationships().size()); + var preQueryActions = indexedActions.getOrDefault(ActionStage.PRE_QUERIES, Set.of()); + var queryTargets = + new LinkedHashSet(targets.getCustomQueries().size()); + activeTargets.forEach(target -> { + var targetName = target.getName(); + var dependencies = new HashSet(); + dependencies.add(QualifiedName.ofSource(target.getSource())); + dependencies.addAll(target.getDependencies().stream() + .map(name -> QualifiedName.ofTarget(indexedTargets.get(name))) + .collect(Collectors.toSet())); + dependencies.addAll(startActions); + if (target instanceof NodeTarget) { + dependencies.addAll(preNodeActions); + var qualifiedName = QualifiedName.ofNodeTarget(targetName); + dependencyGraph.put(qualifiedName, dependencies); + nodeTargets.add(qualifiedName); + } else if (target instanceof RelationshipTarget) { + dependencies.addAll(preRelationshipActions); + var relationshipTarget = (RelationshipTarget) target; + dependencies.add(QualifiedName.ofNodeTarget(relationshipTarget.getStartNodeReference())); + dependencies.add(QualifiedName.ofNodeTarget(relationshipTarget.getEndNodeReference())); + var qualifiedName = QualifiedName.ofRelationshipTarget(targetName); + dependencyGraph.put(qualifiedName, dependencies); + relationshipTargets.add(qualifiedName); + } else if (target instanceof CustomQueryTarget) { + dependencies.addAll(preQueryActions); + var qualifiedName = QualifiedName.ofQueryTarget(targetName); + dependencyGraph.put(qualifiedName, dependencies); + queryTargets.add(qualifiedName); + } else { + throw new RuntimeException( + String.format("Unknown type %s of target: %s", target.getClass(), targetName)); + } + }); + Map> dependenciesByStage = Map.of( + ActionStage.START, + Set.of(), + ActionStage.POST_SOURCES, + sources, + ActionStage.PRE_NODES, + Set.of(), + ActionStage.POST_NODES, + nodeTargets, + 
ActionStage.PRE_RELATIONSHIPS, + Set.of(), + ActionStage.POST_RELATIONSHIPS, + relationshipTargets, + ActionStage.PRE_QUERIES, + Set.of(), + ActionStage.POST_QUERIES, + queryTargets, + ActionStage.END, + concat( + sources, + nodeTargets, + relationshipTargets, + queryTargets, + indexedActions.getOrDefault(ActionStage.START, Set.of()), + indexedActions.getOrDefault(ActionStage.POST_SOURCES, Set.of()), + indexedActions.getOrDefault(ActionStage.PRE_NODES, Set.of()), + indexedActions.getOrDefault(ActionStage.POST_NODES, Set.of()), + indexedActions.getOrDefault(ActionStage.PRE_RELATIONSHIPS, Set.of()), + indexedActions.getOrDefault(ActionStage.POST_RELATIONSHIPS, Set.of()), + indexedActions.getOrDefault(ActionStage.PRE_QUERIES, Set.of()), + indexedActions.getOrDefault(ActionStage.POST_QUERIES, Set.of()))); + indexedActions.forEach((stage, actions) -> { + var dependencies = dependenciesByStage.get(stage); + actions.forEach(action -> dependencyGraph.put(action, dependencies)); + }); + return dependencyGraph; + } + + private static List collectTasks( + ImportSpecification importSpecification, Map> dependencyGraph) { + var indexedSources = importSpecification.getSources().stream() + .collect(Collectors.toMap(Source::getName, Function.identity())); + var indexedNodeTargets = importSpecification.getTargets().getNodes().stream() + .collect(Collectors.toMap(Target::getName, Function.identity())); + var indexedRelationshipTargets = importSpecification.getTargets().getRelationships().stream() + .collect(Collectors.toMap(Target::getName, Function.identity())); + var indexedQueryTargets = importSpecification.getTargets().getCustomQueries().stream() + .collect(Collectors.toMap(Target::getName, Function.identity())); + var indexedActions = importSpecification.getActions().stream() + .collect(Collectors.toMap(Action::getName, Function.identity())); + var processedNodeTasks = new HashMap(); + var processedTasks = new HashMap(); + return 
Graphs.runTopologicalSort(dependencyGraph).stream() + // if A depends on B, B is guaranteed to be iterated on/mapped before A + .map(qualifiedName -> { + String name = qualifiedName.getValue(); + var dependencyTasks = dependencyGraph.getOrDefault(qualifiedName, Set.of()).stream() + .map(processedTasks::get) + .collect(Collectors.toList()); + var nameType = qualifiedName.getType(); + switch (nameType) { + case SOURCE: + var sourceTask = new SourceStep(indexedSources.get(name)); + processedTasks.put(qualifiedName, sourceTask); + return sourceTask; + case NODE_TARGET: + var nodeTask = new NodeTargetStep(indexedNodeTargets.get(name), dependencyTasks); + processedTasks.put(qualifiedName, nodeTask); + processedNodeTasks.put(name, nodeTask); + return nodeTask; + case RELATIONSHIP_TARGET: + var relationshipTarget = indexedRelationshipTargets.get(name); + var relationshipTask = new RelationshipTargetStep( + relationshipTarget, + processedNodeTasks.get(relationshipTarget.getStartNodeReference()), + processedNodeTasks.get(relationshipTarget.getEndNodeReference()), + dependencyTasks); + processedTasks.put(qualifiedName, relationshipTask); + return relationshipTask; + case QUERY_TARGET: + var queryTarget = indexedQueryTargets.get(name); + var queryTask = new CustomQueryTargetStep(queryTarget, dependencyTasks); + processedTasks.put(qualifiedName, queryTask); + return queryTask; + case ACTION: + var action = indexedActions.get(name); + var actionTask = new ActionStep(action, dependencyTasks); + processedTasks.put(qualifiedName, actionTask); + return actionTask; + default: + throw new RuntimeException("Unknown import task type: " + nameType); + } + }) + .collect(Collectors.toList()); + } + + @SafeVarargs + private static Set concat(Set... 
sets) { + var result = new LinkedHashSet(); + for (Set set : sets) { + result.addAll(set); + } + return result; + } + + private static class QualifiedName { + private final ContainerType type; + private final String value; + + private QualifiedName(ContainerType type, String value) { + this.type = type; + this.value = value; + } + + public static QualifiedName ofSource(String name) { + return new QualifiedName(ContainerType.SOURCE, name); + } + + public static QualifiedName ofNodeTarget(String name) { + return new QualifiedName(ContainerType.NODE_TARGET, name); + } + + public static QualifiedName ofRelationshipTarget(String name) { + return new QualifiedName(ContainerType.RELATIONSHIP_TARGET, name); + } + + public static QualifiedName ofQueryTarget(String name) { + return new QualifiedName(ContainerType.QUERY_TARGET, name); + } + + public static QualifiedName ofAction(String name) { + return new QualifiedName(ContainerType.ACTION, name); + } + + public static QualifiedName ofTarget(Target target) { + return new QualifiedName(ContainerType.typeOf(target), target.getName()); + } + + public ContainerType getType() { + return type; + } + + public String getValue() { + return value; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof QualifiedName)) return false; + QualifiedName that = (QualifiedName) o; + return type == that.type && Objects.equals(value, that.value); + } + + @Override + public int hashCode() { + return Objects.hash(type, value); + } + } + + private enum ContainerType { + SOURCE, + NODE_TARGET, + RELATIONSHIP_TARGET, + QUERY_TARGET, + ACTION; + + public static ContainerType typeOf(Target target) { + if (target instanceof NodeTarget) { + return NODE_TARGET; + } + if (target instanceof RelationshipTarget) { + return RELATIONSHIP_TARGET; + } + if (target instanceof CustomQueryTarget) { + return QUERY_TARGET; + } + throw new IllegalArgumentException( + String.format("Unexpected type %s of target %s ", target.getClass(), 
target.getName())); + } + } +} diff --git a/src/main/java/org/neo4j/importer/v1/pipeline/ImportStep.java b/src/main/java/org/neo4j/importer/v1/pipeline/ImportStep.java new file mode 100644 index 0000000..deadec0 --- /dev/null +++ b/src/main/java/org/neo4j/importer/v1/pipeline/ImportStep.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.importer.v1.pipeline; + +import java.io.Serializable; +import java.util.List; + +public interface ImportStep extends Serializable { + String name(); + + default List dependencies() { + return List.of(); + } +} diff --git a/src/main/java/org/neo4j/importer/v1/pipeline/NodeTargetStep.java b/src/main/java/org/neo4j/importer/v1/pipeline/NodeTargetStep.java new file mode 100644 index 0000000..28bc1fc --- /dev/null +++ b/src/main/java/org/neo4j/importer/v1/pipeline/NodeTargetStep.java @@ -0,0 +1,126 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.importer.v1.pipeline; + +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.neo4j.importer.v1.targets.NodeExistenceConstraint; +import org.neo4j.importer.v1.targets.NodeSchema; +import org.neo4j.importer.v1.targets.NodeTarget; +import org.neo4j.importer.v1.targets.NodeUniqueConstraint; +import org.neo4j.importer.v1.targets.PropertyMapping; +import org.neo4j.importer.v1.targets.WriteMode; + +public class NodeTargetStep extends EntityTargetStep { + + private final NodeTarget target; + + NodeTargetStep(NodeTarget target, List dependencies) { + super(dependencies); + this.target = target; + } + + public WriteMode writeMode() { + return target.getWriteMode(); + } + + public List labels() { + return target.getLabels(); + } + + @Override + public List keyProperties() { + var schema = schema(); + if (schema == null) { + return List.of(); + } + return List.copyOf(distinctKeyProperties(target.getProperties(), schema)); + } + + @Override + public List nonKeyProperties() { + var schema = schema(); + var mappings = target.getProperties(); + if (schema == null) { + return mappings; + } + var keyProperties = distinctKeyProperties(mappings, schema); + return mappings.stream() + .filter(mapping -> !keyProperties.contains(mapping)) + .collect(Collectors.toUnmodifiableList()); + } + + public NodeSchema schema() { + return target.getSchema(); + } + + @Override + public boolean 
equals(Object o) { + if (!(o instanceof NodeTargetStep)) return false; + if (!super.equals(o)) return false; + NodeTargetStep task = (NodeTargetStep) o; + return Objects.equals(target, task.target); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), target); + } + + @Override + protected NodeTarget target() { + return target; + } + + private static Set distinctKeyProperties(List properties, NodeSchema schema) { + var mappings = indexByPropertyName(properties); + Set result = schema.getKeyConstraints().stream() + .flatMap(constraint -> constraint.getProperties().stream()) + .map(mappings::get) + .collect(Collectors.toCollection(LinkedHashSet::new)); + result.addAll( + keyEquivalentProperties(schema.getUniqueConstraints(), schema.getExistenceConstraints(), mappings)); + return result; + } + + private static Map indexByPropertyName(List mappings) { + return mappings.stream().collect(Collectors.toMap(PropertyMapping::getTargetProperty, Function.identity())); + } + + private static Set keyEquivalentProperties( + List uniqueConstraints, + List existenceConstraints, + Map mappings) { + + Set result = + new LinkedHashSet<>(Math.min(uniqueConstraints.size(), existenceConstraints.size())); + Set uniqueProperties = uniqueConstraints.stream() + .flatMap(constraint -> constraint.getProperties().stream()) + .map(mappings::get) + .collect(Collectors.toCollection(LinkedHashSet::new)); + result.addAll(existenceConstraints.stream() + .map(constraint -> mappings.get(constraint.getProperty())) + .filter(uniqueProperties::contains) + .collect(Collectors.toList())); + return result; + } +} diff --git a/src/main/java/org/neo4j/importer/v1/pipeline/RelationshipTargetStep.java b/src/main/java/org/neo4j/importer/v1/pipeline/RelationshipTargetStep.java new file mode 100644 index 0000000..c97501a --- /dev/null +++ b/src/main/java/org/neo4j/importer/v1/pipeline/RelationshipTargetStep.java @@ -0,0 +1,150 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB 
[https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.importer.v1.pipeline; + +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.neo4j.importer.v1.targets.NodeMatchMode; +import org.neo4j.importer.v1.targets.PropertyMapping; +import org.neo4j.importer.v1.targets.RelationshipExistenceConstraint; +import org.neo4j.importer.v1.targets.RelationshipSchema; +import org.neo4j.importer.v1.targets.RelationshipTarget; +import org.neo4j.importer.v1.targets.RelationshipUniqueConstraint; +import org.neo4j.importer.v1.targets.WriteMode; + +public class RelationshipTargetStep extends EntityTargetStep { + + private final RelationshipTarget target; + private final NodeTargetStep startNode; + private final NodeTargetStep endNode; + + RelationshipTargetStep( + RelationshipTarget target, + NodeTargetStep startNode, + NodeTargetStep endNode, + List dependencies) { + super(dependencies); + this.target = target; + this.startNode = startNode; + this.endNode = endNode; + } + + public WriteMode writeMode() { + return target.getWriteMode(); + } + + public NodeMatchMode nodeMatchMode() { + return target.getNodeMatchMode(); + } + + public String type() { + return target.getType(); + } + + @Override + public List keyProperties() { + var schema = target.getSchema(); + if (schema == null) 
{ + return List.of(); + } + return List.copyOf(distinctKeyProperties(target.getProperties(), schema)); + } + + @Override + public List nonKeyProperties() { + var schema = target.getSchema(); + var mappings = target.getProperties(); + if (schema == null) { + return mappings; + } + var keyProperties = distinctKeyProperties(mappings, schema); + return mappings.stream() + .filter(mapping -> !keyProperties.contains(mapping)) + .collect(Collectors.toUnmodifiableList()); + } + + public NodeTargetStep startNode() { + return startNode; + } + + public NodeTargetStep endNode() { + return endNode; + } + + public RelationshipSchema schema() { + return target.getSchema(); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof RelationshipTargetStep)) return false; + if (!super.equals(o)) return false; + RelationshipTargetStep that = (RelationshipTargetStep) o; + return Objects.equals(target, that.target) + && Objects.equals(startNode, that.startNode) + && Objects.equals(endNode, that.endNode); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), target, startNode, endNode); + } + + @Override + protected RelationshipTarget target() { + return target; + } + + private static Set distinctKeyProperties( + List properties, RelationshipSchema schema) { + var mappings = indexByPropertyName(properties); + Set result = schema.getKeyConstraints().stream() + .flatMap(constraint -> constraint.getProperties().stream()) + .map(mappings::get) + .collect(Collectors.toCollection(LinkedHashSet::new)); + result.addAll( + keyEquivalentProperties(schema.getUniqueConstraints(), schema.getExistenceConstraints(), mappings)); + return result; + } + + private static Map indexByPropertyName(List mappings) { + return mappings.stream().collect(Collectors.toMap(PropertyMapping::getTargetProperty, Function.identity())); + } + + private static Set keyEquivalentProperties( + List uniqueConstraints, + List existenceConstraints, + Map mappings) { + + Set result = 
+ new LinkedHashSet<>(Math.min(uniqueConstraints.size(), existenceConstraints.size())); + Set uniqueProperties = uniqueConstraints.stream() + .flatMap(constraint -> constraint.getProperties().stream()) + .map(mappings::get) + .collect(Collectors.toCollection(LinkedHashSet::new)); + result.addAll(existenceConstraints.stream() + .map(constraint -> mappings.get(constraint.getProperty())) + .filter(uniqueProperties::contains) + .collect(Collectors.toList())); + return result; + } +} diff --git a/src/main/java/org/neo4j/importer/v1/validation/plugin/NoDuplicatedSourceNameValidator.java b/src/main/java/org/neo4j/importer/v1/pipeline/SourceStep.java similarity index 50% rename from src/main/java/org/neo4j/importer/v1/validation/plugin/NoDuplicatedSourceNameValidator.java rename to src/main/java/org/neo4j/importer/v1/pipeline/SourceStep.java index 94687b2..3378b7e 100644 --- a/src/main/java/org/neo4j/importer/v1/validation/plugin/NoDuplicatedSourceNameValidator.java +++ b/src/main/java/org/neo4j/importer/v1/pipeline/SourceStep.java @@ -14,28 +14,37 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.neo4j.importer.v1.validation.plugin; +package org.neo4j.importer.v1.pipeline; +import java.util.Objects; import org.neo4j.importer.v1.sources.Source; -import org.neo4j.importer.v1.validation.SpecificationValidationResult.Builder; -import org.neo4j.importer.v1.validation.SpecificationValidator; -public class NoDuplicatedSourceNameValidator implements SpecificationValidator { - private static final String ERROR_CODE = "DUPL-004"; +public class SourceStep implements ImportStep { - private final NameCounter nameCounter; + private final Source source; - public NoDuplicatedSourceNameValidator() { - nameCounter = new NameCounter(ERROR_CODE); + public SourceStep(Source source) { + this.source = source; } @Override - public void visitSource(int index, Source source) { - nameCounter.track(source.getName(), String.format("$.sources[%d]", index)); + public String name() { + return source.getName(); + } + + public Source source() { + return source; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof SourceStep)) return false; + SourceStep that = (SourceStep) o; + return Objects.equals(source, that.source); } @Override - public boolean report(Builder builder) { - return nameCounter.reportErrorsIfAny(builder); + public int hashCode() { + return Objects.hashCode(source); } } diff --git a/src/main/java/org/neo4j/importer/v1/pipeline/TargetStep.java b/src/main/java/org/neo4j/importer/v1/pipeline/TargetStep.java new file mode 100644 index 0000000..7db58ee --- /dev/null +++ b/src/main/java/org/neo4j/importer/v1/pipeline/TargetStep.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.importer.v1.pipeline; + +import java.util.List; +import java.util.Objects; +import org.neo4j.importer.v1.targets.Target; + +public abstract class TargetStep implements ImportStep { + + private final List dependencies; + + protected TargetStep(List dependencies) { + this.dependencies = dependencies; + } + + @Override + public String name() { + return target().getName(); + } + + public String sourceName() { + return target().getSource(); + } + + public List dependencies() { + return dependencies; + } + + protected abstract Target target(); + + @Override + public boolean equals(Object o) { + if (!(o instanceof TargetStep)) return false; + TargetStep that = (TargetStep) o; + return Objects.equals(dependencies, that.dependencies); + } + + @Override + public int hashCode() { + return Objects.hashCode(dependencies); + } +} diff --git a/src/main/java/org/neo4j/importer/v1/targets/EntityTarget.java b/src/main/java/org/neo4j/importer/v1/targets/EntityTarget.java index 02abd89..77e4bda 100644 --- a/src/main/java/org/neo4j/importer/v1/targets/EntityTarget.java +++ b/src/main/java/org/neo4j/importer/v1/targets/EntityTarget.java @@ -16,10 +16,8 @@ */ package org.neo4j.importer.v1.targets; -import java.util.Collections; import java.util.List; import java.util.Objects; -import java.util.stream.Collectors; public abstract class EntityTarget extends Target { private final WriteMode writeMode; @@ -49,24 +47,10 @@ public SourceTransformations getSourceTransformations() { return sourceTransformations; } - public List getAllProperties() { - 
return getProperties().stream().map(PropertyMapping::getTargetProperty).collect(Collectors.toList()); - } - public List getProperties() { - // properties can be null for relationship targets - return properties != null ? properties : Collections.emptyList(); + return properties; } - /** - * getKeyProperties returns the list of properties part of key constraints, or part of both unique and existence - * constraints. - * These are typically used when defining a node/relationship MERGE pattern. - * Call {@link NodeTarget#getSchema()} or {@link RelationshipTarget#getSchema()} to get properties that are only - * part of key constraints. - */ - public abstract List getKeyProperties(); - @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/src/main/java/org/neo4j/importer/v1/targets/NodeTarget.java b/src/main/java/org/neo4j/importer/v1/targets/NodeTarget.java index 2841242..0631cf5 100644 --- a/src/main/java/org/neo4j/importer/v1/targets/NodeTarget.java +++ b/src/main/java/org/neo4j/importer/v1/targets/NodeTarget.java @@ -18,13 +18,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import java.util.ArrayList; -import java.util.LinkedHashSet; import java.util.List; import java.util.Objects; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; public class NodeTarget extends EntityTarget { private final List labels; @@ -55,18 +50,6 @@ public NodeSchema getSchema() { return schema; } - @Override - public List getKeyProperties() { - if (schema == null) { - return new ArrayList<>(0); - } - Set result = schema.getKeyConstraints().stream() - .flatMap(NodeTarget::propertyStream) - .collect(Collectors.toCollection(LinkedHashSet::new)); - result.addAll(keyEquivalentProperties()); - return new ArrayList<>(result); - } - @Override public boolean equals(Object o) { if (this == o) return true; @@ -85,30 +68,4 @@ public int hashCode() { public String 
toString() { return "NodeTarget{" + "labels=" + labels + ", schema=" + schema + "} " + super.toString(); } - - private Set keyEquivalentProperties() { - var uniqueConstraints = schema.getUniqueConstraints(); - var existenceConstraints = schema.getExistenceConstraints(); - - Set result = new LinkedHashSet<>(Math.min(uniqueConstraints.size(), existenceConstraints.size())); - Set uniqueProperties = - uniqueConstraints.stream().flatMap(NodeTarget::propertyStream).collect(Collectors.toSet()); - result.addAll(existenceConstraints.stream() - .map(NodeExistenceConstraint::getProperty) - .filter(uniqueProperties::contains) - .collect(Collectors.toList())); - return result; - } - - private static Stream propertyStream(NodeKeyConstraint constraint) { - return propertyStream(constraint.getProperties()); - } - - private static Stream propertyStream(NodeUniqueConstraint constraint) { - return propertyStream(constraint.getProperties()); - } - - private static Stream propertyStream(List constraints) { - return constraints.stream(); - } } diff --git a/src/main/java/org/neo4j/importer/v1/targets/RelationshipTarget.java b/src/main/java/org/neo4j/importer/v1/targets/RelationshipTarget.java index 0644ffa..4479661 100644 --- a/src/main/java/org/neo4j/importer/v1/targets/RelationshipTarget.java +++ b/src/main/java/org/neo4j/importer/v1/targets/RelationshipTarget.java @@ -18,13 +18,9 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import java.util.ArrayList; -import java.util.LinkedHashSet; +import java.util.Collections; import java.util.List; import java.util.Objects; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; public class RelationshipTarget extends EntityTarget { private final String type; @@ -56,7 +52,7 @@ public RelationshipTarget( dependencies, writeMode, sourceTransformations, - properties); + properties != null ? 
properties : Collections.emptyList()); this.type = type; this.nodeMatchMode = nodeMatchMode; this.startNodeReference = startNodeReference; @@ -84,30 +80,6 @@ public String getEndNodeReference() { return endNodeReference; } - @Override - public boolean dependsOn(Target target) { - if (super.dependsOn(target)) { - return true; - } - if (!(target instanceof NodeTarget)) { - return false; - } - String nodeTargetName = target.getName(); - return nodeTargetName.equals(startNodeReference) || nodeTargetName.equals(endNodeReference); - } - - @Override - public List getKeyProperties() { - if (schema == null) { - return new ArrayList<>(0); - } - Set result = schema.getKeyConstraints().stream() - .flatMap(RelationshipTarget::propertyStream) - .collect(Collectors.toCollection(LinkedHashSet::new)); - result.addAll(keyEquivalentProperties()); - return new ArrayList<>(result); - } - @Override public boolean equals(Object o) { if (this == o) return true; @@ -136,31 +108,4 @@ public String toString() { + schema + "} " + super.toString(); } - - private Set keyEquivalentProperties() { - var uniqueConstraints = schema.getUniqueConstraints(); - var existenceConstraints = schema.getExistenceConstraints(); - - Set result = new LinkedHashSet<>(Math.min(uniqueConstraints.size(), existenceConstraints.size())); - Set uniqueProperties = uniqueConstraints.stream() - .flatMap(RelationshipTarget::propertyStream) - .collect(Collectors.toSet()); - result.addAll(existenceConstraints.stream() - .map(RelationshipExistenceConstraint::getProperty) - .filter(uniqueProperties::contains) - .collect(Collectors.toList())); - return result; - } - - private static Stream propertyStream(RelationshipKeyConstraint constraint) { - return propertyStream(constraint.getProperties()); - } - - private static Stream propertyStream(RelationshipUniqueConstraint constraint) { - return propertyStream(constraint.getProperties()); - } - - private static Stream propertyStream(List constraints) { - return constraints.stream(); 
- } } diff --git a/src/main/java/org/neo4j/importer/v1/targets/Target.java b/src/main/java/org/neo4j/importer/v1/targets/Target.java index e52be4e..1e6f0e0 100644 --- a/src/main/java/org/neo4j/importer/v1/targets/Target.java +++ b/src/main/java/org/neo4j/importer/v1/targets/Target.java @@ -21,7 +21,7 @@ import java.util.List; import java.util.Objects; -public abstract class Target implements Comparable, Serializable { +public abstract class Target implements Serializable { protected static final String DEFAULT_ACTIVE = "true"; private final TargetType targetType; @@ -58,17 +58,6 @@ public List getDependencies() { return dependencies; } - @Override - public int compareTo(Target other) { - if (other.dependsOn(this)) { - return -1; - } - if (this.dependsOn(other)) { - return 1; - } - return this.getName().compareTo(other.getName()); - } - @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/src/main/java/org/neo4j/importer/v1/validation/SpecificationValidators.java b/src/main/java/org/neo4j/importer/v1/validation/SpecificationValidators.java index c7f354e..0d1cbd4 100644 --- a/src/main/java/org/neo4j/importer/v1/validation/SpecificationValidators.java +++ b/src/main/java/org/neo4j/importer/v1/validation/SpecificationValidators.java @@ -29,7 +29,7 @@ import java.util.stream.Collectors; import org.neo4j.importer.v1.ImportSpecification; import org.neo4j.importer.v1.actions.Action; -import org.neo4j.importer.v1.graph.Graph; +import org.neo4j.importer.v1.graph.Graphs; public class SpecificationValidators { @@ -110,7 +110,7 @@ private static List runTopogicalSort(List> requires() { return Set.of( - NoDuplicatedTargetActionNameValidator.class, + NoDuplicatedNameValidator.class, NoDanglingDependsOnValidator.class, NoDanglingNodeReferenceValidator.class, NoDuplicatedDependencyValidator.class); @@ -74,7 +74,7 @@ public void visitCustomQueryTarget(int index, CustomQueryTarget target) { @Override public boolean report(Builder builder) { 
AtomicBoolean result = new AtomicBoolean(false); - Graph.detectCycles(dependencyGraph()).stream() + Graphs.detectCycles(dependencyGraph()).stream() .map(cycle -> { Element cycleStart = cycle.get(0); String cycleDescription = cycle.stream() diff --git a/src/main/java/org/neo4j/importer/v1/validation/plugin/NoDuplicatedTargetActionNameValidator.java b/src/main/java/org/neo4j/importer/v1/validation/plugin/NoDuplicatedNameValidator.java similarity index 91% rename from src/main/java/org/neo4j/importer/v1/validation/plugin/NoDuplicatedTargetActionNameValidator.java rename to src/main/java/org/neo4j/importer/v1/validation/plugin/NoDuplicatedNameValidator.java index f72b9ed..e3ba2e2 100644 --- a/src/main/java/org/neo4j/importer/v1/validation/plugin/NoDuplicatedTargetActionNameValidator.java +++ b/src/main/java/org/neo4j/importer/v1/validation/plugin/NoDuplicatedNameValidator.java @@ -22,21 +22,27 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.importer.v1.actions.Action; +import org.neo4j.importer.v1.sources.Source; import org.neo4j.importer.v1.targets.CustomQueryTarget; import org.neo4j.importer.v1.targets.NodeTarget; import org.neo4j.importer.v1.targets.RelationshipTarget; import org.neo4j.importer.v1.validation.SpecificationValidationResult.Builder; import org.neo4j.importer.v1.validation.SpecificationValidator; -public class NoDuplicatedTargetActionNameValidator implements SpecificationValidator { +public class NoDuplicatedNameValidator implements SpecificationValidator { private static final String ERROR_CODE = "DUPL-001"; private final NameCounter nameCounter; - public NoDuplicatedTargetActionNameValidator() { + public NoDuplicatedNameValidator() { nameCounter = new NameCounter(ERROR_CODE); } + @Override + public void visitSource(int index, Source source) { + nameCounter.track(source.getName(), String.format("$.sources[%d]", index)); + } + @Override public void visitNodeTarget(int index, NodeTarget target) { 
nameCounter.track(target.getName(), String.format("$.targets.nodes[%d]", index)); diff --git a/src/main/java17/org/neo4j/importer/v1/pipeline/EntityTargetStep.java b/src/main/java17/org/neo4j/importer/v1/pipeline/EntityTargetStep.java new file mode 100644 index 0000000..be11e0a --- /dev/null +++ b/src/main/java17/org/neo4j/importer/v1/pipeline/EntityTargetStep.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.neo4j.importer.v1.pipeline; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.neo4j.importer.v1.targets.EntityTarget; +import org.neo4j.importer.v1.targets.PropertyMapping; +import org.neo4j.importer.v1.targets.PropertyType; + +public abstract sealed class EntityTargetStep extends TargetStep permits NodeTargetStep, RelationshipTargetStep { + + protected EntityTargetStep(List dependencies) { + super(dependencies); + } + + public Map propertyTypes() { + return target().getProperties().stream() + .filter(mapping -> mapping.getTargetPropertyType() != null) + .collect(Collectors.toMap(PropertyMapping::getTargetProperty, PropertyMapping::getTargetPropertyType)); + } + + public abstract List keyProperties(); + + public abstract List nonKeyProperties(); + + protected abstract EntityTarget target(); +} diff --git a/src/main/java17/org/neo4j/importer/v1/pipeline/ImportStep.java b/src/main/java17/org/neo4j/importer/v1/pipeline/ImportStep.java new file mode 100644 index 0000000..d548823 --- /dev/null +++ b/src/main/java17/org/neo4j/importer/v1/pipeline/ImportStep.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.neo4j.importer.v1.pipeline; + +import java.io.Serializable; +import java.util.List; + +public sealed interface ImportStep extends Serializable permits SourceStep, ActionStep, TargetStep { + String name(); + + default List dependencies() { + return List.of(); + } +} diff --git a/src/main/java17/org/neo4j/importer/v1/pipeline/TargetStep.java b/src/main/java17/org/neo4j/importer/v1/pipeline/TargetStep.java new file mode 100644 index 0000000..51e2857 --- /dev/null +++ b/src/main/java17/org/neo4j/importer/v1/pipeline/TargetStep.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.neo4j.importer.v1.pipeline; + +import java.util.List; +import java.util.Objects; +import org.neo4j.importer.v1.targets.Target; + +public abstract sealed class TargetStep implements ImportStep permits EntityTargetStep, CustomQueryTargetStep { + + private final List dependencies; + + protected TargetStep(List dependencies) { + this.dependencies = dependencies; + } + + @Override + public String name() { + return target().getName(); + } + + public String sourceName() { + return target().getSource(); + } + + public List dependencies() { + return dependencies; + } + + protected abstract Target target(); + + @Override + public boolean equals(Object o) { + if (!(o instanceof TargetStep)) return false; + TargetStep that = (TargetStep) o; + return Objects.equals(dependencies, that.dependencies); + } + + @Override + public int hashCode() { + return Objects.hashCode(dependencies); + } +} diff --git a/src/main/resources/META-INF/services/org.neo4j.importer.v1.validation.SpecificationValidator b/src/main/resources/META-INF/services/org.neo4j.importer.v1.validation.SpecificationValidator index 6a8e41d..1e23003 100644 --- a/src/main/resources/META-INF/services/org.neo4j.importer.v1.validation.SpecificationValidator +++ b/src/main/resources/META-INF/services/org.neo4j.importer.v1.validation.SpecificationValidator @@ -27,8 +27,7 @@ org.neo4j.importer.v1.validation.plugin.NoDuplicatedAggregateFieldNameValidator org.neo4j.importer.v1.validation.plugin.NoDuplicatedDependencyValidator org.neo4j.importer.v1.validation.plugin.NoDuplicatedLabelInFullTextIndexValidator org.neo4j.importer.v1.validation.plugin.NoDuplicatedLabelValidator -org.neo4j.importer.v1.validation.plugin.NoDuplicatedSourceNameValidator -org.neo4j.importer.v1.validation.plugin.NoDuplicatedTargetActionNameValidator +org.neo4j.importer.v1.validation.plugin.NoDuplicatedNameValidator org.neo4j.importer.v1.validation.plugin.NoDuplicatedTargetPropertyValidator 
org.neo4j.importer.v1.validation.plugin.NoRedundantKeyAndExistenceConstraintsValidator org.neo4j.importer.v1.validation.plugin.NoRedundantKeyAndUniqueConstraintsValidator diff --git a/src/test/java/org/neo4j/importer/v1/ImportSpecificationDeserializerExtraValidationTest.java b/src/test/java/org/neo4j/importer/v1/ImportSpecificationDeserializerExtraValidationTest.java index 6552e1b..1a37eed 100644 --- a/src/test/java/org/neo4j/importer/v1/ImportSpecificationDeserializerExtraValidationTest.java +++ b/src/test/java/org/neo4j/importer/v1/ImportSpecificationDeserializerExtraValidationTest.java @@ -67,21 +67,20 @@ void fails_if_source_name_is_duplicated() { } @Test - void does_not_fails_if_source_name_is_duplicated_with_target_name() { - assertThatNoException() - .isThrownBy(() -> deserialize(new StringReader( + void fails_if_source_name_is_duplicated_with_target_name() { + assertThatThrownBy(() -> deserialize(new StringReader( """ { "version": "1", "sources": [{ "type": "bigquery", - "name": "not-duplicate", + "name": "duplicate", "query": "SELECT id, name FROM my.table" }], "targets": { "nodes": [{ - "name": "not-duplicate", - "source": "not-duplicate", + "name": "duplicate", + "source": "duplicate", "labels": ["Label1", "Label2"], "write_mode": "create", "properties": [ @@ -92,25 +91,29 @@ void does_not_fails_if_source_name_is_duplicated_with_target_name() { } } """ - .stripIndent()))); + .stripIndent()))) + .isInstanceOf(InvalidSpecificationException.class) + .hasMessageContainingAll( + "1 error(s)", + "0 warning(s)", + "Name \"duplicate\" is duplicated across the following paths: $.sources[0].name, $.targets.nodes[0].name"); } @Test - void does_not_fails_if_source_name_is_duplicated_with_action_name() { - assertThatNoException() - .isThrownBy(() -> deserialize(new StringReader( + void fails_if_source_name_is_duplicated_with_action_name() { + assertThatThrownBy(() -> deserialize(new StringReader( """ { "version": "1", "sources": [{ "type": "bigquery", - "name": 
"not-duplicate", + "name": "duplicate", "query": "SELECT id, name FROM my.table" }], "targets": { "nodes": [{ "name": "target", - "source": "not-duplicate", + "source": "duplicate", "labels": ["Label1", "Label2"], "write_mode": "create", "properties": [ @@ -120,14 +123,19 @@ void does_not_fails_if_source_name_is_duplicated_with_action_name() { }] }, "actions": [{ - "name": "not-duplicate", + "name": "duplicate", "type": "cypher", "stage": "pre_relationships", "query": "CREATE (:PreRel)" }] } """ - .stripIndent()))); + .stripIndent()))) + .isInstanceOf(InvalidSpecificationException.class) + .hasMessageContainingAll( + "1 error(s)", + "0 warning(s)", + "Name \"duplicate\" is duplicated across the following paths: $.sources[0].name, $.actions[0].name"); } @Test diff --git a/src/test/java/org/neo4j/importer/v1/e2e/BeamIT.java b/src/test/java/org/neo4j/importer/v1/e2e/BeamIT.java index 88366da..6355b88 100644 --- a/src/test/java/org/neo4j/importer/v1/e2e/BeamIT.java +++ b/src/test/java/org/neo4j/importer/v1/e2e/BeamIT.java @@ -21,18 +21,11 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.Reader; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.EnumSet; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeSet; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.jdbc.JdbcIO; @@ -60,21 +53,21 @@ import org.neo4j.driver.Record; import org.neo4j.driver.Session; import org.neo4j.driver.SessionConfig; -import org.neo4j.importer.v1.ImportSpecification; import org.neo4j.importer.v1.ImportSpecificationDeserializer; -import org.neo4j.importer.v1.actions.Action; -import org.neo4j.importer.v1.actions.ActionStage; import 
org.neo4j.importer.v1.actions.plugin.CypherAction; import org.neo4j.importer.v1.e2e.AdminImportIT.ThrowingFunction; -import org.neo4j.importer.v1.graph.Maps; +import org.neo4j.importer.v1.pipeline.ActionStep; +import org.neo4j.importer.v1.pipeline.CustomQueryTargetStep; +import org.neo4j.importer.v1.pipeline.ImportPipeline; +import org.neo4j.importer.v1.pipeline.ImportStep; +import org.neo4j.importer.v1.pipeline.NodeTargetStep; +import org.neo4j.importer.v1.pipeline.RelationshipTargetStep; +import org.neo4j.importer.v1.pipeline.SourceStep; +import org.neo4j.importer.v1.pipeline.TargetStep; import org.neo4j.importer.v1.sources.JdbcSource; import org.neo4j.importer.v1.sources.Source; -import org.neo4j.importer.v1.targets.CustomQueryTarget; -import org.neo4j.importer.v1.targets.NodeKeyConstraint; -import org.neo4j.importer.v1.targets.NodeTarget; import org.neo4j.importer.v1.targets.PropertyMapping; import org.neo4j.importer.v1.targets.RelationshipTarget; -import org.neo4j.importer.v1.targets.Target; import org.testcontainers.containers.Neo4jContainer; import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.utility.DockerImageName; @@ -126,61 +119,59 @@ private void runBeamImport(String extension) throws Exception { String neo4jPassword = NEO4J.getAdminPassword(); var importSpec = read("/e2e/beam-import/spec.%s".formatted(extension), ImportSpecificationDeserializer::deserialize); - Map> sourceOutputs = + var importPipeline = ImportPipeline.of(importSpec); + + Map> outputs = new HashMap<>(); + Map> sourceOutputs = new HashMap<>(importSpec.getSources().size()); - Map> targetOutputs = new HashMap<>(); - - Map>> preActions = Maps.mapValues( - actionsByStage( - importSpec, ActionStage.PRE_NODES, ActionStage.PRE_RELATIONSHIPS, ActionStage.PRE_QUERIES), - (actions) -> actions.stream() - .map(action -> { - assertThat(action).isInstanceOf(CypherAction.class); - return applyPreAction( - action, - CypherActionFn.of( - (CypherAction) action, 
NEO4J.getBoltUrl(), NEO4J.getAdminPassword())); - }) - .collect(Collectors.toList())); - - Set targets = new TreeSet<>(importSpec.getTargets().getAll()); - targets.forEach((target) -> { - String targetName = target.getName(); - Source source = importSpec.findSourceByName(target.getSource()); - PCollection sourceStart = sourceOutputs.computeIfAbsent( - source, - (ignored) -> pipeline.apply( - "Read rows from source %s".formatted(source.getName()), + importPipeline.forEach(importStep -> { + switch (importStep) { + case SourceStep step -> { + var name = step.name(); + PCollection output = pipeline.apply( + "[Source %s] Read rows".formatted(name), SourceIO.readAll( - source, POSTGRES.getJdbcUrl(), POSTGRES.getUsername(), POSTGRES.getPassword()))); - PCollection targetOutput = sourceStart - .apply( - "Wait on %s dependencies".formatted(targetName), - Wait.on(dependenciesOf(target, targetOutputs, preActions))) - .setCoder(SchemaCoder.of(sourceStart.getSchema())) - .apply( - "Perform %s import to Neo4j".formatted(targetName), - TargetIO.writeAll(neo4jUrl, neo4jPassword, importSpec, target)); - targetOutputs.put(targetName, targetOutput); + step.source(), + POSTGRES.getJdbcUrl(), + POSTGRES.getUsername(), + POSTGRES.getPassword())); + sourceOutputs.put(name, output); + outputs.put(name, output); + } + case TargetStep step -> { + var name = step.name(); + PCollection targetOutput = sourceOutputs + .get(step.sourceName()) + .apply( + "[Target %s] Wait for dependencies".formatted(name), + Wait.on(dependencyOutputs(step.dependencies(), outputs))) + .setCoder(SchemaCoder.of( + sourceOutputs.get(step.sourceName()).getSchema())) + .apply( + "[Target %s] Perform import to Neo4j".formatted(name), + TargetIO.writeAll(neo4jUrl, neo4jPassword, step)); + outputs.put(name, targetOutput); + } + case ActionStep step -> { + var name = step.name(); + var action = step.action(); + assertThat(action).isInstanceOf(CypherAction.class); + var cypherAction = (CypherAction) action; + PCollection 
output = pipeline.apply( + "[Action %s] Create single input".formatted(name), Create.of(1)) + .apply( + "[Action %s] Wait for dependencies".formatted(name), + Wait.on(dependencyOutputs(step.dependencies(), outputs))) + .setCoder(VarIntCoder.of()) + .apply( + "[action %s] Run".formatted(name), + ParDo.of(CypherActionFn.of(cypherAction, neo4jUrl, neo4jPassword))); + outputs.put(name, output); + } + default -> Assertions.fail("Unsupported step type: %s".formatted(importStep.getClass())); + } }); - actionsByStage( - importSpec, - ActionStage.POST_SOURCES, - ActionStage.POST_NODES, - ActionStage.POST_RELATIONSHIPS, - ActionStage.POST_QUERIES, - ActionStage.END) - .forEach((stage, actions) -> { - List> stageDependencies = - postActionDependencies(stage, importSpec, sourceOutputs, targetOutputs); - actions.forEach(action -> { - assertThat(action).isInstanceOf(CypherAction.class); - CypherActionFn doFn = CypherActionFn.of((CypherAction) action, neo4jUrl, neo4jPassword); - applyPostAction(action, stageDependencies, doFn); - }); - }); - pipeline.run(); var productCount = neo4jDriver @@ -241,89 +232,9 @@ private void runBeamImport(String extension) throws Exception { + 8L); // 77 (:Product) + 8 (:Category) + 77 (:ClonedProduct) + 8 (:ClonedCategory) } - private static Map> actionsByStage( - ImportSpecification importSpec, ActionStage... 
stages) { - var allStages = EnumSet.copyOf(Arrays.asList(stages)); - return importSpec.getActions().stream() - .filter(action -> allStages.contains(action.getStage())) - .collect(Collectors.groupingBy(Action::getStage)); - } - - private PCollection applyPreAction(Action action, CypherActionFn fn) { - return pipeline.apply("Create synthetic single input for action %s".formatted(action.getName()), Create.of(1)) - .setCoder(VarIntCoder.of()) - .apply("Run action %s".formatted(action.getName()), ParDo.of(fn)); - } - - private void applyPostAction(Action action, List> stageDependencies, CypherActionFn fn) { - pipeline.apply("Create synthetic single input for action %s".formatted(action.getName()), Create.of(1)) - .apply("Wait for dependencies of stage %s".formatted(action.getStage()), Wait.on(stageDependencies)) - .setCoder(VarIntCoder.of()) - .apply("Run action %s".formatted(action.getName()), ParDo.of(fn)); - } - - private List> postActionDependencies( - ActionStage stage, - ImportSpecification importSpec, - Map> sourceOutputs, - Map> targetOutputs) { - var targets = importSpec.getTargets(); - switch (stage) { - case POST_SOURCES -> { - return new ArrayList<>(sourceOutputs.values()); - } - case POST_NODES -> { - return resolveTargetOutputs(targets.getNodes(), targetOutputs); - } - case POST_RELATIONSHIPS -> { - return resolveTargetOutputs(targets.getRelationships(), targetOutputs); - } - case POST_QUERIES -> { - return resolveTargetOutputs(targets.getCustomQueries(), targetOutputs); - } - case END -> { - return new ArrayList<>(targetOutputs.values()); - } - } - Assertions.fail("unexpected stage %s", stage); - return null; - } - - private static List> resolveTargetOutputs( - List targets, Map> namedOutputs) { - return targets.stream().map(Target::getName).map(namedOutputs::get).collect(Collectors.toList()); - } - - private List> dependenciesOf( - Target target, - Map> processedTargets, - Map>> preActions) { - var result = allDependenciesOf(target).stream() - .map(name 
-> (PCollection) processedTargets.get(name)) - .collect(Collectors.toCollection((Supplier>>) ArrayList::new)); - switch (target) { - case NodeTarget n -> result.addAll(preActions.get(ActionStage.PRE_NODES)); - case RelationshipTarget r -> result.addAll(preActions.get(ActionStage.PRE_RELATIONSHIPS)); - case CustomQueryTarget c -> result.addAll(preActions.get(ActionStage.PRE_QUERIES)); - default -> Assertions.fail("unexpected target type %s", target.getClass()); - } - return result; - } - - private List allDependenciesOf(Target target) { - if (!(target instanceof RelationshipTarget relationshipTarget)) { - return target.getDependencies(); - } - List dependencies = new ArrayList<>(target.getDependencies()); - String startReference = relationshipTarget.getStartNodeReference(); - if (startReference != null) { - dependencies.add(startReference); - } - String endReference = relationshipTarget.getEndNodeReference(); - if (endReference != null) { - dependencies.add(endReference); - } - return dependencies; + private static List> dependencyOutputs( + List dependencies, Map> outputs) { + return dependencies.stream().map(dep -> outputs.get(dep.name())).collect(Collectors.toUnmodifiableList()); } private T read(String classpathResource, ThrowingFunction fn) throws Exception { @@ -370,24 +281,22 @@ class TargetIO extends PTransform<@NotNull PCollection, @NotNull PCollectio private final String url; private final String password; - private final ImportSpecification importSpec; - private final Target target; + private final TargetStep step; - private TargetIO(String url, String password, ImportSpecification importSpec, Target target) { + private TargetIO(String url, String password, TargetStep step) { this.url = url; this.password = password; - this.importSpec = importSpec; - this.target = target; + this.step = step; } public static PTransform<@NotNull PCollection, @NotNull PCollection> writeAll( - String url, String password, ImportSpecification importSpec, Target target) { - 
return new TargetIO(url, password, importSpec, target); + String url, String password, TargetStep step) { + return new TargetIO(url, password, step); } @Override public @NotNull PCollection expand(@NotNull PCollection input) { - return input.apply("Write rows to Neo4j", ParDo.of(TargetWriteRowFn.of(url, password, importSpec, target))) + return input.apply("Write rows to Neo4j", ParDo.of(TargetWriteRowFn.of(url, password, step))) .setCoder(SchemaCoder.of(input.getSchema())); } } @@ -445,19 +354,17 @@ class TargetWriteRowFn extends DoFn { private final String url; private final String password; - private final ImportSpecification importSpec; - private final Target target; + private final TargetStep step; private transient Driver driver; - private TargetWriteRowFn(String url, String password, ImportSpecification importSpec, Target target) { + private TargetWriteRowFn(String url, String password, TargetStep step) { this.url = url; this.password = password; - this.importSpec = importSpec; - this.target = target; + this.step = step; } - public static DoFn of(String url, String password, ImportSpecification importSpec, Target target) { - return new TargetWriteRowFn(url, password, importSpec, target); + public static DoFn of(String url, String password, TargetStep step) { + return new TargetWriteRowFn(url, password, step); } @Setup @@ -469,101 +376,97 @@ public void setUp() { @ProcessElement public void processElement(ProcessContext context) { Row row = context.element(); - switch (target) { - case CustomQueryTarget queryTarget -> driver.executableQuery(queryTarget.getQuery()) + switch (step) { + case CustomQueryTargetStep step -> driver.executableQuery(step.query()) .withParameters(Map.of("rows", List.of(properties(row)))) .execute(); - case NodeTarget nodeTarget -> driver.executableQuery("%s ".formatted(writeMode(nodeTarget)) - + "(n:%s%s) " - .formatted( - String.join(":", nodeTarget.getLabels()), nodePattern(nodeTarget, "props")) - + "SET n += $props") - 
.withParameters(Map.of("props", propertiesFor(nodeTarget.getProperties(), row))) - .execute(); - case RelationshipTarget relationshipTarget -> { - NodeTarget start = importSpec.getTargets().getNodes().stream() - .filter(node -> node.getName().equals(relationshipTarget.getStartNodeReference())) - .findFirst() - .orElseThrow(); - NodeTarget end = importSpec.getTargets().getNodes().stream() - .filter(node -> node.getName().equals(relationshipTarget.getEndNodeReference())) - .findFirst() - .orElseThrow(); - driver.executableQuery( - "%1$s (start:%2$s%3$s) %1$s (end:%4$s%5$s) %6$s (start)-[r:%7$s]->(end) SET r = $props" - .formatted( - nodeMatchMode(relationshipTarget), - String.join(":", start.getLabels()), - nodePattern(start, "start"), - String.join(":", end.getLabels()), - nodePattern(end, "end"), - writeMode(relationshipTarget), - relationshipTarget.getType())) + case NodeTargetStep step -> { + var keys = step.keyProperties(); + var nonKeys = step.nonKeyProperties(); + driver.executableQuery("%s (n:%s%s) %s" + .formatted( + step.writeMode(), + String.join(":", step.labels()), + entityPattern("row", keys), + setClause("n", "row", nonKeys))) + .withParameters(Map.of("row", rowValues(keys, nonKeys, row))) + .execute(); + } + case RelationshipTargetStep step -> { + var start = step.startNode(); + var end = step.endNode(); + var keys = step.keyProperties(); + var nonKeys = step.nonKeyProperties(); + driver.executableQuery("%s (start:%s%s) %s (end:%s%s) %s (start)-[r:%s%s]->(end) %s" + .formatted( + step.nodeMatchMode(), + String.join(":", start.labels()), + entityPattern("start", start.keyProperties()), + step.nodeMatchMode(), + String.join(":", end.labels()), + entityPattern("end", end.keyProperties()), + step.writeMode(), + step.type(), + entityPattern("row", keys), + setClause("r", "row", nonKeys))) .withParameters(Map.of( "start", nodeKeyValues(start, row), "end", nodeKeyValues(end, row), - "props", propertiesFor(relationshipTarget.getProperties(), row))) + "row", 
rowValues(keys, nonKeys, row))) .execute(); } - default -> { - Assertions.fail("unsupported target type: %s", target.getClass()); - } + default -> Assertions.fail("unsupported target type: %s", step.getClass()); } context.output(row); } - private Set nodeKeyPropertyNames(NodeTarget node) { - Set keyProps = new HashSet<>(); - - node.getSchema().getKeyConstraints().forEach(kc -> keyProps.addAll(kc.getProperties())); - - if (keyProps.isEmpty()) { - node.getProperties().forEach(p -> keyProps.add(p.getTargetProperty())); + private String entityPattern(String parameterName, List keyProperties) { + StringBuilder builder = new StringBuilder(); + builder.append(" {"); + for (int i = 0; i < keyProperties.size(); i++) { + PropertyMapping mapping = keyProperties.get(i); + builder.append( + "`%s`:$%s.`%s`".formatted(mapping.getTargetProperty(), parameterName, mapping.getSourceField())); + if (i != keyProperties.size() - 1) { + builder.append(","); + } } - - return keyProps; - } - - private Map nodeKeyValues(NodeTarget node, Row row) { - return nodeKeyPropertyNames(node).stream() - .map(name -> node.getProperties().stream() - .filter(p -> p.getTargetProperty().equals(name)) - .findFirst() - .orElseThrow()) - .map(mapping -> Map.entry(mapping.getTargetProperty(), propertyValue(mapping, row))) - .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); + builder.append("}"); + return builder.toString(); } - private String nodePattern(NodeTarget nodeTarget, String parameterName) { + private String setClause(String nodeVariableName, String parameterName, List nonKeyProperties) { + if (nonKeyProperties.isEmpty()) { + return ""; + } StringBuilder builder = new StringBuilder(); - builder.append("{"); - List mappings = nodeTarget.getProperties(); - List nodeKeyConstraints = nodeTarget.getSchema().getKeyConstraints(); - for (int i = 0; i < nodeKeyConstraints.size(); i++) { - NodeKeyConstraint key = nodeKeyConstraints.get(i); - List properties = key.getProperties(); - for (int j = 0; j 
< properties.size(); j++) { - String property = properties.get(j); - PropertyMapping mapping = mappings.stream() - .filter(m -> m.getTargetProperty().equals(property)) - .findFirst() - .orElseThrow(() -> new RuntimeException( - "could not find property mapping for property %s".formatted(property))); - String targetProperty = mapping.getTargetProperty(); - builder.append("`%1$s`:$%2$s.`%1$s`".formatted(targetProperty, parameterName)); - if (i != nodeKeyConstraints.size() - 1 || j != properties.size() - 1) { - builder.append(","); - } + builder.append("SET "); + for (int i = 0; i < nonKeyProperties.size(); i++) { + PropertyMapping mapping = nonKeyProperties.get(i); + builder.append("%s.`%s` = $%s.`%s`" + .formatted(nodeVariableName, mapping.getTargetProperty(), parameterName, mapping.getSourceField())); + if (i != nonKeyProperties.size() - 1) { + builder.append(","); } } - builder.append("}"); return builder.toString(); } - private Map propertiesFor(List properties, Row row) { - return properties.stream() - .map(mapping -> Map.entry(mapping.getTargetProperty(), propertyValue(mapping, row))) - .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); + private Map nodeKeyValues(NodeTargetStep node, Row row) { + return node.keyProperties().stream() + .collect(Collectors.toMap(PropertyMapping::getSourceField, mapping -> propertyValue(mapping, row))); + } + + private Map rowValues( + List keyMappings, List nonKeyMappings, Row row) { + var result = new HashMap(keyMappings.size() + nonKeyMappings.size()); + keyMappings.forEach(mapping -> { + result.put(mapping.getSourceField(), propertyValue(mapping, row)); + }); + nonKeyMappings.forEach(mapping -> { + result.put(mapping.getSourceField(), propertyValue(mapping, row)); + }); + return result; } private Map properties(Row row) { @@ -576,10 +479,6 @@ private static Object propertyValue(PropertyMapping mapping, Row row) { return row.getValue(mapping.getSourceField()); } - private static String writeMode(NodeTarget target) { - 
return target.getWriteMode().name(); - } - private static String writeMode(RelationshipTarget target) { return target.getWriteMode().name(); } diff --git a/src/test/java/org/neo4j/importer/v1/graph/GraphTest.java b/src/test/java/org/neo4j/importer/v1/graph/GraphsTest.java similarity index 91% rename from src/test/java/org/neo4j/importer/v1/graph/GraphTest.java rename to src/test/java/org/neo4j/importer/v1/graph/GraphsTest.java index 674a20c..a2261b0 100644 --- a/src/test/java/org/neo4j/importer/v1/graph/GraphTest.java +++ b/src/test/java/org/neo4j/importer/v1/graph/GraphsTest.java @@ -29,7 +29,7 @@ import org.assertj.core.api.ThrowingConsumer; import org.junit.jupiter.api.Test; -class GraphTest { +class GraphsTest { @Test void topologically_sorts_dependencyless_dependency_graph() { @@ -38,7 +38,7 @@ void topologically_sorts_dependencyless_dependency_graph() { graph.put("b", Set.of()); graph.put("c", Set.of()); - List result = Graph.runTopologicalSort(graph); + List result = Graphs.runTopologicalSort(graph); assertThat(result).satisfies(topologicalSortOf(graph)); } @@ -50,7 +50,7 @@ void topologically_sorts_dependency_graph() { graph.put("c", linkedHashSet("d", "e", "f")); graph.put("e", linkedHashSet("g")); - List result = Graph.runTopologicalSort(graph); + List result = Graphs.runTopologicalSort(graph); assertThat(result).satisfies(topologicalSortOf(graph)); } @@ -61,7 +61,7 @@ void fails_to_topologically_sort_cyclic_dependency_graph() { graph.put("a", linkedHashSet("b")); graph.put("b", linkedHashSet("a")); - assertThatThrownBy(() -> Graph.runTopologicalSort(graph)) + assertThatThrownBy(() -> Graphs.runTopologicalSort(graph)) .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining("The provided graph {a=[b], b=[a]} defines cycles"); } @@ -77,7 +77,7 @@ void fails_to_topologically_sort_dependency_graph_with_long_cycles() { graph.put("f", linkedHashSet("g")); graph.put("g", linkedHashSet("a")); - assertThatThrownBy(() -> Graph.runTopologicalSort(graph)) 
+ assertThatThrownBy(() -> Graphs.runTopologicalSort(graph)) .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining( "The provided graph {a=[b], b=[c], c=[d], d=[e], e=[f], f=[g], g=[a]} defines cycles"); @@ -85,7 +85,7 @@ void fails_to_topologically_sort_dependency_graph_with_long_cycles() { @Test void detects_no_cycles_for_empty_graph_with_complex_graph() { - var cycles = Graph.detectCycles(new LinkedHashMap<>()); + var cycles = Graphs.detectCycles(new LinkedHashMap<>()); assertThat(cycles).isEmpty(); } @@ -97,7 +97,7 @@ void detects_no_cycles_for_dag_with_complex_graph() { graph.put("c", linkedHashSet("d", "e", "f")); graph.put("e", linkedHashSet("g")); - var cycles = Graph.detectCycles(graph); + var cycles = Graphs.detectCycles(graph); assertThat(cycles).isEmpty(); } @@ -108,7 +108,7 @@ void detects_direct_cycles_with_complex_graph() { graph.put("a", linkedHashSet("a", "b")); graph.put("b", linkedHashSet("b", "c")); - var cycles = Graph.detectCycles(graph); + var cycles = Graphs.detectCycles(graph); assertThat(cycles).isEqualTo(List.of(List.of("a"), List.of("b"))); } @@ -119,7 +119,7 @@ void detects_cycles_with_complex_graph() { graph.put("a", linkedHashSet("b", "c")); graph.put("c", linkedHashSet("a", "d")); - var cycles = Graph.detectCycles(graph); + var cycles = Graphs.detectCycles(graph); assertThat(cycles).isEqualTo(List.of(List.of("a", "c"))); } diff --git a/src/test/java/org/neo4j/importer/v1/pipeline/NodeTargetTaskTest.java b/src/test/java/org/neo4j/importer/v1/pipeline/NodeTargetTaskTest.java new file mode 100644 index 0000000..15e1d9e --- /dev/null +++ b/src/test/java/org/neo4j/importer/v1/pipeline/NodeTargetTaskTest.java @@ -0,0 +1,160 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.importer.v1.pipeline; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Random; +import org.junit.jupiter.api.Test; +import org.neo4j.importer.v1.targets.NodeExistenceConstraint; +import org.neo4j.importer.v1.targets.NodeKeyConstraint; +import org.neo4j.importer.v1.targets.NodeSchema; +import org.neo4j.importer.v1.targets.NodeTarget; +import org.neo4j.importer.v1.targets.NodeUniqueConstraint; +import org.neo4j.importer.v1.targets.PropertyMapping; +import org.neo4j.importer.v1.targets.WriteMode; + +class NodeTargetTaskTest { + + private final Random random = new Random(); + + private final PropertyMapping mapping1 = mappingTo("prop1"); + private final PropertyMapping mapping2 = mappingTo("prop2"); + private final PropertyMapping mapping3 = mappingTo("prop3"); + private final PropertyMapping mapping4 = mappingTo("prop4"); + private final List properties = List.of(mapping1, mapping2, mapping3, mapping4); + + @Test + void returns_no_keys_when_schema_is_not_defined() { + NodeSchema schema = null; + + var task = new NodeTargetStep( + new NodeTarget( + true, + "a-target", + "a-source", + null, + WriteMode.CREATE, + null, + List.of("Label"), + properties, + schema), + List.of()); + + assertThat(task.keyProperties()).isEmpty(); + assertThat(task.nonKeyProperties()).isEqualTo(properties); + } + + @Test + void returns_key_and_non_key_properties() { + var schema = + schemaFor(List.of(key("Label", List.of("prop1", "prop2")), key("Label", List.of("prop2", "prop4")))); + + var task = 
new NodeTargetStep( + new NodeTarget( + true, + "a-target", + "a-source", + null, + WriteMode.CREATE, + null, + List.of("Label"), + properties, + schema), + List.of()); + + assertThat(task.keyProperties()).containsExactly(mapping1, mapping2, mapping4); + assertThat(task.nonKeyProperties()).containsExactly(mapping3); + } + + @Test + void returns_non_null_unique_properties_as_keys() { + var schema = schemaFor( + List.of(unique("Label", List.of("prop1", "prop2")), unique("Label", List.of("prop2", "prop4"))), + List.of(notNull("Label", "prop2"), notNull("Label", "prop3"), notNull("Label", "prop4"))); + + var task = new NodeTargetStep( + new NodeTarget( + true, + "a-target", + "a-source", + null, + WriteMode.CREATE, + null, + List.of("Label"), + properties, + schema), + List.of()); + + assertThat(task.keyProperties()).containsExactly(mapping2, mapping4); + assertThat(task.nonKeyProperties()).containsExactly(mapping1, mapping3); + } + + @Test + void returns_both_key_and_non_null_unique_properties() { + var schema = schemaFor( + List.of(key("Label", List.of("prop1", "prop2"))), + List.of(unique("Label", List.of("prop3"))), + List.of(notNull("Label", "prop3"), notNull("Label", "prop4"))); + + var task = new NodeTargetStep( + new NodeTarget( + true, + "a-target", + "a-source", + null, + WriteMode.CREATE, + null, + List.of("Label"), + properties, + schema), + List.of()); + + assertThat(task.keyProperties()).containsExactly(mapping1, mapping2, mapping3); + assertThat(task.nonKeyProperties()).containsExactly(mapping4); + } + + private NodeKeyConstraint key(String label, List properties) { + return new NodeKeyConstraint("key-%d".formatted(random.nextInt()), label, properties, null); + } + + private NodeUniqueConstraint unique(String label, List properties) { + return new NodeUniqueConstraint("unique-%d".formatted(random.nextInt()), label, properties, null); + } + + private NodeExistenceConstraint notNull(String label, String property) { + return new 
NodeExistenceConstraint("not-null-%d".formatted(random.nextInt()), label, property); + } + + private PropertyMapping mappingTo(String name) { + return new PropertyMapping("a-column-%d".formatted(random.nextInt()), name, null); + } + + private static NodeSchema schemaFor(List keys) { + return schemaFor(keys, null, null); + } + + private static NodeSchema schemaFor(List uniques, List notNulls) { + return schemaFor(null, uniques, notNulls); + } + + private static NodeSchema schemaFor( + List keys, List uniques, List notNulls) { + return new NodeSchema(null, keys, uniques, notNulls, null, null, null, null, null); + } +} diff --git a/src/test/java/org/neo4j/importer/v1/pipeline/RelationshipTargetTaskTest.java b/src/test/java/org/neo4j/importer/v1/pipeline/RelationshipTargetTaskTest.java new file mode 100644 index 0000000..f2cb2a2 --- /dev/null +++ b/src/test/java/org/neo4j/importer/v1/pipeline/RelationshipTargetTaskTest.java @@ -0,0 +1,199 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.neo4j.importer.v1.pipeline; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Random; +import org.junit.jupiter.api.Test; +import org.neo4j.importer.v1.targets.NodeMatchMode; +import org.neo4j.importer.v1.targets.NodeTarget; +import org.neo4j.importer.v1.targets.PropertyMapping; +import org.neo4j.importer.v1.targets.RelationshipExistenceConstraint; +import org.neo4j.importer.v1.targets.RelationshipKeyConstraint; +import org.neo4j.importer.v1.targets.RelationshipSchema; +import org.neo4j.importer.v1.targets.RelationshipTarget; +import org.neo4j.importer.v1.targets.RelationshipUniqueConstraint; +import org.neo4j.importer.v1.targets.WriteMode; + +class RelationshipTargetTaskTest { + + private final Random random = new Random(); + + private final PropertyMapping mapping1 = mappingTo("prop1"); + private final PropertyMapping mapping2 = mappingTo("prop2"); + private final PropertyMapping mapping3 = mappingTo("prop3"); + private final PropertyMapping mapping4 = mappingTo("prop4"); + private final List properties = List.of(mapping1, mapping2, mapping3, mapping4); + + @Test + void returns_no_properties_when_schema_is_not_defined() { + RelationshipSchema schema = null; + + var task = new RelationshipTargetStep( + new RelationshipTarget( + true, + "a-target", + "a-source", + null, + "TYPE", + WriteMode.CREATE, + NodeMatchMode.MERGE, + null, + "a-node-target", + "a-node-target", + properties, + schema), + nodeTarget("a-node-target"), + nodeTarget("a-node-target"), + List.of()); + + assertThat(task.keyProperties()).isEmpty(); + assertThat(task.nonKeyProperties()).isEqualTo(properties); + } + + @Test + void returns_key_and_non_key_properties() { + var schema = schemaFor(List.of(key(List.of("prop1", "prop2")), key(List.of("prop2", "prop4")))); + + var task = new RelationshipTargetStep( + new RelationshipTarget( + true, + "a-target", + "a-source", + null, + "TYPE", + WriteMode.CREATE, + NodeMatchMode.MATCH, + 
null, + "start-node-target", + "end-node-target", + properties, + schema), + nodeTarget("start-node-target"), + nodeTarget("end-node-target"), + List.of()); + + assertThat(task.keyProperties()).containsExactly(mapping1, mapping2, mapping4); + assertThat(task.nonKeyProperties()).containsExactly(mapping3); + } + + @Test + void returns_non_null_unique_properties_as_keys() { + var schema = schemaFor( + List.of(unique(List.of("prop1", "prop2")), unique(List.of("prop2", "prop4"))), + List.of(notNull("prop2"), notNull("prop3"), notNull("prop4"))); + + var task = new RelationshipTargetStep( + new RelationshipTarget( + true, + "a-target", + "a-source", + null, + "TYPE", + WriteMode.CREATE, + NodeMatchMode.MATCH, + null, + "start-node-target", + "end-node-target", + properties, + schema), + nodeTarget("start-node-target"), + nodeTarget("end-node-target"), + List.of()); + + assertThat(task.keyProperties()).containsExactly(mapping2, mapping4); + assertThat(task.nonKeyProperties()).containsExactly(mapping1, mapping3); + } + + @Test + void returns_both_key_and_non_null_unique_properties() { + var schema = schemaFor( + List.of(key(List.of("prop1", "prop2"))), + List.of(unique(List.of("prop3"))), + List.of(notNull("prop3"), notNull("prop4"))); + + var task = new RelationshipTargetStep( + new RelationshipTarget( + true, + "a-target", + "a-source", + null, + "TYPE", + WriteMode.CREATE, + NodeMatchMode.MATCH, + null, + "start-node-target", + "end-node-target", + properties, + schema), + nodeTarget("start-node-target"), + nodeTarget("end-node-target"), + List.of()); + + assertThat(task.keyProperties()).containsExactly(mapping1, mapping2, mapping3); + assertThat(task.nonKeyProperties()).containsExactly(mapping4); + } + + private RelationshipKeyConstraint key(List properties) { + return new RelationshipKeyConstraint("key-%d".formatted(random.nextInt()), properties, null); + } + + private RelationshipUniqueConstraint unique(List properties) { + return new 
RelationshipUniqueConstraint("unique-%d".formatted(random.nextInt()), properties, null); + } + + private RelationshipExistenceConstraint notNull(String property) { + return new RelationshipExistenceConstraint("not-null-%d".formatted(random.nextInt()), property); + } + + private PropertyMapping mappingTo(String name) { + return new PropertyMapping("a-column-%d".formatted(random.nextInt()), name, null); + } + + private static RelationshipSchema schemaFor(List keys) { + return schemaFor(keys, null, null); + } + + private static RelationshipSchema schemaFor( + List uniques, List notNulls) { + return schemaFor(null, uniques, notNulls); + } + + private static RelationshipSchema schemaFor( + List keys, + List uniques, + List notNulls) { + return new RelationshipSchema(null, keys, uniques, notNulls, null, null, null, null, null); + } + + private static NodeTargetStep nodeTarget(String name) { + return new NodeTargetStep( + new NodeTarget( + true, + name, + "a-source", + null, + WriteMode.CREATE, + null, + List.of("Label"), + List.of(new PropertyMapping("a-field", "a-property", null)), + null), + List.of()); + } +} diff --git a/src/test/java/org/neo4j/importer/v1/targets/EntityTargetTest.java b/src/test/java/org/neo4j/importer/v1/targets/EntityTargetTest.java index b54c7bc..1f4c0c1 100644 --- a/src/test/java/org/neo4j/importer/v1/targets/EntityTargetTest.java +++ b/src/test/java/org/neo4j/importer/v1/targets/EntityTargetTest.java @@ -23,15 +23,6 @@ class EntityTargetTest { - @Test - void returns_no_node_properties_when_they_are_not_defined() { - List properties = null; - var target = new NodeTarget( - true, "a-target", "a-source", null, WriteMode.CREATE, null, List.of("Label"), properties, null); - - assertThat(target.getAllProperties()).isEmpty(); - } - @Test void returns_no_relationship_properties_when_they_are_not_defined() { List properties = null; @@ -49,6 +40,6 @@ void returns_no_relationship_properties_when_they_are_not_defined() { properties, null); - 
assertThat(target.getAllProperties()).isEmpty(); + assertThat(target.getProperties()).isEmpty(); } } diff --git a/src/test/java/org/neo4j/importer/v1/targets/NodeTargetTest.java b/src/test/java/org/neo4j/importer/v1/targets/NodeTargetTest.java deleted file mode 100644 index d5ee0d0..0000000 --- a/src/test/java/org/neo4j/importer/v1/targets/NodeTargetTest.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [https://neo4j.com] - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.neo4j.importer.v1.targets; - -import static org.assertj.core.api.Assertions.assertThat; - -import java.util.List; -import java.util.Random; -import org.junit.jupiter.api.Test; - -class NodeTargetTest { - private final Random random = new Random(); - - @Test - void returns_no_keys_when_schema_is_not_defined() { - NodeSchema schema = null; - var target = new NodeTarget( - true, - "a-target", - "a-source", - null, - WriteMode.CREATE, - null, - List.of("Label"), - List.of(mappingTo("prop")), - schema); - - assertThat(target.getKeyProperties()).isEmpty(); - } - - @Test - void returns_key_properties() { - var properties = List.of(mappingTo("prop1"), mappingTo("prop2"), mappingTo("prop3"), mappingTo("prop4")); - var schema = - schemaFor(List.of(key("Label", List.of("prop1", "prop2")), key("Label", List.of("prop2", "prop4")))); - var target = new NodeTarget( - true, "a-target", "a-source", null, WriteMode.CREATE, null, List.of("Label"), properties, schema); - - assertThat(target.getKeyProperties()).isEqualTo(List.of("prop1", "prop2", "prop4")); - } - - @Test - void returns_non_null_unique_properties() { - var properties = List.of(mappingTo("prop1"), mappingTo("prop2"), mappingTo("prop3"), mappingTo("prop4")); - var schema = schemaFor( - List.of(unique("Label", List.of("prop1", "prop2")), unique("Label", List.of("prop2", "prop4"))), - List.of(notNull("Label", "prop2"), notNull("Label", "prop3"), notNull("Label", "prop4"))); - var target = new NodeTarget( - true, "a-target", "a-source", null, WriteMode.CREATE, null, List.of("Label"), properties, schema); - - assertThat(target.getKeyProperties()).isEqualTo(List.of("prop2", "prop4")); - } - - @Test - void returns_both_key_and_non_null_unique_properties() { - var properties = List.of(mappingTo("prop1"), mappingTo("prop2"), mappingTo("prop3"), mappingTo("prop4")); - var schema = schemaFor( - List.of(key("Label", List.of("prop1", "prop2"))), - List.of(unique("Label", List.of("prop3"))), - 
List.of(notNull("Label", "prop3"), notNull("Label", "prop4"))); - var target = new NodeTarget( - true, "a-target", "a-source", null, WriteMode.CREATE, null, List.of("Label"), properties, schema); - - assertThat(target.getKeyProperties()).isEqualTo(List.of("prop1", "prop2", "prop3")); - } - - private NodeKeyConstraint key(String label, List properties) { - return new NodeKeyConstraint("key-%d".formatted(random.nextInt()), label, properties, null); - } - - private NodeUniqueConstraint unique(String label, List properties) { - return new NodeUniqueConstraint("unique-%d".formatted(random.nextInt()), label, properties, null); - } - - private NodeExistenceConstraint notNull(String label, String property) { - return new NodeExistenceConstraint("not-null-%d".formatted(random.nextInt()), label, property); - } - - private PropertyMapping mappingTo(String name) { - return new PropertyMapping("a-column-%d".formatted(random.nextInt()), name, null); - } - - private static NodeSchema schemaFor(List keys) { - return schemaFor(keys, null, null); - } - - private static NodeSchema schemaFor(List uniques, List notNulls) { - return schemaFor(null, uniques, notNulls); - } - - private static NodeSchema schemaFor( - List keys, List uniques, List notNulls) { - return new NodeSchema(null, keys, uniques, notNulls, null, null, null, null, null); - } -} diff --git a/src/test/java/org/neo4j/importer/v1/targets/RelationshipTargetTest.java b/src/test/java/org/neo4j/importer/v1/targets/RelationshipTargetTest.java deleted file mode 100644 index f3fe5a2..0000000 --- a/src/test/java/org/neo4j/importer/v1/targets/RelationshipTargetTest.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [https://neo4j.com] - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.importer.v1.targets; - -import static org.assertj.core.api.Assertions.assertThat; - -import java.util.List; -import java.util.Random; -import org.junit.jupiter.api.Test; - -class RelationshipTargetTest { - - private final Random random = new Random(); - - @Test - void returns_no_keys_when_schema_is_not_defined() { - RelationshipSchema schema = null; - var target = new RelationshipTarget( - true, - "a-target", - "a-source", - null, - "TYPE", - WriteMode.CREATE, - NodeMatchMode.MERGE, - null, - "a-node-target", - "a-node-target", - List.of(mappingTo("prop")), - schema); - - assertThat(target.getKeyProperties()).isEmpty(); - } - - @Test - void returns_key_properties() { - var properties = List.of(mappingTo("prop1"), mappingTo("prop2"), mappingTo("prop3"), mappingTo("prop4")); - var schema = schemaFor(List.of(key(List.of("prop1", "prop2")), key(List.of("prop2", "prop4")))); - var target = new RelationshipTarget( - true, - "a-target", - "a-source", - null, - "TYPE", - WriteMode.CREATE, - NodeMatchMode.MATCH, - null, - "start-node-target", - "end-node-target", - properties, - schema); - - assertThat(target.getKeyProperties()).isEqualTo(List.of("prop1", "prop2", "prop4")); - } - - @Test - void returns_non_null_unique_properties() { - var properties = List.of(mappingTo("prop1"), mappingTo("prop2"), mappingTo("prop3"), mappingTo("prop4")); - var schema = schemaFor( - List.of(unique(List.of("prop1", "prop2")), unique(List.of("prop2", "prop4"))), - List.of(notNull("prop2"), notNull("prop3"), notNull("prop4"))); - var target = new 
RelationshipTarget( - true, - "a-target", - "a-source", - null, - "TYPE", - WriteMode.CREATE, - NodeMatchMode.MATCH, - null, - "start-node-target", - "end-node-target", - properties, - schema); - - assertThat(target.getKeyProperties()).isEqualTo(List.of("prop2", "prop4")); - } - - @Test - void returns_both_key_and_non_null_unique_properties() { - var properties = List.of(mappingTo("prop1"), mappingTo("prop2"), mappingTo("prop3"), mappingTo("prop4")); - var schema = schemaFor( - List.of(key(List.of("prop1", "prop2"))), - List.of(unique(List.of("prop3"))), - List.of(notNull("prop3"), notNull("prop4"))); - var target = new RelationshipTarget( - true, - "a-target", - "a-source", - null, - "TYPE", - WriteMode.CREATE, - NodeMatchMode.MATCH, - null, - "start-node-target", - "end-node-target", - properties, - schema); - - assertThat(target.getKeyProperties()).isEqualTo(List.of("prop1", "prop2", "prop3")); - } - - private RelationshipKeyConstraint key(List properties) { - return new RelationshipKeyConstraint("key-%d".formatted(random.nextInt()), properties, null); - } - - private RelationshipUniqueConstraint unique(List properties) { - return new RelationshipUniqueConstraint("unique-%d".formatted(random.nextInt()), properties, null); - } - - private RelationshipExistenceConstraint notNull(String property) { - return new RelationshipExistenceConstraint("not-null-%d".formatted(random.nextInt()), property); - } - - private PropertyMapping mappingTo(String name) { - return new PropertyMapping("a-column-%d".formatted(random.nextInt()), name, null); - } - - private static RelationshipSchema schemaFor(List keys) { - return schemaFor(keys, null, null); - } - - private static RelationshipSchema schemaFor( - List uniques, List notNulls) { - return schemaFor(null, uniques, notNulls); - } - - private static RelationshipSchema schemaFor( - List keys, - List uniques, - List notNulls) { - return new RelationshipSchema(null, keys, uniques, notNulls, null, null, null, null, null); - } -} 
diff --git a/src/test/java/org/neo4j/importer/v1/targets/TargetTest.java b/src/test/java/org/neo4j/importer/v1/targets/TargetTest.java index 5b6f0e6..4dc2524 100644 --- a/src/test/java/org/neo4j/importer/v1/targets/TargetTest.java +++ b/src/test/java/org/neo4j/importer/v1/targets/TargetTest.java @@ -418,120 +418,4 @@ void deserializes_relationship_target() throws Exception { "property1", Map.of("vector.dimensions", 1536, "vector.similarity_function", "cosine"))))); } - - @Test - void compares_unrelated_targets_by_name() { - List mappings = List.of(new PropertyMapping("src", "prop", null)); - Target nodeTarget = new NodeTarget( - true, "a-node-target", "a-source", null, WriteMode.CREATE, null, List.of("Label"), mappings, null); - Target customQueryTarget = new CustomQueryTarget(true, "a-query-target", "a-source", null, "RETURN 42"); - Target relationshipTarget = new RelationshipTarget( - true, - "a-relationship-target", - "a-source", - null, - "TYPE", - WriteMode.CREATE, - NodeMatchMode.MERGE, - null, - "start-node-ref", - "end-node-ref", - mappings, - null); - - assertThat(nodeTarget).isLessThan(customQueryTarget).isLessThan(relationshipTarget); - assertThat(customQueryTarget).isGreaterThan(nodeTarget).isLessThan(relationshipTarget); - assertThat(relationshipTarget).isGreaterThan(nodeTarget).isGreaterThan(nodeTarget); - } - - @Test - void compares_targets_by_dependencies() { - List mappings = List.of(new PropertyMapping("src", "prop", null)); - Target nodeTarget = new NodeTarget( - true, - "a-node-target", - "a-source", - List.of("a-relationship-target"), - WriteMode.CREATE, - null, - List.of("Label"), - mappings, - null); - Target relationshipTarget = new RelationshipTarget( - true, - "a-relationship-target", - "a-source", - List.of("a-query-target"), - "TYPE", - WriteMode.CREATE, - NodeMatchMode.MERGE, - null, - "start-node-ref", - "end-node-ref", - mappings, - null); - Target customQueryTarget = new CustomQueryTarget( - true, "a-custom-query-target", "a-source", 
List.of("another-node-target"), "RETURN 42"); - Target anotherNodeTarget = new NodeTarget( - true, - "another-node-target", - "a-source", - null, - WriteMode.CREATE, - null, - List.of("Label"), - mappings, - null); - - assertThat(nodeTarget).isGreaterThan(relationshipTarget); - assertThat(relationshipTarget).isLessThan(nodeTarget).isGreaterThan(customQueryTarget); - assertThat(customQueryTarget).isLessThan(relationshipTarget).isGreaterThan(anotherNodeTarget); - assertThat(anotherNodeTarget).isLessThan(customQueryTarget); - } - - @Test - void compares_targets_by_start_node_reference() { - List mappings = List.of(new PropertyMapping("src", "prop", null)); - Target nodeTarget = new NodeTarget( - true, "a-node-target", "a-source", null, WriteMode.CREATE, null, List.of("Label"), mappings, null); - Target relationshipTarget = new RelationshipTarget( - true, - "a-relationship-target", - "a-source", - null, - "TYPE", - WriteMode.CREATE, - NodeMatchMode.MERGE, - null, - "a-node-target", - "end-node-ref", - mappings, - null); - - assertThat(nodeTarget).isLessThan(relationshipTarget); - assertThat(relationshipTarget).isGreaterThan(nodeTarget); - } - - @Test - void compares_targets_by_end_node_reference() { - List mappings = List.of(new PropertyMapping("src", "prop", null)); - Target nodeTarget = new NodeTarget( - true, "a-node-target", "a-source", null, WriteMode.CREATE, null, List.of("Label"), mappings, null); - Target relationshipTarget = new RelationshipTarget( - true, - "a-relationship-target", - "a-source", - null, - "TYPE", - WriteMode.CREATE, - NodeMatchMode.MERGE, - null, - "start-node-ref", - "a-node-target", - mappings, - null); - - assertThat(nodeTarget).isLessThan(relationshipTarget); - assertThat(relationshipTarget).isGreaterThan(nodeTarget); - } }