Skip to content

Latest commit

 

History

History
80 lines (63 loc) · 3.31 KB

README.md

File metadata and controls

80 lines (63 loc) · 3.31 KB

nebula-flink-connector

Flink Connector for Nebula Graph

GitHub stars GitHub fork

Nebula-Flink-Connector 2.0/3.0 is a connector that helps Flink users to easily access Nebula Graph 2.0/3.0. If you want to access Nebula Graph 1.x with Flink, please refer to Nebula-Flink-Connector 1.0.

Quick start

Prerequisites

To use Nebula Flink Connector, do a check of these:

Use in Maven

Add the dependency to your pom.xml.

<dependency>
    <groupId>com.vesoft</groupId>
    <artifactId>nebula-flink-connector</artifactId>
    <version>3.0-SNAPSHOT</version>
</dependency>

Example

To write data into Nebula Graph using Flink.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder()
                .setGraphAddress("127.0.0.1:9669")
                .setMetaAddress("127.0.0.1:9559")
                .build();
NebulaGraphConnectionProvider graphConnectionProvider = new NebulaGraphConnectionProvider(nebulaClientOptions);
NebulaMetaConnectionProvider metaConnectionProvider = new NebulaMetaConnectionProvider(nebulaClientOptions);

VertexExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
                .setGraphSpace("flinkSink")
                .setTag("player")
                .setIdIndex(0)
                .setFields(Arrays.asList("name", "age"))
                .setPositions(Arrays.asList(1, 2))
                .setBatchSize(2)
                .build();

NebulaVertexBatchOutputFormat outputFormat = new NebulaVertexBatchOutputFormat(
                graphConnectionProvider, metaConnectionProvider, executionOptions);
NebulaSinkFunction<Row> nebulaSinkFunction = new NebulaSinkFunction<>(outputFormat);
DataStream<Row> dataStream = playerSource.map(row -> {
            Row record = new org.apache.flink.types.Row(row.size());
            for (int i = 0; i < row.size(); i++) {
                record.setField(i, row.get(i));
            }
            return record;
        });
dataStream.addSink(nebulaSinkFunction);
env.execute("write nebula")

Version match

There are the version correspondence between Nebula Flink Connector and Nebula:

Nebula Flink Connector Version Nebula Version
2.0.0 2.0.0, 2.0.1
2.5.0 2.5.0, 2.5.1
2.6.0 2.6.0, 2.6.1
2.6.1 2.6.0, 2.6.1
3.0.0 3.0.x, 3.1.x
3.0-SNAPSHOT nightly

Note

Flink version requirements: 1.11.x