Skip to content

Commit

Permalink
Introduce relation inversion refactoring (#522)
Browse files Browse the repository at this point in the history
Fixes #377
  • Loading branch information
fbiville authored Jan 2, 2024
1 parent 6d42528 commit 20b3178
Show file tree
Hide file tree
Showing 18 changed files with 774 additions and 4 deletions.
133 changes: 131 additions & 2 deletions docs/reference-features.md
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ This results in the rename being executed in batches.

!!! warning
This setting only works if the target Neo4j instance supports `CALL {} IN TRANSACTIONS` (version 4.4 and later).
If not, the Neo4j plugin will run the label rename in a single, autocommit transaction.
If not, the Neo4j plugin will run the type rename in a single, autocommit transaction.

Make sure to read about [the consequences of changing `runInTransaction`](#change-sets-runintransaction).

Expand Down Expand Up @@ -564,7 +564,7 @@ This results in the rename being executed in batches.

!!! warning
This setting only works if the target Neo4j instance supports `CALL {} IN TRANSACTIONS` (version 4.4 and later).
If not, the Neo4j plugin will run the label rename in a single, autocommit transaction.
If not, the Neo4j plugin will run the type rename in a single, autocommit transaction.

Make sure to read about [the consequences of changing `runInTransaction`](#change-sets-runintransaction).

Expand All @@ -589,6 +589,135 @@ This results in the rename being executed in batches.
As shown above, the `batchSize` attribute can be set in order to control how many transactions are going to be executed.
If the attribute is not set, the batch size will depend on the Neo4j server's default value.


### Relationship Direction Inversion

|Required plugin version|4.25.1.1|

The direction inversion refactoring allows to flip the start and end node of relationships with the specified type,
matching all or some of them, in a single transaction or in batches.

As illustrated below, the main attributes of the refactoring are:

- `type`: type of the relationship(s) to invert


#### Global Inversion

=== "XML"
~~~~xml
{! include '../src/test/resources/e2e/invert-direction/changeLog-simple.xml' !}
~~~~

=== "JSON"

~~~~json
{! include '../src/test/resources/e2e/invert-direction/changeLog-simple.json' !}
~~~~

=== "YAML"

~~~~yaml
{! include '../src/test/resources/e2e/invert-direction/changeLog-simple.yaml' !}
~~~~

Since this operation can potentially affect a lot of relationships, running the change in a single transaction may be
infeasible since the transaction would likely run either too slow, or even run out of memory.

To prevent this, `enableBatchImport` must be set to `true`.
Since it relies on `CALL {} IN TRANSACTIONS` under the hood, the enclosing change set's `runInTransaction` must also be set to `false`.
This results in the rename being executed in batches.

!!! warning
This setting only works if the target Neo4j instance supports `CALL {} IN TRANSACTIONS` (version 4.4 and later).
If not, the Neo4j plugin will run the direction inversion in a single, autocommit transaction.

Make sure to read about [the consequences of changing `runInTransaction`](#change-sets-runintransaction).

=== "XML"
~~~~xml
{! include '../src/test/resources/e2e/invert-direction/changeLog-simple-batched.xml' !}
~~~~

=== "JSON"

~~~~json
{! include '../src/test/resources/e2e/invert-direction/changeLog-simple-batched.json' !}
~~~~

=== "YAML"

~~~~yaml
{! include '../src/test/resources/e2e/invert-direction/changeLog-simple-batched.yaml' !}
~~~~

As shown above, the `batchSize` attribute can be set in order to control how many transactions are going to be executed.
If the attribute is not set, the batch size will depend on the Neo4j server's default value.

#### Partial Rename

The following attributes can also be set, in order to match only a subset of the relationships with the type specified in `type`:

- `fragment` specifies the pattern to match the relationships against
- `outputVariable` specifies the Cypher variable name defined in `fragment` that denotes the targeted relationships

!!!note
The relationships that are going to be inverted sit at the intersection of what is defined in `fragment` and the
relationships whose type is specified by `type`.
In other words, if none of the relationships defined in `fragment` have the type defined in `type`, the inversion
is not going to modify any of those.

=== "XML"
~~~~xml
{! include '../src/test/resources/e2e/invert-direction/changeLog-pattern.xml' !}
~~~~

=== "JSON"

~~~~json
{! include '../src/test/resources/e2e/invert-direction/changeLog-pattern.json' !}
~~~~

=== "YAML"

~~~~yaml
{! include '../src/test/resources/e2e/invert-direction/changeLog-pattern.yaml' !}
~~~~

Since this operation can potentially affect a lot of relationships, running the change in a single transaction may be
infeasible since the transaction would likely run either too slow, or even run out of memory.

To prevent this, `enableBatchImport` must be set to `true`.
Since it relies on `CALL {} IN TRANSACTIONS` under the hood, the enclosing change set's `runInTransaction` must also be set to `false`.
This results in the rename being executed in batches.

!!! warning
This setting only works if the target Neo4j instance supports `CALL {} IN TRANSACTIONS` (version 4.4 and later).
If not, the Neo4j plugin will run the direction inversion in a single, autocommit transaction.

Make sure to read about [the consequences of changing `runInTransaction`](#change-sets-runintransaction).


=== "XML"
~~~~xml
{! include '../src/test/resources/e2e/invert-direction/changeLog-pattern-batched.xml' !}
~~~~

=== "JSON"

~~~~json
{! include '../src/test/resources/e2e/invert-direction/changeLog-pattern-batched.json' !}
~~~~

=== "YAML"

~~~~yaml
{! include '../src/test/resources/e2e/invert-direction/changeLog-pattern-batched.yaml' !}
~~~~

As shown above, the `batchSize` attribute can be set in order to control how many transactions are going to be executed.
If the attribute is not set, the batch size will depend on the Neo4j server's default value.

## Change Set's `runInTransaction`

|Required plugin version|4.19.0|
Expand Down
161 changes: 161 additions & 0 deletions src/main/java/liquibase/ext/neo4j/change/InvertDirectionChange.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package liquibase.ext.neo4j.change;

import liquibase.Scope;
import liquibase.change.AbstractChange;
import liquibase.change.ChangeMetaData;
import liquibase.change.DatabaseChange;
import liquibase.database.Database;
import liquibase.exception.LiquibaseException;
import liquibase.exception.ValidationErrors;
import liquibase.ext.neo4j.database.Neo4jDatabase;
import liquibase.logging.Logger;
import liquibase.statement.SqlStatement;
import liquibase.statement.core.RawParameterizedSqlStatement;
import liquibase.statement.core.RawSqlStatement;

import java.util.List;
import java.util.Map;

@DatabaseChange(name = "invertDirection", priority = ChangeMetaData.PRIORITY_DEFAULT, description =
"The 'invertDirection' tag allows you to invert the direction of relationships.\n" +
"The relationships to update are defined by the 'type' attributed, and optionally refined with the" +
"'fragment' attribute, which defines the Cypher pattern to match relationships against.\n" +
"'invertDirection' also defines the 'outputVariable' attribute. This attribute denotes the variable used in the pattern for\n" +
"the relationships to merge. If the fragment is '(:Movie)<-[d:DIRECTED_BY]-(:Director {name: 'John Woo'})-[a:ACTED_IN]->(:Movie)', " +
"the output variable is either 'd' or 'a' depending on the relationships the inversion should affect.")

public class InvertDirectionChange extends AbstractChange {

private String fragment;

private String outputVariable;

private String type;

private Boolean enableBatchImport = Boolean.FALSE;

private Long batchSize;

@Override
public boolean supports(Database database) {
return database instanceof Neo4jDatabase;
}

@Override
public ValidationErrors validate(Database database) {
ValidationErrors validation = new ValidationErrors(this);
if (Sequences.isNullOrEmpty(type)) {
validation.addError("missing type");
}
if ((fragment == null) ^ (outputVariable == null)) {
String setAttribute = fragment != null ? "fragment" : "outputVariable";
String error = String.format("both fragment and outputVariable must be set (only %s is currently set), or both must be unset", setAttribute);
validation.addError(error);
}
if ("__rel__".equals(outputVariable)) {
validation.addError(String.format("outputVariable %s clashes with the reserved variable name: __rel__. outputVariable must be renamed and fragment accordingly updated", outputVariable));
}
if (enableBatchImport && getChangeSet().isRunInTransaction()) {
validation.addError("enableBatchImport can be true only if the enclosing change set's runInTransaction attribute is set to false");
}
if (!enableBatchImport && batchSize != null) {
validation.addError("batch size must be set only if enableBatchImport is set to true");
}
if (batchSize != null && batchSize <= 0) {
validation.addError("batch size, if set, must be strictly positive");
}
Neo4jDatabase neo4j = (Neo4jDatabase) database;
if (enableBatchImport && !neo4j.supportsCallInTransactions()) {
validation.addWarning("this version of Neo4j does not support CALL {} IN TRANSACTIONS, all batch import settings are ignored");
}
validation.addAll(super.validate(database));
return validation;
}

@Override
public String getConfirmationMessage() {
if (fragment != null) {
return String.format("the direction of relationships with type %s, also denoted by %s in %s, has been inverted", type, outputVariable, fragment);
}
return String.format("the direction of relationships with type %s has been inverted", type); }

@Override
public SqlStatement[] generateStatements(Database database) {
Logger log = Scope.getCurrentScope().getLog(getClass());
boolean supportsCallInTransactions = ((Neo4jDatabase) database).supportsCallInTransactions();
if (supportsCallInTransactions && enableBatchImport) {
log.info("Running type rename in CALL {} IN TRANSACTIONS");
String batchSpec = batchSize != null ? String.format(" OF %d ROWS", batchSize) : "";
String cypher = String.format("%s " +
"CALL { " +
" WITH __rel__ " +
" MATCH (__start__) WHERE id(__start__) = id(startNode(__rel__)) " +
" MATCH (__end__) WHERE id(__end__) = id(endNode(__rel__)) " +
" CREATE (__start__)<-[__newrel__:`%s`]-(__end__) " +
" SET __newrel__ = properties(__rel__) " +
" DELETE __rel__ " +
"} IN TRANSACTIONS%s", queryStart(), type, batchSpec);
return new SqlStatement[]{new RawParameterizedSqlStatement(cypher, type)};
}
if (!supportsCallInTransactions) {
log.warning("This version of Neo4j does not support CALL {} IN TRANSACTIONS, the type rename is going to run in a single, possibly large and slow, transaction.\n" +
"Note: upgrade the Neo4j server or set the runInTransaction attribute of the enclosing change set to true to make this warning disappear.");
} else {
log.info("Running type rename in single transaction (set enableBatchImport to true to switch to CALL {} IN TRANSACTIONS)");
}
String cypher = String.format("%s " +
"MATCH (__start__) WHERE id(__start__) = id(startNode(__rel__)) " +
"MATCH (__end__) WHERE id(__end__) = id(endNode(__rel__)) " +
"CREATE (__start__)<-[__newrel__:`%s`]-(__end__) " +
"SET __newrel__ = properties(__rel__) " +
"DELETE __rel__", queryStart(), type);
return new SqlStatement[]{new RawParameterizedSqlStatement(cypher, type)};
}

public String getType() {
return type;
}

public void setType(String type) {
this.type = type;
}

public String getFragment() {
return fragment;
}

public void setFragment(String fragment) {
this.fragment = fragment;
}

public String getOutputVariable() {
return outputVariable;
}

public void setOutputVariable(String outputVariable) {
this.outputVariable = outputVariable;
}

public Boolean getEnableBatchImport() {
return enableBatchImport;
}

public void setEnableBatchImport(Boolean enableBatchImport) {
this.enableBatchImport = enableBatchImport;
}

public Long getBatchSize() {
return batchSize;
}

public void setBatchSize(Long batchSize) {
this.batchSize = batchSize;
}

private String queryStart() {
if (fragment != null) {
return String.format("MATCH %s WITH %s AS __rel__ WHERE type(__rel__) = $1", fragment, outputVariable);
}
return String.format("MATCH ()-[__rel__:`%s`]->()", type);
}
}
5 changes: 3 additions & 2 deletions src/main/resources/META-INF/services/liquibase.change.Change
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
liquibase.ext.neo4j.change.CypherChange
liquibase.ext.neo4j.change.MergeNodesChange
liquibase.ext.neo4j.change.LoadGraphDataChange
liquibase.ext.neo4j.change.ExtractPropertyChange
liquibase.ext.neo4j.change.InsertNodeChange
liquibase.ext.neo4j.change.InvertDirectionChange
liquibase.ext.neo4j.change.LoadGraphDataChange
liquibase.ext.neo4j.change.MergeNodesChange
liquibase.ext.neo4j.change.RenameLabelChange
liquibase.ext.neo4j.change.RenameTypeChange
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,13 @@
<xsd:attribute type="xsd:boolean" name="enableBatchImport" />
<xsd:attribute type="xsd:int" name="batchSize" />
</xsd:complexType>

<xsd:element name="invertDirection" type="invertDirectionType" />
<xsd:complexType name="invertDirectionType">
<xsd:attribute type="xsd:string" name="type" use="required" />
<xsd:attribute type="xsd:string" name="fragment" />
<xsd:attribute type="xsd:string" name="outputVariable" />
<xsd:attribute type="xsd:boolean" name="enableBatchImport" />
<xsd:attribute type="xsd:int" name="batchSize" />
</xsd:complexType>
</xsd:schema>
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package liquibase.ext.neo4j.change

import liquibase.changelog.ChangeSet
import liquibase.database.core.MySQLDatabase
import liquibase.ext.neo4j.database.Neo4jDatabase
import spock.lang.Specification

class InvertDirectionChangeTest extends Specification {

def "supports only Neo4j targets"() {
expect:
new InvertDirectionChange().supports(database) == result

where:
database | result
new Neo4jDatabase() | true
null | false
new MySQLDatabase() | false
}

def "rejects invalid configuration"() {
given:
def renameLabelChange = new InvertDirectionChange()
renameLabelChange.type = type
renameLabelChange.enableBatchImport = enableBatchImport
renameLabelChange.batchSize = batchSize
renameLabelChange.fragment = fragment
renameLabelChange.outputVariable = outputVariable
def changeSet = Mock(ChangeSet)
changeSet.runInTransaction >> runInTx
renameLabelChange.setChangeSet(changeSet)
def database = Mock(Neo4jDatabase)
database.supportsCallInTransactions() >> withCIT

expect:
renameLabelChange.validate(database).getErrorMessages() == [error]

where:
runInTx | withCIT | type | enableBatchImport | batchSize | fragment | outputVariable | error
false | true | null | true | 1000L | "()-[r]->()" | "r" | "missing type"
false | true | "" | true | 1000L | "()-[r]->()" | "r" | "missing type"
false | true | "SOME_TYPE" | true | 1000L | "()-[r]->()" | null | "both fragment and outputVariable must be set (only fragment is currently set), or both must be unset"
false | false | "SOME_TYPE" | true | 1000L | null | "r" | "both fragment and outputVariable must be set (only outputVariable is currently set), or both must be unset"
false | false | "SOME_TYPE" | true | 1000L | "()-[__rel__]->()" | "__rel__" | "outputVariable __rel__ clashes with the reserved variable name: __rel__. outputVariable must be renamed and fragment accordingly updated"
false | true | "SOME_TYPE" | true | -1L | null | null | "batch size, if set, must be strictly positive"
false | false | "SOME_TYPE" | true | -1L | null | null | "batch size, if set, must be strictly positive"
true | false | "SOME_TYPE" | false | 1000L | null | null | "batch size must be set only if enableBatchImport is set to true"
true | false | "SOME_TYPE" | true | 1000L | null | null | "enableBatchImport can be true only if the enclosing change set's runInTransaction attribute is set to false"
}
}
Loading

0 comments on commit 20b3178

Please sign in to comment.