Skip to content

Commit

Permalink
add desc for flink sql (#100)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicole00 authored Apr 8, 2024
1 parent 4afb041 commit 94c76e2
Showing 1 changed file with 137 additions and 1 deletion.
138 changes: 137 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Add the dependency to your pom.xml.

## Example

To write data into Nebula Graph using Flink.
To write data into NebulaGraph using Flink.
```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder()
Expand Down Expand Up @@ -63,6 +63,142 @@ DataStream<Row> dataStream = playerSource.map(row -> {
dataStream.addSink(nebulaSinkFunction);
env.execute("write nebula")
```

To read data from NebulaGraph using Flink.
```
NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder()
.setMetaAddress("127.0.0.1:9559")
.build();
storageConnectionProvider = new NebulaStorageConnectionProvider(nebulaClientOptions);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
NebulaSourceFunction sourceFunction = new NebulaSourceFunction(storageConnectionProvider)
.setExecutionOptions(vertexExecutionOptions);
DataStreamSource<BaseTableRow> dataStreamSource = env.addSource(sourceFunction);
dataStreamSource.map(row -> {
List<ValueWrapper> values = row.getValues();
Row record = new Row(15);
record.setField(0, values.get(0).asLong());
record.setField(1, values.get(1).asString());
record.setField(2, values.get(2).asString());
record.setField(3, values.get(3).asLong());
record.setField(4, values.get(4).asLong());
record.setField(5, values.get(5).asLong());
record.setField(6, values.get(6).asLong());
record.setField(7, values.get(7).asDate());
record.setField(8, values.get(8).asDateTime().getUTCDateTimeStr());
record.setField(9, values.get(9).asLong());
record.setField(10, values.get(10).asBoolean());
record.setField(11, values.get(11).asDouble());
record.setField(12, values.get(12).asDouble());
record.setField(13, values.get(13).asTime().getUTCTimeStr());
record.setField(14, values.get(14).asGeography());
return record;
}).print();
env.execute("NebulaStreamSource");
```

To operate Schema and data using Flink SQL.

1. create graph space
```
NebulaCatalog nebulaCatalog = NebulaCatalogUtils.createNebulaCatalog(
"NebulaCatalog",
"default",
"root",
"nebula",
"127.0.0.1:9559",
"127.0.0.1:9669");
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
tableEnv.registerCatalog(CATALOG_NAME, nebulaCatalog);
tableEnv.useCatalog(CATALOG_NAME);
String createDataBase = "CREATE DATABASE IF NOT EXISTS `db1`"
+ " COMMENT 'space 1'"
+ " WITH ("
+ " 'partition_num' = '100',"
+ " 'replica_factor' = '3',"
+ " 'vid_type' = 'FIXED_STRING(10)'"
+ ")";
tableEnv.executeSql(createDataBase);
```
2. create tag
```
tableEnvironment.executeSql("CREATE TABLE `person` ("
+ " vid BIGINT,"
+ " col1 STRING,"
+ " col2 STRING,"
+ " col3 BIGINT,"
+ " col4 BIGINT,"
+ " col5 BIGINT,"
+ " col6 BIGINT,"
+ " col7 DATE,"
+ " col8 TIMESTAMP,"
+ " col9 BIGINT,"
+ " col10 BOOLEAN,"
+ " col11 DOUBLE,"
+ " col12 DOUBLE,"
+ " col13 TIME,"
+ " col14 STRING"
+ ") WITH ("
+ " 'connector' = 'nebula',"
+ " 'meta-address' = '127.0.0.1:9559',"
+ " 'graph-address' = '127.0.0.1:9669',"
+ " 'username' = 'root',"
+ " 'password' = 'nebula',"
+ " 'data-type' = 'vertex',"
+ " 'graph-space' = 'flink_test',"
+ " 'label-name' = 'person'"
+ ")"
);
```
3. create edge
```
tableEnvironment.executeSql("CREATE TABLE `friend` ("
+ " sid BIGINT,"
+ " did BIGINT,"
+ " rid BIGINT,"
+ " col1 STRING,"
+ " col2 STRING,"
+ " col3 BIGINT,"
+ " col4 BIGINT,"
+ " col5 BIGINT,"
+ " col6 BIGINT,"
+ " col7 DATE,"
+ " col8 TIMESTAMP,"
+ " col9 BIGINT,"
+ " col10 BOOLEAN,"
+ " col11 DOUBLE,"
+ " col12 DOUBLE,"
+ " col13 TIME,"
+ " col14 STRING"
+ ") WITH ("
+ " 'connector' = 'nebula',"
+ " 'meta-address' = '127.0.0.1:9559',"
+ " 'graph-address' = '127.0.0.1:9669',"
+ " 'username' = 'root',"
+ " 'password' = 'nebula',"
+ " 'graph-space' = 'flink_test',"
+ " 'label-name' = 'friend',"
+ " 'data-type'='edge',"
+ " 'src-id-index'='0',"
+ " 'dst-id-index'='1',"
+ " 'rank-id-index'='2'"
+ ")"
);
```
4. query edge data and insert into another edge type
```
Table table = tableEnvironment.sqlQuery("SELECT * FROM `friend`");
table.executeInsert("`friend_sink`").await();
```

## Version match

There are the version correspondence between Nebula Flink Connector and Nebula:
Expand Down

0 comments on commit 94c76e2

Please sign in to comment.