Skip to content

Commit

Permalink
feat: introduce pipeline API (#169)
Browse files Browse the repository at this point in the history
First, `ImportSpecification` is now devoid of any "business" logic.
It has instead become a simple target for JSON/YAML
deserialization.

This commit introduces `ImportPipeline` which takes care of A LOT
of boilerplate when writing an import-spec backend.

It wraps sources, actions and targets as tasks and sorts them
accordingly.

The task classes provide helper functions to resolve key/non-key
properties and start/end nodes.

BREAKING CHANGE: source names must now be globally unique, since
this simplifies the pipeline construction logic quite a bit.
  • Loading branch information
fbiville authored Nov 22, 2024
1 parent b2faedb commit 59915d8
Show file tree
Hide file tree
Showing 37 changed files with 1,774 additions and 1,072 deletions.
2 changes: 1 addition & 1 deletion .teamcity/builds/Build.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class Build(
buildType(WhiteListCheck("${name}-whitelist-check", "white-list check"))
if (forPullRequests) dependentBuildType(PRCheck("${name}-pr-check", "pr check"))
parallel {
listOf("11", "17", "21").forEach { java ->
listOf("17", "21").forEach { java ->
dependentBuildType(
Maven(
"${name}-build-${java}",
Expand Down
2 changes: 1 addition & 1 deletion .teamcity/builds/Common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const val GITHUB_OWNER = "neo4j"
const val GITHUB_REPOSITORY = "import-spec"
const val MAVEN_DEFAULT_ARGS = "--no-transfer-progress --batch-mode --show-version"

const val DEFAULT_JAVA_VERSION = "11"
const val DEFAULT_JAVA_VERSION = "17"
const val LTS_JAVA_VERSION = "21"

enum class LinuxSize(val value: String) {
Expand Down
21 changes: 10 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,15 @@ class GettingStarted {

public static void main(String... args) {

try (Reader reader = createReader("spec.json")) {
var importSpecification = ImportSpecificationDeserializer.deserialize(reader);

var config = importSpecification.getConfiguration();
var sources = importSpecification.getSources();
var actions = importSpecification.getActions();
var targets = importSpecification.getTargets();
var nodeTargets = targets.getNodes();
var relationshipTargets = targets.getRelationships();
var customQueryTargets = targets.getCustomQueries();
try (var reader = new InputStreamReader(createReaderFor("/import/spec.yaml"))) {
var pipeline = ImportPipeline.of(ImportSpecificationDeserializer.deserialize(reader));
pipeline.forEach(step -> {
switch (step) {
case SourceStep source -> handleSource(source);
case ActionStep action -> handleAction(action);
case TargetStep target -> handleTarget(target);
}
});
}
}
}
Expand All @@ -86,4 +85,4 @@ class GettingStarted {
## Prerequisites

- Maven
- JDK 21 (21 is used for tests, 11 for production sources)
- JDK 21 (21 is used for tests, 11 and 17 for production sources)
373 changes: 145 additions & 228 deletions examples/apache-beam/src/test/java/org/neo4j/importer/BeamExampleIT.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -333,17 +334,22 @@ private void migrateTemporalProperties() {
}

public void copyFiles(ImportSpecification specification) throws Exception {
Map<String, Source> indexedSources =
specification.getSources().stream().collect(Collectors.toMap(Source::getName, Function.identity()));
Map<String, NodeTarget> indexedNodes = specification.getTargets().getNodes().stream()
.collect(Collectors.toMap(Target::getName, Function.identity()));
for (Target target : specification.getTargets().getAll()) {
switch (target) {
case NodeTarget nodeTarget -> copyFile(specification, nodeTarget);
case RelationshipTarget relationshipTarget -> copyFile(specification, relationshipTarget);
case NodeTarget nodeTarget -> copyFile(indexedSources, nodeTarget);
case RelationshipTarget relationshipTarget -> copyFile(
indexedSources, indexedNodes, relationshipTarget);
default -> throw new RuntimeException("unsupported target type: %s".formatted(target.getClass()));
}
}
}

private void copyFile(ImportSpecification specification, NodeTarget nodeTarget) throws Exception {
var source = specification.findSourceByName(nodeTarget.getSource());
private void copyFile(Map<String, Source> sources, NodeTarget nodeTarget) throws Exception {
var source = sources.get(nodeTarget.getSource());
assertThat(source).isInstanceOf(ParquetSource.class);
File parquetFile = new File(sharedFolder, fileName(nodeTarget));
List<String> fields = readFieldNames(source);
Expand All @@ -352,23 +358,16 @@ private void copyFile(ImportSpecification specification, NodeTarget nodeTarget)
copyParquetSource((ParquetSource) source, parquetFile, fieldMappings);
}

private void copyFile(ImportSpecification specification, RelationshipTarget relationshipTarget)
private void copyFile(
Map<String, Source> sources, Map<String, NodeTarget> nodes, RelationshipTarget relationshipTarget)
throws Exception {
File parquetFile = new File(sharedFolder, fileName(relationshipTarget));

var source = specification.findSourceByName(relationshipTarget.getSource());
var source = sources.get(relationshipTarget.getSource());
assertThat(source).isInstanceOf(ParquetSource.class);

var startNodeTarget = specification.getTargets().getNodes().stream()
.filter(t -> t.getName().equals(relationshipTarget.getStartNodeReference()))
.findFirst()
.orElseThrow();

var endNodeTarget = specification.getTargets().getNodes().stream()
.filter(t -> t.getName().equals(relationshipTarget.getEndNodeReference()))
.findFirst()
.orElseThrow();

var startNodeTarget = nodes.get(relationshipTarget.getStartNodeReference());
var endNodeTarget = nodes.get(relationshipTarget.getEndNodeReference());
List<String> fields = readFieldNames(source);
Map<String, String> fieldMappings =
computeFieldMappings(fields, relationshipTarget, startNodeTarget, endNodeTarget);
Expand Down Expand Up @@ -420,7 +419,7 @@ private static String[] importCommand(ImportSpecification specification, String

private static Map<String, String> computeFieldMappings(List<String> fields, NodeTarget nodeTarget) {
var mappings = indexByField(nodeTarget.getProperties());
var keyProperties = new HashSet<>(nodeTarget.getKeyProperties());
var keyProperties = new HashSet<>(getKeyProperties(nodeTarget));

for (String field : fields) {
var property = mappings.get(field);
Expand All @@ -443,19 +442,31 @@ private static Map<String, String> computeFieldMappings(
NodeTarget endNodeTarget) {
var mappings = indexByField(relationshipTarget.getProperties());

var startNodeKeys = getKeyProperties(startNodeTarget);
startNodeTarget.getProperties().stream()
.filter(m -> fields.contains(m.getSourceField()))
.filter(m -> startNodeTarget.getKeyProperties().contains(m.getTargetProperty()))
.filter(m -> startNodeKeys.contains(m.getTargetProperty()))
.forEach(m -> mappings.put(m.getSourceField(), startIdSpaceFor(startNodeTarget)));

var endNodeKeys = getKeyProperties(endNodeTarget);
endNodeTarget.getProperties().stream()
.filter(m -> fields.contains(m.getSourceField()))
.filter(m -> endNodeTarget.getKeyProperties().contains(m.getTargetProperty()))
.filter(m -> endNodeKeys.contains(m.getTargetProperty()))
.forEach(m -> mappings.put(m.getSourceField(), endIdSpaceFor(endNodeTarget)));

return mappings;
}

/**
 * Collects the names of all properties that take part in any key constraint
 * declared on the schema of the given node target.
 *
 * @param nodeTarget the node target whose schema is inspected
 * @return the property names of every key constraint, or an empty set when the
 *         node target has no schema
 */
private static Set<String> getKeyProperties(NodeTarget nodeTarget) {
    var nodeSchema = nodeTarget.getSchema();
    if (nodeSchema != null) {
        // a property may appear in several key constraints; the set removes duplicates
        return nodeSchema.getKeyConstraints().stream()
                .flatMap(keyConstraint -> keyConstraint.getProperties().stream())
                .collect(Collectors.toSet());
    }
    return Set.of();
}

// 🐤
private static List<String> readFieldNames(Source source) throws Exception {
try (var connection = DriverManager.getConnection("jdbc:duckdb:");
Expand Down
31 changes: 31 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,26 @@
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<id>compile-java-17</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
<configuration>
<release>17</release>
<compileSourceRoots>
<compileSourceRoot>${project.basedir}/src/main/java17</compileSourceRoot>
</compileSourceRoots>
<multiReleaseOutput>true</multiReleaseOutput>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
Expand Down Expand Up @@ -294,6 +314,17 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifestEntries>
<Multi-Release>true</Multi-Release>
</manifestEntries>
</archive>
</configuration>
</plugin>
</plugins>
</build>
<profiles>
Expand Down
9 changes: 1 addition & 8 deletions src/main/java/org/neo4j/importer/v1/ImportSpecification.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
public class ImportSpecification implements Serializable {

private final String version;

private final Configuration configuration;

private final List<Source> sources;
Expand Down Expand Up @@ -73,14 +74,6 @@ public List<Action> getActions() {
return actions != null ? actions : Collections.emptyList();
}

public Source findSourceByName(String source) {
return sources.stream()
.filter((src) -> src.getName().equals(source))
.findFirst()
.orElseThrow(() ->
new IllegalArgumentException(String.format("Could not find any source named %s", source)));
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class Graph {
public class Graphs {

/**
* Returns a topologically-sorted list from a given dependency graph
Expand Down Expand Up @@ -73,10 +73,9 @@ public static <T> List<T> runTopologicalSort(Map<T, Set<T>> graph) { // K: depen

/**
* Detects whether the given dependency graph encoded as a map contains any cycles.
* The Map encodes dependent-to-dependencies mappings.
* - Map.of("a", Set.of("b", "c")) means item a depends on both item b and c
* - Map.of("b", Set.of()) means item b does not have any dependencies
*
* The Map encodes dependent-to-dependencies mappings.<br>
* - Map.of("a", Set.of("b", "c")) means item a depends on both item b and c<br>
* - Map.of("b", Set.of()) means item b does not have any dependencies<br>
* @param graph the dependency graph
* @param <T> the element type
* @return the list of paths with cycles, an empty list means there is no cycle
Expand Down
63 changes: 63 additions & 0 deletions src/main/java/org/neo4j/importer/v1/pipeline/ActionStep.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.importer.v1.pipeline;

import java.util.List;
import java.util.Objects;
import org.neo4j.importer.v1.actions.Action;
import org.neo4j.importer.v1.actions.ActionStage;

/**
 * Import pipeline step that wraps an {@link Action} together with the steps it
 * depends on.
 */
public class ActionStep implements ImportStep {

    private final Action action;
    private final List<ImportStep> dependencies;

    /**
     * Creates a step for the given action.
     *
     * @param action the wrapped action
     * @param dependencies the steps this action depends on
     */
    ActionStep(Action action, List<ImportStep> dependencies) {
        this.action = action;
        this.dependencies = dependencies;
    }

    /**
     * @return the name of the wrapped action
     */
    @Override
    public String name() {
        return action.getName();
    }

    /**
     * @return the stage of the wrapped action
     */
    public ActionStage stage() {
        return action().getStage();
    }

    /**
     * @return the wrapped action
     */
    public Action action() {
        return action;
    }

    /**
     * @return the steps this action depends on, as given at construction time
     */
    @Override
    public List<ImportStep> dependencies() {
        return dependencies;
    }

    /**
     * Two action steps are equal when both their action and their dependencies are equal.
     */
    @Override
    public boolean equals(Object o) {
        if (o instanceof ActionStep) {
            ActionStep other = (ActionStep) o;
            return Objects.equals(action, other.action) && Objects.equals(dependencies, other.dependencies);
        }
        return false;
    }

    @Override
    public int hashCode() {
        return Objects.hash(action, dependencies);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.importer.v1.pipeline;

import java.util.List;
import java.util.Objects;
import org.neo4j.importer.v1.targets.CustomQueryTarget;

/**
 * Target step that wraps a {@link CustomQueryTarget} and exposes its query.
 */
public class CustomQueryTargetStep extends TargetStep {

    private final CustomQueryTarget target;

    /**
     * Creates a step for the given custom query target.
     *
     * @param target the wrapped custom query target
     * @param dependencies the steps this target depends on, handed over to the parent class
     */
    CustomQueryTargetStep(CustomQueryTarget target, List<ImportStep> dependencies) {
        super(dependencies);
        this.target = target;
    }

    /**
     * @return the query of the wrapped target
     */
    public String query() {
        return target.getQuery();
    }

    /**
     * @return the wrapped custom query target
     */
    @Override
    protected CustomQueryTarget target() {
        return target;
    }

    /**
     * Two custom query steps are equal when the parent class considers them equal
     * and their wrapped targets are equal.
     */
    @Override
    public boolean equals(Object o) {
        // evaluation order matters: type check first, then the parent comparison, then the target
        return o instanceof CustomQueryTargetStep
                && super.equals(o)
                && Objects.equals(target, ((CustomQueryTargetStep) o).target);
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), target);
    }
}
Loading

0 comments on commit 59915d8

Please sign in to comment.