Skip to content

Commit

Permalink
Add vertex delete with edge (#103)
Browse files Browse the repository at this point in the history
* update version to 3.8.0

* Added vertex with edges deletion

---------

Co-authored-by: Anqi <[email protected]>
  • Loading branch information
StrangerOfDawah and Nicole00 authored May 24, 2024
1 parent 13df954 commit 755c88f
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 7 deletions.
4 changes: 2 additions & 2 deletions connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
<parent>
<artifactId>nebula-flink</artifactId>
<groupId>com.vesoft</groupId>
<version>3.0-SNAPSHOT</version>
<version>3.8.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>nebula-flink-connector</artifactId>

<properties>
<nebula.version>3.0-SNAPSHOT</nebula.version>
<nebula.version>3.8.0</nebula.version>
<flink.version>1.14.4</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<compiler.source.version>1.8</compiler.source.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public void executeBatch(Session session) throws IOException {
return;
}
NebulaVertices nebulaVertices = new NebulaVertices(executionOptions.getLabel(),
executionOptions.getFields(), nebulaVertexList, executionOptions.getPolicy());
executionOptions.getFields(), nebulaVertexList, executionOptions.getPolicy(),
executionOptions.isDeleteExecutedWithEdges());
// generate the write ngql statement
String statement = null;
switch (executionOptions.getWriteMode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class VertexExecutionOptions extends ExecutionOptions {
*/
private int idIndex;

private boolean isDeleteExecutedWithEdges = false;

public VertexExecutionOptions(String graphSpace,
String executeStatement,
List<String> fields,
Expand All @@ -44,6 +46,7 @@ public VertexExecutionOptions(String graphSpace,
PolicyEnum policy,
WriteModeEnum mode,
String tag,
boolean isDeleteExecutedWithEdges,
int idIndex,
int batchIntervalMs,
FailureHandlerEnum failureHandler,
Expand All @@ -54,6 +57,7 @@ public VertexExecutionOptions(String graphSpace,
failureHandler, maxRetries, retryDelayMs);
this.tag = tag;
this.idIndex = idIndex;
this.isDeleteExecutedWithEdges = isDeleteExecutedWithEdges;
}

public int getIdIndex() {
Expand All @@ -65,6 +69,10 @@ public String getLabel() {
return tag;
}

public boolean isDeleteExecutedWithEdges() {
return isDeleteExecutedWithEdges;
}

@Override
public DataTypeEnum getDataType() {
return DataTypeEnum.VERTEX;
Expand All @@ -78,6 +86,7 @@ public ExecutionOptionBuilder toBuilder() {
.setFields(this.getFields())
.setPositions(this.getPositions())
.setNoColumn(this.isNoColumn())
.setDeleteExecutedWithEdges(this.isDeleteExecutedWithEdges())
.setLimit(this.getLimit())
.setStartTime(this.getStartTime())
.setEndTime(this.getEndTime())
Expand All @@ -99,6 +108,7 @@ public static class ExecutionOptionBuilder {
private List<String> fields;
private List<Integer> positions;
private boolean noColumn = false;
private boolean isDeleteExecutedWithEdges = false;
private int limit = DEFAULT_SCAN_LIMIT;
private long startTime = 0;
private long endTime = Long.MAX_VALUE;
Expand Down Expand Up @@ -144,6 +154,13 @@ public ExecutionOptionBuilder setNoColumn(boolean noColumn) {
return this;
}

public ExecutionOptionBuilder setDeleteExecutedWithEdges(
boolean isDeleteExecutedWithEdges
) {
this.isDeleteExecutedWithEdges = isDeleteExecutedWithEdges;
return this;
}

public ExecutionOptionBuilder setLimit(int limit) {
this.limit = limit;
return this;
Expand Down Expand Up @@ -220,7 +237,8 @@ public VertexExecutionOptions build() {
}
return new VertexExecutionOptions(graphSpace, executeStatement, fields,
positions, noColumn, limit, startTime, endTime, batchSize, policy, mode, tag,
idIndex, batchIntervalMs, failureHandler, maxRetries, retryDelayMs);
isDeleteExecutedWithEdges, idIndex, batchIntervalMs,
failureHandler, maxRetries, retryDelayMs);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class NebulaConstant {

// template for delete statement
public static String DELETE_VERTEX_TEMPLATE = "DELETE VERTEX %s";
public static String DELETE_VERTEX_TEMPLATE_WITH_EDGE = "DELETE VERTEX %s WITH EDGE";
public static String DELETE_EDGE_TEMPLATE = "DELETE EDGE `%s` %s";
public static String EDGE_ENDPOINT_TEMPLATE = "%s->%s@%d";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static org.apache.flink.connector.nebula.utils.NebulaConstant.BATCH_INSERT_TEMPLATE;
import static org.apache.flink.connector.nebula.utils.NebulaConstant.DELETE_VERTEX_TEMPLATE;
import static org.apache.flink.connector.nebula.utils.NebulaConstant.DELETE_VERTEX_TEMPLATE_WITH_EDGE;
import static org.apache.flink.connector.nebula.utils.NebulaConstant.ENDPOINT_TEMPLATE;
import static org.apache.flink.connector.nebula.utils.NebulaConstant.UPDATE_VALUE_TEMPLATE;
import static org.apache.flink.connector.nebula.utils.NebulaConstant.UPDATE_VERTEX_TEMPLATE;
Expand All @@ -23,6 +24,8 @@ public class NebulaVertices implements Serializable {
private List<NebulaVertex> vertices;
private PolicyEnum policy = null;

private boolean isDeleteExecutedWithEdges;

public NebulaVertices(String tagName, List<String> propNames, List<NebulaVertex> vertices,
PolicyEnum policy) {
this.tagName = tagName;
Expand All @@ -31,6 +34,15 @@ public NebulaVertices(String tagName, List<String> propNames, List<NebulaVertex>
this.policy = policy;
}

public NebulaVertices(String tagName, List<String> propNames, List<NebulaVertex> vertices,
PolicyEnum policy, boolean isDeleteExecutedWithEdges) {
this.tagName = tagName;
this.propNames = propNames;
this.vertices = vertices;
this.policy = policy;
this.isDeleteExecutedWithEdges = isDeleteExecutedWithEdges;
}

public String getPropNames() {
List<String> escapePropNames = new ArrayList<>();
for (String propName : propNames) {
Expand Down Expand Up @@ -110,7 +122,10 @@ public String getDeleteStatement() {
String vertexId = getVertexId(vertex);
vertexIds.add(vertexId);
}
return String.format(DELETE_VERTEX_TEMPLATE, String.join(",", vertexIds));
String template = isDeleteExecutedWithEdges
? DELETE_VERTEX_TEMPLATE_WITH_EDGE
: DELETE_VERTEX_TEMPLATE;
return String.format(template, String.join(",", vertexIds));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,26 @@ public void testGetDeleteStatementWithPolicy() {
assert (vertexStatement.equals(expectStatement));
}

public void testGetDeleteStatementWithEdges() {
vertices.add(new NebulaVertex("\"vid1\"", props1));
vertices.add(new NebulaVertex("\"vid2\"", props2));

NebulaVertices nebulaVertices = new NebulaVertices(
tagName, propNames, vertices, null, true
);
String vertexStatement = nebulaVertices.getDeleteStatement();
String expectStatement = "DELETE VERTEX \"vid1\",\"vid2\" WITH EDGE";
assert (vertexStatement.equals(expectStatement));
}

public void testGetDeleteStatementWithPolicyAndEdges() {
vertices.add(new NebulaVertex("vid1", props1));
vertices.add(new NebulaVertex("vid2", props2));

NebulaVertices nebulaVertices = new NebulaVertices(tagName, propNames, vertices,
PolicyEnum.HASH, true);
String vertexStatement = nebulaVertices.getDeleteStatement();
String expectStatement = "DELETE VERTEX HASH(\"vid1\"),HASH(\"vid2\") WITH EDGE";
assert (vertexStatement.equals(expectStatement));
}
}
2 changes: 1 addition & 1 deletion example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>nebula-flink</artifactId>
<groupId>com.vesoft</groupId>
<version>3.0-SNAPSHOT</version>
<version>3.8.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<groupId>com.vesoft</groupId>
<artifactId>nebula-flink</artifactId>
<packaging>pom</packaging>
<version>3.0-SNAPSHOT</version>
<version>3.8.0</version>

<modules>
<module>connector</module>
Expand Down

0 comments on commit 755c88f

Please sign in to comment.