diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/catalog/NebulaCatalog.java b/connector/src/main/java/org.apache.flink/connector/nebula/catalog/NebulaCatalog.java index df5819d..f06449e 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/catalog/NebulaCatalog.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/catalog/NebulaCatalog.java @@ -98,7 +98,7 @@ public void open() throws CatalogException { try { this.metaClient = metaConnectionProvider.getMetaClient(); } catch (UnknownHostException | ClientServerIncompatibleException e) { - LOG.error("nebula get meta client error, ", e); + LOG.error("nebula get meta client error", e); throw new CatalogException("nebula get meta client error.", e); } @@ -108,8 +108,8 @@ public void open() throws CatalogException { graphConnectionProvider.getPassword(), true); } catch (NotValidConnectionException | IOErrorException | AuthFailedException | ClientServerIncompatibleException | UnknownHostException e) { - LOG.error("failed to get graph session, ", e); - throw new CatalogException("get graph session error, ", e); + LOG.error("failed to get graph session", e); + throw new CatalogException("get graph session error.", e); } } @@ -137,7 +137,7 @@ public List listDatabases() throws CatalogException { spaceNames.add(new String(space.getName())); } } catch (TException | ExecuteFailedException | ClientServerIncompatibleException e) { - LOG.error("failed to connect meta service vis {} ", address, e); + LOG.error("failed to connect meta service via " + address, e); throw new CatalogException("nebula meta service connect failed.", e); } return spaceNames; @@ -152,7 +152,7 @@ public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistE props.put("spaceId", String.valueOf(metaClient.getSpace(databaseName).getSpace_id())); } catch (TException | ExecuteFailedException e) { - LOG.error("get spaceId error, ", e); + LOG.error("get spaceId error", e); } return new CatalogDatabaseImpl(props, databaseName); } else { @@ -186,13 +186,13 @@ public void createDatabase(String dataBaseName, ) ); if (!newProperties.containsKey(NebulaConstant.CREATE_VID_TYPE)) { - LOG.error("failed to create graph space {}, missing VID type param", properties); + LOG.error("failed to create graph space {}, missing VID type", properties); throw new CatalogException("nebula create graph space failed, missing VID type."); } String vidType = newProperties.get(NebulaConstant.CREATE_VID_TYPE); if (!NebulaUtils.checkValidVidType(vidType)) { - LOG.error("VID type not satisfy {}", vidType); - throw new CatalogException("nebula graph dont support VID type."); + LOG.error("invalid VID type: {}", vidType); + throw new CatalogException("nebula does not support the specified VID type."); } NebulaSpace space = new NebulaSpace( dataBaseName,catalogDatabase.getComment(), newProperties); @@ -210,7 +210,7 @@ public void createDatabase(String dataBaseName, LOG.debug("create space success."); } else { LOG.error("create space failed: {}", execResult.getErrorMessage()); - throw new CatalogException("create space failed, " + execResult.getErrorMessage()); + throw new CatalogException("create space failed: " + execResult.getErrorMessage()); } } @@ -227,7 +227,7 @@ public boolean tableExists(ObjectPath tablePath) throws CatalogException { try { return (listTables(graphSpace).contains(table)); } catch (DatabaseNotExistException e) { - throw new CatalogException("failed to call tableExists function, ", e); + throw new CatalogException("failed to call tableExists function.", e); } } @@ -248,7 +248,7 @@ public List listTables(String graphSpace) throws DatabaseNotExistExcepti try { metaClient.connect(); } catch (TException | ClientServerIncompatibleException e) { - LOG.error("failed to connect meta service vis {} ", address, e); + LOG.error("failed to connect meta service via " + address, e); throw new CatalogException("nebula meta service connect failed.", e); } List tables = new ArrayList<>(); @@ -260,7 +260,7 @@ public List listTables(String graphSpace) throws DatabaseNotExistExcepti tables.add("EDGE" + NebulaConstant.POINT + new String(edge.edge_name)); } } catch (TException | ExecuteFailedException e) { - LOG.error("get tags or edges error,", e); + LOG.error("get tags or edges error", e); } return tables; } @@ -284,7 +284,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep try { metaClient.connect(); } catch (TException | ClientServerIncompatibleException e) { - LOG.error("failed to connect meta service vis {} ", address, e); + LOG.error("failed to connect meta service via " + address, e); throw new CatalogException("nebula meta service connect failed.", e); } @@ -296,7 +296,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep schema = metaClient.getEdge(graphSpace, label); } } catch (TException | ExecuteFailedException e) { - LOG.error("get tag or edge schema error, ", e); + LOG.error("get tag or edge schema error", e); return null; } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchExecutor.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchExecutor.java index d624471..2b83ff6 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchExecutor.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchExecutor.java @@ -5,21 +5,72 @@ package org.apache.flink.connector.nebula.sink; +import com.vesoft.nebula.ErrorCode; +import com.vesoft.nebula.client.graph.data.ResultSet; +import com.vesoft.nebula.client.graph.exception.IOErrorException; import com.vesoft.nebula.client.graph.net.Session; +import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public interface NebulaBatchExecutor { +public abstract class NebulaBatchExecutor { + + public static class ExecutionException extends IOException { + private final String statement; + private final String errorMessage; + private final int errorCode; + + public ExecutionException(String statement, String errorMessage, int errorCode) { + this.statement = statement; + this.errorMessage = errorMessage; + this.errorCode = errorCode; + } + + @Override + public String getMessage() { + return String.format("failed to execute statement %s: %s [%s]", + statement, errorMessage, errorCode); + } + + public boolean isNonRecoverableError() { + return this.errorCode == ErrorCode.E_SEMANTIC_ERROR.getValue() + || this.errorCode == ErrorCode.E_SYNTAX_ERROR.getValue(); + } + } + + private static final Logger LOG = LoggerFactory.getLogger(NebulaBatchExecutor.class); /** * put record into buffer * * @param record represent vertex or edge */ - void addToBatch(T record); + public abstract void addToBatch(T record); /** * execute the statement * * @param session graph session */ - String executeBatch(Session session); + public abstract void executeBatch(Session session) throws IOException; + + public abstract void clearBatch(); + + public abstract boolean isBatchEmpty(); + + protected static void executeStatement(Session session, String statement) throws IOException { + LOG.debug("write statement: {}", statement); + ResultSet execResult; + try { + execResult = session.execute(statement); + } catch (IOErrorException e) { + throw new IOException(e); + } + if (execResult.isSucceeded()) { + LOG.debug("write success"); + } else { + throw new ExecutionException( + statement, execResult.getErrorMessage(), execResult.getErrorCode()); + } + } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchOutputFormat.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchOutputFormat.java index 2d5fd69..7deda3f 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchOutputFormat.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchOutputFormat.java @@ -17,18 +17,16 @@ import java.io.Flushable; import java.io.IOException; import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider; import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider; import org.apache.flink.connector.nebula.statement.ExecutionOptions; +import org.apache.flink.connector.nebula.utils.FailureHandlerEnum; import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,14 +40,14 @@ public abstract class NebulaBatchOutputFormat nebulaBatchExecutor; - private volatile AtomicLong numPendingRow; + private transient long numPendingRow; private NebulaPool nebulaPool; private Session session; - private final List errorBuffer = new ArrayList<>(); private transient ScheduledExecutorService scheduler; private transient ScheduledFuture scheduledFuture; private transient volatile boolean closed = false; + private transient volatile Exception commitException; public NebulaBatchOutputFormat( NebulaGraphConnectionProvider graphProvider, @@ -71,33 +69,20 @@ public void configure(Configuration configuration) { public void open(int i, int i1) throws IOException { try { nebulaPool = graphProvider.getNebulaPool(); - session = nebulaPool.getSession(graphProvider.getUserName(), - graphProvider.getPassword(), true); - } catch (UnknownHostException | NotValidConnectionException | AuthFailedException - | ClientServerIncompatibleException | IOErrorException e) { - LOG.error("failed to get graph session, ", e); - throw new IOException("get graph session error, ", e); - } - ResultSet resultSet; - try { - resultSet = session.execute("USE " + executionOptions.getGraphSpace()); - } catch (IOErrorException e) { - LOG.error("switch space error, ", e); - throw new IOException("switch space error,", e); - } - if (!resultSet.isSucceeded()) { - LOG.error("switch space failed, {}", resultSet.getErrorMessage()); - throw new RuntimeException("switch space failed, " + resultSet.getErrorMessage()); + } catch (UnknownHostException e) { + LOG.error("failed to create connection pool", e); + throw new IOException("connection pool creation error", e); } + renewSession(); try { metaClient = metaProvider.getMetaClient(); } catch (TException | ClientServerIncompatibleException e) { - LOG.error("failed to get meta client, ", e); - throw new IOException("get metaClient error, ", e); + LOG.error("failed to get meta client", e); + throw new IOException("get meta client error", e); } - numPendingRow = new AtomicLong(0); + numPendingRow = 0; nebulaBatchExecutor = createNebulaBatchExecutor(); // start the schedule task: submit the buffer records every batchInterval. // If batchIntervalMs is 0, do not start the scheduler task. @@ -106,8 +91,12 @@ public void open(int i, int i1) throws IOException { "nebula-write-output-format")); this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> { synchronized (NebulaBatchOutputFormat.this) { - if (!closed) { - commit(); + if (!closed && commitException == null) { + try { + commit(); + } catch (Exception e) { + commitException = e; + } } } }, executionOptions.getBatchIntervalMs(), @@ -116,16 +105,51 @@ public void open(int i, int i1) throws IOException { } } + private void checkCommitException() { + if (commitException != null) { + throw new RuntimeException("commit records failed", commitException); + } + } + + private void renewSession() throws IOException { + if (session != null) { + session.release(); + session = null; + } + try { + session = nebulaPool.getSession(graphProvider.getUserName(), + graphProvider.getPassword(), true); + } catch (NotValidConnectionException | AuthFailedException + | ClientServerIncompatibleException | IOErrorException e) { + LOG.error("failed to get graph session", e); + throw new IOException("get graph session error", e); + } + ResultSet resultSet; + try { + resultSet = session.execute("USE " + executionOptions.getGraphSpace()); + } catch (IOErrorException e) { + LOG.error("switch space error", e); + throw new IOException("switch space error", e); + } + if (!resultSet.isSucceeded()) { + LOG.error("switch space failed: " + resultSet.getErrorMessage()); + throw new IOException("switch space failed: " + resultSet.getErrorMessage()); + } + } + protected abstract NebulaBatchExecutor createNebulaBatchExecutor(); /** * write one record to buffer */ @Override - public final synchronized void writeRecord(T row) { + public final synchronized void writeRecord(T row) throws IOException { + checkCommitException(); nebulaBatchExecutor.addToBatch(row); + numPendingRow++; - if (numPendingRow.incrementAndGet() >= executionOptions.getBatchSize()) { + if (executionOptions.getBatchSize() > 0 + && numPendingRow >= executionOptions.getBatchSize()) { commit(); } } @@ -133,32 +157,57 @@ public final synchronized void writeRecord(T row) { /** * commit batch insert statements */ - private synchronized void commit() { - String errorExec = nebulaBatchExecutor.executeBatch(session); - if (errorExec != null) { - errorBuffer.add(errorExec); + private synchronized void commit() throws IOException { + int maxRetries = executionOptions.getMaxRetries(); + int retryDelayMs = executionOptions.getRetryDelayMs(); + boolean failOnError = executionOptions.getFailureHandler().equals(FailureHandlerEnum.FAIL); + + // execute the batch at most `maxRetries + 1` times + for (int i = 0; i <= maxRetries; i++) { + try { + nebulaBatchExecutor.executeBatch(session); + numPendingRow = 0; + break; + } catch (Exception e) { + LOG.warn(String.format("write data error (attempt %d)", i), e); + boolean nonRecoverable = (e instanceof NebulaBatchExecutor.ExecutionException) + && ((NebulaBatchExecutor.ExecutionException) e).isNonRecoverableError(); + if (i >= maxRetries || nonRecoverable) { + // clear the batch on failure when we do not want more retries + nebulaBatchExecutor.clearBatch(); + numPendingRow = 0; + if (failOnError) { + throw e; + } + renewSession(); + break; // break the retry loop when we do not want more retries + } else { + try { + Thread.sleep(retryDelayMs); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new IOException("interrupted", ex); + } + renewSession(); + } + } } - long pendingRow = numPendingRow.get(); - numPendingRow.compareAndSet(pendingRow, 0); } /** * commit the batch write operator before release connection */ @Override - public final synchronized void close() { + public final synchronized void close() throws IOException { if (!closed) { closed = true; if (scheduledFuture != null) { scheduledFuture.cancel(false); scheduler.shutdown(); } - if (numPendingRow != null && numPendingRow.get() > 0) { + if (numPendingRow > 0) { commit(); } - if (!errorBuffer.isEmpty()) { - LOG.error("insert error statements: {}", errorBuffer); - } if (session != null) { session.release(); } @@ -176,7 +225,8 @@ public final synchronized void close() { */ @Override public synchronized void flush() throws IOException { - while (numPendingRow.get() != 0) { + checkCommitException(); + while (numPendingRow > 0) { commit(); } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java index e988aed..0a9c613 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java @@ -5,8 +5,8 @@ package org.apache.flink.connector.nebula.sink; -import com.vesoft.nebula.client.graph.data.ResultSet; import com.vesoft.nebula.client.graph.net.Session; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -18,7 +18,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class NebulaEdgeBatchExecutor implements NebulaBatchExecutor { +public class NebulaEdgeBatchExecutor extends NebulaBatchExecutor { private static final Logger LOG = LoggerFactory.getLogger(NebulaEdgeBatchExecutor.class); private final EdgeExecutionOptions executionOptions; private final List nebulaEdgeList; @@ -44,9 +44,19 @@ public void addToBatch(Row record) { } @Override - public String executeBatch(Session session) { - if (nebulaEdgeList.size() == 0) { - return null; + public void clearBatch() { + nebulaEdgeList.clear(); + } + + @Override + public boolean isBatchEmpty() { + return nebulaEdgeList.isEmpty(); + } + + @Override + public void executeBatch(Session session) throws IOException { + if (isBatchEmpty()) { + return; } NebulaEdges nebulaEdges = new NebulaEdges(executionOptions.getLabel(), executionOptions.getFields(), nebulaEdgeList, executionOptions.getPolicy(), @@ -66,26 +76,7 @@ public String executeBatch(Session session) { default: throw new IllegalArgumentException("write mode is not supported"); } - LOG.debug("write statement={}", statement); - - // execute ngql statement - ResultSet execResult = null; - try { - execResult = session.execute(statement); - } catch (Exception e) { - LOG.error("write data error, ", e); - nebulaEdgeList.clear(); - return statement; - } - - if (execResult.isSucceeded()) { - LOG.debug("write success"); - } else { - LOG.error("write data failed: {}", execResult.getErrorMessage()); - nebulaEdgeList.clear(); - return statement; - } - nebulaEdgeList.clear(); - return null; + executeStatement(session, statement); + clearBatch(); } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaSinkFunction.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaSinkFunction.java index 067a026..0b53c2f 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaSinkFunction.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaSinkFunction.java @@ -41,12 +41,12 @@ public void open(Configuration parameters) throws Exception { } @Override - public void close() { + public void close() throws Exception { outputFormat.close(); } @Override - public void invoke(T value, Context context) { + public void invoke(T value, Context context) throws Exception { checkErrorAndRethrow(); outputFormat.writeRecord(value); } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaTableBufferReducedExecutor.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaTableBufferReducedExecutor.java index 0b94e0a..abfff13 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaTableBufferReducedExecutor.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaTableBufferReducedExecutor.java @@ -1,18 +1,16 @@ package org.apache.flink.connector.nebula.sink; import com.vesoft.nebula.client.graph.net.Session; +import java.io.IOException; import java.util.HashMap; import java.util.Map; -import java.util.Objects; import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter; import org.apache.flink.table.data.RowData; import org.apache.flink.types.Row; -public class NebulaTableBufferReducedExecutor implements NebulaBatchExecutor { +public class NebulaTableBufferReducedExecutor extends NebulaBatchExecutor { private final DataStructureConverter dataStructureConverter; private final Function keyExtractor; private final NebulaBatchExecutor insertExecutor; @@ -50,7 +48,20 @@ public void addToBatch(RowData record) { } @Override - public String executeBatch(Session session) { + public void clearBatch() { + reduceBuffer.clear(); + } + + @Override + public boolean isBatchEmpty() { + return reduceBuffer.isEmpty(); + } + + @Override + public void executeBatch(Session session) throws IOException { + if (isBatchEmpty()) { + return; + } for (Tuple2 value : reduceBuffer.values()) { boolean isUpsert = value.f0; Row row = value.f1; @@ -60,11 +71,13 @@ public String executeBatch(Session session) { deleteExecutor.addToBatch(row); } } - String insertErrorStatement = insertExecutor.executeBatch(session); - String deleteErrorStatement = deleteExecutor.executeBatch(session); - reduceBuffer.clear(); - String errorStatements = Stream.of(insertErrorStatement, deleteErrorStatement) - .filter(Objects::nonNull).collect(Collectors.joining("; ")); - return errorStatements.isEmpty() ? null : errorStatements; + try { + insertExecutor.executeBatch(session); + deleteExecutor.executeBatch(session); + } finally { + insertExecutor.clearBatch(); + deleteExecutor.clearBatch(); + } + clearBatch(); } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java index dc045cb..1e3de01 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java @@ -5,12 +5,11 @@ package org.apache.flink.connector.nebula.sink; -import com.vesoft.nebula.client.graph.data.ResultSet; import com.vesoft.nebula.client.graph.net.Session; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import org.apache.flink.connector.nebula.statement.ExecutionOptions; import org.apache.flink.connector.nebula.statement.VertexExecutionOptions; import org.apache.flink.connector.nebula.utils.NebulaVertex; import org.apache.flink.connector.nebula.utils.NebulaVertices; @@ -19,7 +18,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class NebulaVertexBatchExecutor implements NebulaBatchExecutor { +public class NebulaVertexBatchExecutor extends NebulaBatchExecutor { private static final Logger LOG = LoggerFactory.getLogger(NebulaVertexBatchExecutor.class); private final VertexExecutionOptions executionOptions; private final List nebulaVertexList; @@ -48,9 +47,19 @@ public void addToBatch(Row record) { } @Override - public String executeBatch(Session session) { - if (nebulaVertexList.size() == 0) { - return null; + public void clearBatch() { + nebulaVertexList.clear(); + } + + @Override + public boolean isBatchEmpty() { + return nebulaVertexList.isEmpty(); + } + + @Override + public void executeBatch(Session session) throws IOException { + if (isBatchEmpty()) { + return; } NebulaVertices nebulaVertices = new NebulaVertices(executionOptions.getLabel(), executionOptions.getFields(), nebulaVertexList, executionOptions.getPolicy()); @@ -69,26 +78,7 @@ public String executeBatch(Session session) { default: throw new IllegalArgumentException("write mode is not supported"); } - LOG.debug("write statement={}", statement); - - // execute ngql statement - ResultSet execResult = null; - try { - execResult = session.execute(statement); - } catch (Exception e) { - LOG.error("write data error, ", e); - nebulaVertexList.clear(); - return statement; - } - - if (execResult.isSucceeded()) { - LOG.debug("write success"); - } else { - LOG.error("write data failed: {}", execResult.getErrorMessage()); - nebulaVertexList.clear(); - return statement; - } - nebulaVertexList.clear(); - return null; + executeStatement(session, statement); + clearBatch(); } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/source/NebulaInputFormat.java b/connector/src/main/java/org.apache.flink/connector/nebula/source/NebulaInputFormat.java index 913d4a1..35e1c41 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/source/NebulaInputFormat.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/source/NebulaInputFormat.java @@ -81,8 +81,8 @@ public void openInputFormat() throws IOException { metaClient = metaConnectionProvider.getMetaClient(); numPart = metaClient.getPartsAlloc(executionOptions.getGraphSpace()).size(); } catch (Exception e) { - LOG.error("connect storage client error, ", e); - throw new IOException("connect storage client error, ", e); + LOG.error("connect storage client error", e); + throw new IOException("connect storage client error", e); } rows = new ArrayList<>(); } @@ -97,8 +97,8 @@ public void closeInputFormat() throws IOException { metaClient.close(); } } catch (Exception e) { - LOG.error("close client error,", e); - throw new IOException("close client error,", e); + LOG.error("close client error", e); + throw new IOException("close client error", e); } } @@ -135,8 +135,8 @@ public void open(InputSplit inputSplit) throws IOException { try { hasNext = nebulaSource.hasNext(); } catch (Exception e) { - LOG.error("scan NebulaGraph error, ", e); - throw new IOException("scan error, ", e); + LOG.error("scan NebulaGraph error", e); + throw new IOException("scan error", e); } } } @@ -157,8 +157,8 @@ public T nextRecord(T reuse) throws IOException { try { hasNext = nebulaSource.hasNext(); } catch (Exception e) { - LOG.error("scan NebulaGraph error, ", e); - throw new IOException("scan NebulaGraph error, ", e); + LOG.error("scan NebulaGraph error", e); + throw new IOException("scan NebulaGraph error", e); } scannedRows++; return nebulaConverter.convert(row); diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/statement/EdgeExecutionOptions.java b/connector/src/main/java/org.apache.flink/connector/nebula/statement/EdgeExecutionOptions.java index fcacffc..8b0bc89 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/statement/EdgeExecutionOptions.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/statement/EdgeExecutionOptions.java @@ -6,6 +6,8 @@ package org.apache.flink.connector.nebula.statement; import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_BATCH_INTERVAL_MS; +import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_EXECUTION_RETRY; +import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_RETRY_DELAY_MS; import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_ROW_INFO_INDEX; import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_SCAN_LIMIT; import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_WRITE_BATCH_SIZE; @@ -14,6 +16,7 @@ import java.util.Objects; import java.util.Optional; import org.apache.flink.connector.nebula.utils.DataTypeEnum; +import org.apache.flink.connector.nebula.utils.FailureHandlerEnum; import org.apache.flink.connector.nebula.utils.PolicyEnum; import org.apache.flink.connector.nebula.utils.WriteModeEnum; @@ -40,13 +43,28 @@ public class EdgeExecutionOptions extends ExecutionOptions { private int rankIndex; - private EdgeExecutionOptions(String graphSpace, String executeStatement, List fields, - List positions, boolean noColumn, int limit, - long startTime, long endTime, int batch, PolicyEnum policy, - WriteModeEnum mode, String edge, int srcIndex, int dstIndex, - int rankIndex, int batchIntervalMs) { + private EdgeExecutionOptions(String graphSpace, + String executeStatement, + List fields, + List positions, + boolean noColumn, + int limit, + long startTime, + long endTime, + int batch, + PolicyEnum policy, + WriteModeEnum mode, + String edge, + int srcIndex, + int dstIndex, + int rankIndex, + int batchIntervalMs, + FailureHandlerEnum failureHandler, + int maxRetries, + int retryDelayMs) { super(graphSpace, executeStatement, fields, positions, noColumn, limit, startTime, - endTime, batch, policy, mode, batchIntervalMs); + endTime, batch, policy, mode, batchIntervalMs, + failureHandler, maxRetries, retryDelayMs); this.edge = edge; this.srcIndex = srcIndex; this.dstIndex = dstIndex; @@ -97,7 +115,10 @@ public ExecutionOptionBuilder toBuilder() { .setDstIndex(this.getDstIndex()) .setRankIndex(this.getRankIndex()) .setWriteMode(this.getWriteMode()) - .setBatchIntervalMs(this.getBatchIntervalMs()); + .setBatchIntervalMs(this.getBatchIntervalMs()) + .setFailureHandler(this.getFailureHandler()) + .setMaxRetries(this.getMaxRetries()) + .setRetryDelayMs(this.getRetryDelayMs()); } public static class ExecutionOptionBuilder { @@ -117,6 +138,9 @@ public static class ExecutionOptionBuilder { private int srcIndex = DEFAULT_ROW_INFO_INDEX; private int dstIndex = DEFAULT_ROW_INFO_INDEX; private int rankIndex = DEFAULT_ROW_INFO_INDEX; + private FailureHandlerEnum failureHandler = FailureHandlerEnum.IGNORE; + private int maxRetries = DEFAULT_EXECUTION_RETRY; + private int retryDelayMs = DEFAULT_RETRY_DELAY_MS; public ExecutionOptionBuilder setGraphSpace(String graphSpace) { this.graphSpace = graphSpace; @@ -205,6 +229,21 @@ public ExecutionOptionBuilder setBatchIntervalMs(int batchIntervalMs) { return this; } + public ExecutionOptionBuilder setFailureHandler(FailureHandlerEnum failureHandler) { + this.failureHandler = failureHandler; + return this; + } + + public ExecutionOptionBuilder setMaxRetries(int maxRetries) { + this.maxRetries = maxRetries; + return this; + } + + public ExecutionOptionBuilder setRetryDelayMs(int retryDelayMs) { + this.retryDelayMs = retryDelayMs; + return this; + } + @Deprecated public EdgeExecutionOptions builder() { return build(); @@ -219,7 +258,7 @@ public EdgeExecutionOptions build() { } return new EdgeExecutionOptions(graphSpace, executeStatement, fields, positions, noColumn, limit, startTime, endTime, batchSize, policy, mode, edge, srcIndex, - dstIndex, rankIndex, batchIntervalMs); + dstIndex, rankIndex, batchIntervalMs, failureHandler, maxRetries, retryDelayMs); } } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/statement/ExecutionOptions.java b/connector/src/main/java/org.apache.flink/connector/nebula/statement/ExecutionOptions.java index 10d8806..79e8c12 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/statement/ExecutionOptions.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/statement/ExecutionOptions.java @@ -8,6 +8,7 @@ import java.io.Serializable; import java.util.List; import org.apache.flink.connector.nebula.utils.DataTypeEnum; +import org.apache.flink.connector.nebula.utils.FailureHandlerEnum; import org.apache.flink.connector.nebula.utils.PolicyEnum; import org.apache.flink.connector.nebula.utils.WriteModeEnum; import org.apache.flink.types.Row; @@ -136,6 +137,20 @@ public abstract class ExecutionOptions implements Serializable { */ private int batchIntervalMs; + /** + * failure handler + */ + private FailureHandlerEnum failureHandler; + + /** + * maximum number of retries + */ + private int maxRetries; + + /** + * retry delay + */ + private int retryDelayMs; protected ExecutionOptions(String graphSpace, String executeStatement, @@ -148,7 +163,10 @@ protected ExecutionOptions(String graphSpace, int batchSize, PolicyEnum policy, WriteModeEnum writeMode, - int batchIntervalMs) { + int batchIntervalMs, + FailureHandlerEnum failureHandler, + int maxRetries, + int retryDelayMs) { this.graphSpace = graphSpace; this.executeStatement = executeStatement; this.fields = fields; @@ -161,6 +179,9 @@ protected ExecutionOptions(String graphSpace, this.policy = policy; this.writeMode = writeMode; this.batchIntervalMs = batchIntervalMs; + this.failureHandler = failureHandler; + this.maxRetries = maxRetries; + this.retryDelayMs = retryDelayMs; } public String getGraphSpace() { @@ -220,6 +241,18 @@ public int getBatchIntervalMs() { return batchIntervalMs; } + public FailureHandlerEnum getFailureHandler() { + return failureHandler; + } + + public int getMaxRetries() { + return maxRetries; + } + + public int getRetryDelayMs() { + return retryDelayMs; + } + @Override public String toString() { return "ExecutionOptions{" @@ -235,6 +268,9 @@ public String toString() { + ", policy=" + policy + ", mode=" + writeMode + ", batchIntervalMs=" + batchIntervalMs + + ", failureHandler=" + failureHandler + + ", maxRetries=" + maxRetries + + ", retryDelayMs=" + retryDelayMs + '}'; } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/statement/VertexExecutionOptions.java b/connector/src/main/java/org.apache.flink/connector/nebula/statement/VertexExecutionOptions.java index d124ad2..5e7057b 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/statement/VertexExecutionOptions.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/statement/VertexExecutionOptions.java @@ -6,6 +6,8 @@ package org.apache.flink.connector.nebula.statement; import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_BATCH_INTERVAL_MS; +import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_EXECUTION_RETRY; +import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_RETRY_DELAY_MS; import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_ROW_INFO_INDEX; import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_SCAN_LIMIT; import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_WRITE_BATCH_SIZE; @@ -14,6 +16,7 @@ import java.util.Objects; import java.util.Optional; import org.apache.flink.connector.nebula.utils.DataTypeEnum; +import org.apache.flink.connector.nebula.utils.FailureHandlerEnum; import org.apache.flink.connector.nebula.utils.PolicyEnum; import org.apache.flink.connector.nebula.utils.WriteModeEnum; @@ -42,9 +45,13 @@ public VertexExecutionOptions(String graphSpace, WriteModeEnum mode, String tag, int idIndex, - int batchIntervalMs) { + int batchIntervalMs, + FailureHandlerEnum failureHandler, + int maxRetries, + int retryDelayMs) { super(graphSpace, executeStatement, fields, positions, noColumn, limit, startTime, - endTime, batch, policy, mode, batchIntervalMs); + endTime, batch, policy, mode, batchIntervalMs, + failureHandler, maxRetries, retryDelayMs); this.tag = tag; this.idIndex = idIndex; } @@ -79,7 +86,10 @@ public ExecutionOptionBuilder toBuilder() { .map(Objects::toString).orElse(null)) .setIdIndex(this.getIdIndex()) .setWriteMode(this.getWriteMode()) - .setBatchIntervalMs(this.getBatchIntervalMs()); + .setBatchIntervalMs(this.getBatchIntervalMs()) + .setFailureHandler(this.getFailureHandler()) + .setMaxRetries(this.getMaxRetries()) + .setRetryDelayMs(this.getRetryDelayMs()); } public static class ExecutionOptionBuilder { @@ -97,6 +107,9 @@ public static class ExecutionOptionBuilder { private PolicyEnum policy = null; private WriteModeEnum mode = WriteModeEnum.INSERT; private int idIndex = DEFAULT_ROW_INFO_INDEX; + private FailureHandlerEnum failureHandler = FailureHandlerEnum.IGNORE; + private int maxRetries = DEFAULT_EXECUTION_RETRY; + private int retryDelayMs = DEFAULT_RETRY_DELAY_MS; public ExecutionOptionBuilder setGraphSpace(String graphSpace) { this.graphSpace = graphSpace; @@ -178,6 +191,21 @@ public ExecutionOptionBuilder setBatchIntervalMs(int batchIntervalMs) { return this; } + public ExecutionOptionBuilder setFailureHandler(FailureHandlerEnum failureHandler) { + this.failureHandler = failureHandler; + return this; + } + + public ExecutionOptionBuilder setMaxRetries(int maxRetries) { + this.maxRetries = maxRetries; + return this; + } + + public ExecutionOptionBuilder setRetryDelayMs(int retryDelayMs) { + this.retryDelayMs = retryDelayMs; + return this; + } + @Deprecated public VertexExecutionOptions builder() { return build(); @@ -192,7 +220,7 @@ public VertexExecutionOptions build() { } return new VertexExecutionOptions(graphSpace, executeStatement, fields, positions, noColumn, limit, startTime, endTime, batchSize, policy, mode, tag, - idIndex, batchIntervalMs); + idIndex, batchIntervalMs, failureHandler, maxRetries, retryDelayMs); } } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableFactory.java b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableFactory.java index 4a14917..ccb2bfa 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableFactory.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableFactory.java @@ -17,6 +17,7 @@ import org.apache.flink.connector.nebula.statement.ExecutionOptions; import org.apache.flink.connector.nebula.statement.VertexExecutionOptions; import org.apache.flink.connector.nebula.utils.DataTypeEnum; +import org.apache.flink.connector.nebula.utils.FailureHandlerEnum; import org.apache.flink.connector.nebula.utils.NebulaConstant; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.Column; @@ -117,6 +118,24 @@ public class NebulaDynamicTableFactory implements DynamicTableSourceFactory, .noDefaultValue() .withDescription("batch commit interval in milliseconds."); + public static final ConfigOption FAILURE_HANDLER = ConfigOptions + .key("failure-handler") + .enumType(FailureHandlerEnum.class) + .defaultValue(FailureHandlerEnum.IGNORE) + .withDescription("failure handler."); + + public static final ConfigOption MAX_RETRIES = ConfigOptions + .key("max-retries") + .intType() + .defaultValue(NebulaConstant.DEFAULT_EXECUTION_RETRY) + .withDescription("maximum number of retries."); + + public static final ConfigOption RETRY_DELAY_MS = ConfigOptions + .key("retry-delay-ms") + .intType() + .defaultValue(NebulaConstant.DEFAULT_RETRY_DELAY_MS) + .withDescription("retry delay in milliseconds."); + @Override public DynamicTableSink createDynamicTableSink(Context context) { final FactoryUtil.TableFactoryHelper helper = @@ -160,6 +179,7 @@ private NebulaClientOptions getClientOptions(ReadableConfig config) { .setGraphAddress(config.get(GRAPHADDRESS)) .setUsername(config.get(USERNAME)) .setPassword(config.get(PASSWORD)) + .setTimeout(config.get(TIMEOUT)) .build(); } @@ -181,7 +201,10 @@ private ExecutionOptions getExecutionOptions(Context context, ReadableConfig con .setIdIndex(config.get(ID_INDEX)) .setPositions(positions) .setGraphSpace(config.get(GRAPH_SPACE)) - .setTag(labelName); + .setTag(labelName) + .setFailureHandler(config.get(FAILURE_HANDLER)) + .setMaxRetries(config.get(MAX_RETRIES)) + .setRetryDelayMs(config.get(RETRY_DELAY_MS)); config.getOptional(BATCH_SIZE).ifPresent(builder::setBatchSize); config.getOptional(BATCH_INTERVAL_MS).ifPresent(builder::setBatchIntervalMs); return builder.build(); @@ -201,7 +224,10 @@ private ExecutionOptions getExecutionOptions(Context context, ReadableConfig con .setRankIndex(config.get(RANK_ID_INDEX)) .setPositions(positions) .setGraphSpace(config.get(GRAPH_SPACE)) - .setEdge(labelName); + .setEdge(labelName) + .setFailureHandler(config.get(FAILURE_HANDLER)) + .setMaxRetries(config.get(MAX_RETRIES)) + .setRetryDelayMs(config.get(RETRY_DELAY_MS)); config.getOptional(BATCH_SIZE).ifPresent(builder::setBatchSize); config.getOptional(BATCH_INTERVAL_MS).ifPresent(builder::setBatchIntervalMs); return builder.build(); @@ -236,6 +262,9 @@ public Set> optionalOptions() { set.add(RANK_ID_INDEX); set.add(BATCH_SIZE); set.add(BATCH_INTERVAL_MS); + set.add(FAILURE_HANDLER); + set.add(MAX_RETRIES); + set.add(RETRY_DELAY_MS); return set; } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableSink.java b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableSink.java index 4f31f10..b75501e 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableSink.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableSink.java @@ -18,20 +18,13 @@ import org.apache.flink.types.RowKind; public class NebulaDynamicTableSink implements DynamicTableSink { - private final String metaAddress; - private final String graphAddress; - - private final String username; - private final String password; + private final NebulaClientOptions nebulaClientOptions; private final ExecutionOptions executionOptions; final DataType producedDataType; public NebulaDynamicTableSink(NebulaClientOptions clientOptions, ExecutionOptions executionOptions, DataType producedDataType) { - this.metaAddress = clientOptions.getRawMetaAddress(); - this.graphAddress = clientOptions.getGraphAddress(); - this.username = clientOptions.getUsername(); - this.password = clientOptions.getPassword(); + this.nebulaClientOptions = clientOptions; this.executionOptions = executionOptions; this.producedDataType = producedDataType; } @@ -50,15 +43,10 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { @Override public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { - NebulaClientOptions builder = new NebulaClientOptions.NebulaClientOptionsBuilder() - .setMetaAddress(metaAddress) - .setGraphAddress(graphAddress) - .setUsername(username) - .setPassword(password) - .build(); - - NebulaGraphConnectionProvider graphProvider = new NebulaGraphConnectionProvider(builder); - NebulaMetaConnectionProvider metaProvider = new NebulaMetaConnectionProvider(builder); + NebulaGraphConnectionProvider graphProvider = + new NebulaGraphConnectionProvider(nebulaClientOptions); + NebulaMetaConnectionProvider metaProvider = + new NebulaMetaConnectionProvider(nebulaClientOptions); DataStructureConverter converter = context.createDataStructureConverter(producedDataType); NebulaBatchOutputFormat outputFormat; diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/utils/FailureHandlerEnum.java b/connector/src/main/java/org.apache.flink/connector/nebula/utils/FailureHandlerEnum.java new file mode 100644 index 0000000..9a5f98e --- /dev/null +++ b/connector/src/main/java/org.apache.flink/connector/nebula/utils/FailureHandlerEnum.java @@ -0,0 +1,18 @@ +/* Copyright (c) 2023 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +package org.apache.flink.connector.nebula.utils; + +public enum FailureHandlerEnum { + FAIL("FAIL"), + + IGNORE("IGNORE"); + + private final String handler; + + FailureHandlerEnum(String handler) { + this.handler = handler; + } +} diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaConstant.java b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaConstant.java index e595a8d..53541c7 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaConstant.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaConstant.java @@ -48,6 +48,7 @@ public class NebulaConstant { public static final int DEFAULT_CONNECT_TIMEOUT_MS = 3000; public static final int DEFAULT_CONNECT_RETRY = 3; public static final int DEFAULT_EXECUTION_RETRY = 3; + public static final int DEFAULT_RETRY_DELAY_MS = 1000; // params for create space public static final String CREATE_VID_TYPE = "vid_type"; diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/MockData.java b/connector/src/test/java/org/apache/flink/connector/nebula/MockData.java index 6334af0..640e184 100644 --- a/connector/src/test/java/org/apache/flink/connector/nebula/MockData.java +++ b/connector/src/test/java/org/apache/flink/connector/nebula/MockData.java @@ -8,7 +8,8 @@ public class MockData { public static String createStringSpace() { - return "CREATE SPACE IF NOT EXISTS test_string(partition_num=10," + return "CLEAR SPACE IF EXISTS `test_string`;" + + "CREATE SPACE IF NOT EXISTS test_string(partition_num=10," + "vid_type=fixed_string(8));" + "USE test_string;" + "CREATE TAG IF NOT EXISTS person(col1 fixed_string(8), col2 string, col3 int32," @@ -18,7 +19,8 @@ public static String createStringSpace() { } public static String createIntSpace() { - return "CREATE SPACE IF NOT EXISTS test_int(partition_num=10,vid_type=int64);" + return "CLEAR SPACE IF EXISTS `test_int`;" + + "CREATE SPACE IF NOT EXISTS test_int(partition_num=10,vid_type=int64);" + "USE test_int;" + "CREATE TAG IF NOT EXISTS person(col1 fixed_string(8), col2 string, col3 int32," + " col4 double, col5 date, col6 datetime, col7 time, col8 timestamp);" @@ -27,10 +29,12 @@ public static String createIntSpace() { } public static String createFlinkSinkSpace() { - return "CREATE SPACE IF NOT EXISTS flink_sink(partition_num=10," + return "CLEAR SPACE IF EXISTS `flink_sink`;" + + "CREATE SPACE IF NOT EXISTS flink_sink(partition_num=10," + "vid_type=fixed_string(8));" + "USE flink_sink;" - + "CREATE TAG IF NOT EXISTS player(name string, age int);"; + + "CREATE TAG IF NOT EXISTS player(name string, age int);" + + "CREATE EDGE IF NOT EXISTS follow(degree int);"; } public static String createFlinkTestSpace() { diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/connection/NebulaGraphConnectionProviderTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/connection/NebulaGraphConnectionProviderTest.java index ad662aa..19e6293 100644 --- a/connector/src/test/java/org/apache/flink/connector/nebula/connection/NebulaGraphConnectionProviderTest.java +++ b/connector/src/test/java/org/apache/flink/connector/nebula/connection/NebulaGraphConnectionProviderTest.java @@ -47,13 +47,13 @@ public void getNebulaPool() { NebulaPool nebulaPool = graphConnectionProvider.getNebulaPool(); nebulaPool.getSession("root", "nebula", true); } catch (Exception e) { - LOG.info("get session failed, ", e); + LOG.info("get session failed", e); assert (false); } } /** - * nebula server does not enable ssl, the connect cannot be established correctly. + * nebula server does not enable ssl, the connection cannot be established correctly. */ @Test(expected = NotValidConnectionException.class) public void getSessionWithSsl() throws NotValidConnectionException { @@ -82,7 +82,7 @@ public void getSessionWithSsl() throws NotValidConnectionException { pool.getSession("root", "nebula", true); } catch (UnknownHostException | IOErrorException | AuthFailedException | ClientServerIncompatibleException e) { - LOG.error("get session faied, ", e); + LOG.error("get session failed", e); assert (false); } } diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/sink/AbstractNebulaOutputFormatITTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/sink/AbstractNebulaOutputFormatITTest.java index 097a42f..6f064b5 100644 --- a/connector/src/test/java/org/apache/flink/connector/nebula/sink/AbstractNebulaOutputFormatITTest.java +++ b/connector/src/test/java/org/apache/flink/connector/nebula/sink/AbstractNebulaOutputFormatITTest.java @@ -12,7 +12,6 @@ import static org.apache.flink.connector.nebula.NebulaValueUtils.timeOf; import static org.apache.flink.connector.nebula.NebulaValueUtils.valueOf; -import com.vesoft.nebula.client.graph.exception.IOErrorException; import java.util.Arrays; import java.util.List; import java.util.Objects; @@ -41,6 +40,12 @@ public class AbstractNebulaOutputFormatITTest extends NebulaITTestBase { private static TableEnvironment tableEnvironment; + static class FlinkJobException extends RuntimeException { + public FlinkJobException(Throwable cause) { + super(cause); + } + } + @BeforeClass public static void beforeAll() { initializeNebulaSession(); @@ -159,6 +164,68 @@ public void testSinkVertexData() throws ExecutionException, InterruptedException } + private void runSinkVertexInvalid(String failureHandler) + throws ExecutionException, InterruptedException { + Configuration configuration = tableEnvironment.getConfig().getConfiguration(); + configuration.setString("table.dml-sync", "true"); + + tableEnvironment.executeSql( + "CREATE TABLE person_fail (" + + "vid STRING," + + "col5 STRING" + + ")" + + "WITH (" + + "'connector'='nebula'," + + "'meta-address'='" + + META_ADDRESS + + "'," + + "'graph-address'='" + + GRAPH_ADDRESS + + "'," + + "'username'='" + + USERNAME + + "'," + + "'password'='" + + PASSWORD + + "'," + + "'graph-space'='flink_test'," + + "'label-name'='person'," + + "'data-type'='vertex'," + + "'failure-handler'='" + + failureHandler + + "'," + + "'max-retries'='0'," + + "'retry-delay-ms'='100'" + + ")" + ); + + try { + tableEnvironment.executeSql( + "INSERT INTO person_fail VALUES ('61', 'abc')" + ).await(); + } catch (Exception e) { + throw new FlinkJobException(e); + } + } + + /** + * sink Nebula Graph Vertex Data and fail with error + */ + @Test(expected = FlinkJobException.class) + public void testSinkVertexDataFailInvalid() + throws ExecutionException, InterruptedException { + runSinkVertexInvalid("fail"); + } + + /** + * sink Nebula Graph Vertex Data and ignore error + */ + @Test + public void testSinkVertexDataIgnoreInvalid() + throws ExecutionException, InterruptedException { + runSinkVertexInvalid("ignore"); + } + @Test public void testSinkVertexChangelog() throws ExecutionException, InterruptedException { int[] vids = new int[]{101, 102}; @@ -412,6 +479,71 @@ public void testSinkEdgeDataWithoutRank() throws ExecutionException, Interrupted ); } + private void runSinkEdgeInvalid(String failureHandler) + throws ExecutionException, InterruptedException { + Configuration configuration = tableEnvironment.getConfig().getConfiguration(); + configuration.setString("table.dml-sync", "true"); + + tableEnvironment.executeSql( + "CREATE TABLE friend_fail (" + + "src STRING," + + "dst STRING," + + "col5 STRING" + + ")" + + "WITH (" + + "'connector'='nebula'," + + "'meta-address'='" + + META_ADDRESS + + "'," + + "'graph-address'='" + + GRAPH_ADDRESS + + "'," + + "'username'='" + + USERNAME + + "'," + + "'password'='" + + PASSWORD + + "'," + + "'graph-space'='flink_test'," + + "'label-name'='friend'," + + "'src-id-index'='0'," + + "'dst-id-index'='1'," + + "'data-type'='edge'," + + "'failure-handler'='" + + failureHandler + + "'," + + "'max-retries'='0'," + + "'retry-delay-ms'='100'" + + ")" + ); + + try { + tableEnvironment.executeSql( + "INSERT INTO friend_fail VALUES ('61', '62', 'abc')" + ).await(); + } catch (Exception e) { + throw new FlinkJobException(e); + } + } + + /** + * sink Nebula Graph Edge Data and fail with error + */ + @Test(expected = FlinkJobException.class) + public void testSinkEdgeDataFailInvalid() + throws ExecutionException, InterruptedException { + runSinkEdgeInvalid("fail"); + } + + /** + * sink Nebula Graph Edge Data and ignore error + */ + @Test + public void testSinkEdgeDataIgnoreInvalid() + throws ExecutionException, InterruptedException { + runSinkEdgeInvalid("ignore"); + } + @Test public void testSinkEdgeChangelog() throws ExecutionException, InterruptedException { check(null, "INSERT vertex person() VALUES 101:()"); diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/sink/AbstractNebulaOutputFormatTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/sink/AbstractNebulaOutputFormatTest.java index 9a8c637..96e3bac 100644 --- a/connector/src/test/java/org/apache/flink/connector/nebula/sink/AbstractNebulaOutputFormatTest.java +++ b/connector/src/test/java/org/apache/flink/connector/nebula/sink/AbstractNebulaOutputFormatTest.java @@ -5,17 +5,23 @@ package org.apache.flink.connector.nebula.sink; +import static org.apache.flink.connector.nebula.NebulaValueUtils.rowOf; +import static org.apache.flink.connector.nebula.NebulaValueUtils.valueOf; + import java.io.IOException; import java.util.Arrays; -import java.util.List; import org.apache.flink.connector.nebula.MockData; import org.apache.flink.connector.nebula.NebulaITTestBase; import org.apache.flink.connector.nebula.connection.NebulaClientOptions; import org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider; import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider; +import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions; import org.apache.flink.connector.nebula.statement.VertexExecutionOptions; +import org.apache.flink.connector.nebula.utils.FailureHandlerEnum; +import org.apache.flink.connector.nebula.utils.WriteModeEnum; import org.apache.flink.types.Row; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; @@ -25,6 +31,11 @@ public class AbstractNebulaOutputFormatTest extends NebulaITTestBase { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNebulaOutputFormatTest.class); + private NebulaGraphConnectionProvider graphProvider; + private NebulaMetaConnectionProvider metaProvider; + private VertexExecutionOptions.ExecutionOptionBuilder vertexExecutionOptionBuilder = null; + private EdgeExecutionOptions.ExecutionOptionBuilder edgeExecutionOptionBuilder = null; + @BeforeClass public static void beforeAll() { initializeNebulaSession(); @@ -36,36 +47,176 @@ public static void afterAll() { closeNebulaSession(); } - @Test - public void testWrite() throws IOException { - List cols = Arrays.asList("name", "age"); - VertexExecutionOptions executionOptions = - new VertexExecutionOptions.ExecutionOptionBuilder() - .setGraphSpace("flink_sink") - .setTag("player") - .setFields(cols) - .setPositions(Arrays.asList(1, 2)) - .setIdIndex(0) - .setBatchSize(1) - .build(); - + @Before + public void before() { NebulaClientOptions clientOptions = new NebulaClientOptions .NebulaClientOptionsBuilder() .setMetaAddress(META_ADDRESS) .setGraphAddress(GRAPH_ADDRESS) .build(); - NebulaGraphConnectionProvider graphProvider = + graphProvider = new NebulaGraphConnectionProvider(clientOptions); - NebulaMetaConnectionProvider metaProvider = new NebulaMetaConnectionProvider(clientOptions); - Row row = new Row(3); - row.setField(0, 111); + metaProvider = new NebulaMetaConnectionProvider(clientOptions); + vertexExecutionOptionBuilder = new VertexExecutionOptions.ExecutionOptionBuilder() + .setGraphSpace("flink_sink") + .setTag("player") + .setFields(Arrays.asList("name", "age")) + .setPositions(Arrays.asList(1, 2)) + .setIdIndex(0) + .setBatchSize(1); + edgeExecutionOptionBuilder = new EdgeExecutionOptions.ExecutionOptionBuilder() + .setGraphSpace("flink_sink") + .setEdge("follow") + .setFields(Arrays.asList("degree")) + .setPositions(Arrays.asList(2)) + .setSrcIndex(0) + .setDstIndex(1) + .setBatchSize(1); + } + + @SafeVarargs + private static void writeRecords( + NebulaBatchOutputFormat outputFormat, T... row) throws IOException { + outputFormat.open(1, 2); + try { + for (T r : row) { + outputFormat.writeRecord(r); + } + } finally { + outputFormat.close(); + } + } + + @Test + public void testWriteVertex() throws IOException { + final VertexExecutionOptions options = vertexExecutionOptionBuilder.build(); + final NebulaVertexBatchOutputFormat outputFormat = + new NebulaVertexBatchOutputFormat(graphProvider, metaProvider, options); + + final Row row = new Row(3); + row.setField(0, "111"); row.setField(1, "jena"); row.setField(2, 12); - NebulaVertexBatchOutputFormat outputFormat = - new NebulaVertexBatchOutputFormat(graphProvider, metaProvider, executionOptions); + writeRecords(outputFormat, row); + executeNGql("USE flink_sink"); + check( + Arrays.asList(rowOf(valueOf(12))), + "MATCH (n) WHERE id(n) == \"111\" RETURN n.player.age" + ); + } - outputFormat.open(1, 2); - outputFormat.writeRecord(row); + @Test(expected = IOException.class) + public void testWriteVertexFailInvalid() throws IOException { + final VertexExecutionOptions options = vertexExecutionOptionBuilder + .setWriteMode(WriteModeEnum.INSERT) + .setFailureHandler(FailureHandlerEnum.FAIL) + .setMaxRetries(2) + .setRetryDelayMs(100) + .build(); + final NebulaVertexBatchOutputFormat outputFormat = + new NebulaVertexBatchOutputFormat(graphProvider, metaProvider, options); + + final Row row = new Row(3); + row.setField(0, "222"); + row.setField(1, "jena"); + row.setField(2, "abc"); + + writeRecords(outputFormat, row); + } + + @Test + public void testWriteVertexIgnoreInvalid() throws IOException { + final VertexExecutionOptions options = vertexExecutionOptionBuilder + .setWriteMode(WriteModeEnum.INSERT) + .setFailureHandler(FailureHandlerEnum.IGNORE) + .setMaxRetries(2) + .setRetryDelayMs(100) + .build(); + final NebulaVertexBatchOutputFormat outputFormat = + new NebulaVertexBatchOutputFormat(graphProvider, metaProvider, options); + + final Row row1 = new Row(3); + row1.setField(0, "222"); + row1.setField(1, "jena"); + row1.setField(2, "abc"); + final Row row2 = new Row(3); + row2.setField(0, "222"); + row2.setField(1, "jena"); + row2.setField(2, 15); + + writeRecords(outputFormat, row1, row2); + executeNGql("USE flink_sink"); + check( + Arrays.asList(rowOf(valueOf(15))), + "MATCH (n) WHERE id(n) == \"222\" RETURN n.player.age" + ); + } + + @Test + public void testWriteEdge() throws IOException { + final EdgeExecutionOptions options = edgeExecutionOptionBuilder.build(); + final NebulaEdgeBatchOutputFormat outputFormat = + new NebulaEdgeBatchOutputFormat(graphProvider, metaProvider, options); + + final Row row = new Row(3); + row.setField(0, "111"); + row.setField(1, "333"); + row.setField(2, 20); + + writeRecords(outputFormat, row); + executeNGql("USE flink_sink"); + check( + Arrays.asList(rowOf(valueOf(20))), + "GO FROM \"111\" OVER follow YIELD properties(edge).degree" + ); + } + + @Test(expected = IOException.class) + public void testWriteEdgeFailInvalid() throws IOException { + final EdgeExecutionOptions options = edgeExecutionOptionBuilder + .setWriteMode(WriteModeEnum.INSERT) + .setFailureHandler(FailureHandlerEnum.FAIL) + .setMaxRetries(2) + .setRetryDelayMs(100) + .build(); + final NebulaEdgeBatchOutputFormat outputFormat = + new NebulaEdgeBatchOutputFormat(graphProvider, metaProvider, options); + + final Row row = new Row(3); + row.setField(0, "222"); + row.setField(1, "333"); + row.setField(2, "abc"); + + writeRecords(outputFormat, row); + } + + @Test + public void testWriteEdgeIgnoreInvalid() throws IOException { + final EdgeExecutionOptions options = edgeExecutionOptionBuilder + .setWriteMode(WriteModeEnum.INSERT) + .setFailureHandler(FailureHandlerEnum.IGNORE) + .setMaxRetries(2) + .setRetryDelayMs(100) + .build(); + final NebulaEdgeBatchOutputFormat outputFormat = + new NebulaEdgeBatchOutputFormat(graphProvider, metaProvider, options); + + final Row row1 = new Row(3); + row1.setField(0, "222"); + row1.setField(1, "333"); + row1.setField(2, "abc"); + final Row row2 = new Row(3); + row2.setField(0, "222"); + row2.setField(1, "333"); + row2.setField(2, 25); + + writeRecords(outputFormat, row1, row2); + executeNGql("USE flink_sink"); + check( + Arrays.asList(rowOf(valueOf(25))), + "GO FROM \"222\" OVER follow YIELD properties(edge).degree" + ); } + } diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaEdgeBatchExecutorTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaEdgeBatchExecutorTest.java index 1039731..d1ec722 100644 --- a/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaEdgeBatchExecutorTest.java +++ b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaEdgeBatchExecutorTest.java @@ -6,6 +6,7 @@ package org.apache.flink.connector.nebula.sink; import com.vesoft.nebula.PropertyType; +import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -177,7 +178,7 @@ public void testAddToBatchWithDeletePolicy() { * test batch execute for int vid and insert mode */ @Test - public void testExecuteBatch() { + public void testExecuteBatch() throws IOException { EdgeExecutionOptions options = builder .setGraphSpace("test_int") .setPolicy("HASH") @@ -189,16 +190,14 @@ public void testExecuteBatch() { edgeBatchExecutor.addToBatch(row2); executeNGql("USE test_int"); - - String statement = edgeBatchExecutor.executeBatch(session); - assert (statement == null); + edgeBatchExecutor.executeBatch(session); } /** - * test batch exeucte for int vid and UPDATE mode + * test batch execute for int vid and UPDATE mode */ @Test - public void testExecuteBatchWithUpdate() { + public void testExecuteBatchWithUpdate() throws IOException { testExecuteBatch(); EdgeExecutionOptions options = builder .setGraphSpace("test_int") @@ -211,16 +210,14 @@ public void testExecuteBatchWithUpdate() { edgeBatchExecutor.addToBatch(row2); executeNGql("USE test_int"); - - String statement = edgeBatchExecutor.executeBatch(session); - assert (statement == null); + edgeBatchExecutor.executeBatch(session); } /** - * test batch exeucte for int vid and DELETE mode + * test batch execute for int vid and DELETE mode */ @Test - public void testExecuteBatchWithDelete() { + public void testExecuteBatchWithDelete() throws IOException { EdgeExecutionOptions options = builder.setGraphSpace("test_int") .setPolicy("HASH") .setWriteMode(WriteModeEnum.DELETE) @@ -231,16 +228,15 @@ public void testExecuteBatchWithDelete() { edgeBatchExecutor.addToBatch(row2); executeNGql("USE test_int"); - - String statement = edgeBatchExecutor.executeBatch(session); - assert (statement == null); + edgeBatchExecutor.executeBatch(session); } + /** - * test batch exeucte for string vid and insert mode + * test batch execute for string vid and insert mode */ @Test - public void testExecuteBatchWithStringVidAndInsert() { + public void testExecuteBatchWithStringVidAndInsert() throws IOException { EdgeExecutionOptions options = builder .setGraphSpace("test_string") .setWriteMode(WriteModeEnum.INSERT) @@ -251,16 +247,14 @@ public void testExecuteBatchWithStringVidAndInsert() { edgeBatchExecutor.addToBatch(row2); executeNGql("USE test_string"); - - String statement = edgeBatchExecutor.executeBatch(session); - assert (statement == null); + edgeBatchExecutor.executeBatch(session); } /** * test batch execute for string vid and update mode */ @Test - public void testExecuteBatchWithStringVidAndUpdate() { + public void testExecuteBatchWithStringVidAndUpdate() throws IOException { testExecuteBatchWithStringVidAndInsert(); EdgeExecutionOptions options = builder .setGraphSpace("test_string") @@ -272,16 +266,14 @@ public void testExecuteBatchWithStringVidAndUpdate() { edgeBatchExecutor.addToBatch(row2); executeNGql("USE test_string"); - - String statement = edgeBatchExecutor.executeBatch(session); - assert (statement == null); + edgeBatchExecutor.executeBatch(session); } /** * test batch execute for string vid and DELETE mode */ @Test - public void testExecuteBatchWithStringVidAndDelete() { + public void testExecuteBatchWithStringVidAndDelete() throws IOException { EdgeExecutionOptions options = builder .setGraphSpace("test_string") .setWriteMode(WriteModeEnum.DELETE) @@ -292,9 +284,7 @@ public void testExecuteBatchWithStringVidAndDelete() { edgeBatchExecutor.addToBatch(row2); executeNGql("USE test_string"); - - String statement = edgeBatchExecutor.executeBatch(session); - assert (statement == null); + edgeBatchExecutor.executeBatch(session); } } diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaOutputFormatConverterTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaOutputFormatConverterTest.java index ca24e2d..67e9a5c 100644 --- a/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaOutputFormatConverterTest.java +++ b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaOutputFormatConverterTest.java @@ -10,7 +10,6 @@ import java.util.HashMap; import java.util.Map; import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions; -import org.apache.flink.connector.nebula.statement.ExecutionOptions; import org.apache.flink.connector.nebula.statement.VertexExecutionOptions; import org.apache.flink.connector.nebula.utils.NebulaEdge; import org.apache.flink.connector.nebula.utils.NebulaVertex; diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaVertexBatchExecutorTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaVertexBatchExecutorTest.java index d720d3a..8f980a2 100644 --- a/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaVertexBatchExecutorTest.java +++ b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaVertexBatchExecutorTest.java @@ -6,6 +6,7 @@ package org.apache.flink.connector.nebula.sink; import com.vesoft.nebula.PropertyType; +import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -174,7 +175,7 @@ public void testAddToBatchWithDeletePolicy() { * test batch execute for int vid and insert mode */ @Test - public void testExecuteBatch() { + public void testExecuteBatch() throws IOException { VertexExecutionOptions options = builder .setGraphSpace("test_int") .setPolicy("HASH") @@ -186,16 +187,14 @@ public void testExecuteBatch() { vertexBatchExecutor.addToBatch(row2); executeNGql("USE test_int"); - - String statement = vertexBatchExecutor.executeBatch(session); - assert (statement == null); + vertexBatchExecutor.executeBatch(session); } /** - * test batch exeucte for int vid and UPDATE mode + * test batch execute for int vid and UPDATE mode */ @Test - public void testExecuteBatchWithUpdate() { + public void testExecuteBatchWithUpdate() throws IOException { testExecuteBatch(); VertexExecutionOptions options = builder .setGraphSpace("test_int") @@ -208,16 +207,14 @@ public void testExecuteBatchWithUpdate() { vertexBatchExecutor.addToBatch(row2); executeNGql("USE test_int"); - - String statement = vertexBatchExecutor.executeBatch(session); - assert (statement == null); + vertexBatchExecutor.executeBatch(session); } /** - * test batch exeucte for int vid and DELETE mode + * test batch execute for int vid and DELETE mode */ @Test - public void testExecuteBatchWithDelete() { + public void testExecuteBatchWithDelete() throws IOException { VertexExecutionOptions options = builder.setGraphSpace("test_int") .setPolicy("HASH") .setWriteMode(WriteModeEnum.DELETE) @@ -228,16 +225,14 @@ public void testExecuteBatchWithDelete() { vertexBatchExecutor.addToBatch(row2); executeNGql("USE test_int"); - - String statement = vertexBatchExecutor.executeBatch(session); - assert (statement == null); + vertexBatchExecutor.executeBatch(session); } /** - * test batch exeucte for string vid and insert mode + * test batch execute for string vid and insert mode */ @Test - public void testExecuteBatchWithStringVidAndInsert() { + public void testExecuteBatchWithStringVidAndInsert() throws IOException { VertexExecutionOptions options = builder .setGraphSpace("test_string") .setWriteMode(WriteModeEnum.INSERT) @@ -248,16 +243,14 @@ public void testExecuteBatchWithStringVidAndInsert() { vertexBatchExecutor.addToBatch(row2); executeNGql("USE test_string"); - - String statement = vertexBatchExecutor.executeBatch(session); - assert (statement == null); + vertexBatchExecutor.executeBatch(session); } /** * test batch execute for string vid and update mode */ @Test - public void testExecuteBatchWithStringVidAndUpdate() { + public void testExecuteBatchWithStringVidAndUpdate() throws IOException { testExecuteBatchWithStringVidAndInsert(); VertexExecutionOptions options = builder .setGraphSpace("test_string") @@ -269,16 +262,14 @@ public void testExecuteBatchWithStringVidAndUpdate() { vertexBatchExecutor.addToBatch(row2); executeNGql("USE test_string"); - - String statement = vertexBatchExecutor.executeBatch(session); - assert (statement == null); + vertexBatchExecutor.executeBatch(session); } /** * test batch execute for string vid and DELETE mode */ @Test - public void testExecuteBatchWithStringVidAndDelete() { + public void testExecuteBatchWithStringVidAndDelete() throws IOException { VertexExecutionOptions options = builder .setGraphSpace("test_string") .setWriteMode(WriteModeEnum.DELETE) @@ -289,9 +280,7 @@ public void testExecuteBatchWithStringVidAndDelete() { vertexBatchExecutor.addToBatch(row2); executeNGql("USE test_string"); - - String statement = vertexBatchExecutor.executeBatch(session); - assert (statement == null); + vertexBatchExecutor.executeBatch(session); } }