Merge pull request alibaba#635 from heljoyLiu/pulgin-gdb-update-to-set-property

gdbwriter: support set-property
TrafalgarLuo authored Apr 23, 2020
2 parents 0631ad6 + e09ec84 commit 3b3fa87
Showing 19 changed files with 1,344 additions and 1,002 deletions.
32 changes: 31 additions & 1 deletion gdbwriter/doc/gdbwriter.md
@@ -41,6 +41,14 @@ GDBWriter obtains the protocol data generated by the Reader through the DataX framework and uses `g.addV/E(GD
{
"random": "60,64",
"type": "string"
},
{
"random": "100,1000",
"type": "long"
},
{
"random": "32,48",
"type": "string"
}
],
"sliceRecordCount": 1000
@@ -70,6 +78,18 @@ GDBWriter obtains the protocol data generated by the Reader through the DataX framework and uses `g.addV/E(GD
"name": "vertex_propKey",
"value": "${2}",
"type": "string",
"columnType": "vertexSetProperty"
},
{
"name": "vertex_propKey",
"value": "${3}",
"type": "long",
"columnType": "vertexSetProperty"
},
{
"name": "vertex_propKey2",
"value": "${4}",
"type": "string",
"columnType": "vertexProperty"
}
]
@@ -290,6 +310,7 @@ GDBWriter obtains the protocol data generated by the Reader through the DataX framework and uses `g.addV/E(GD
* primaryKey: marks the field as the primary-key id
* Vertex enum values:
  * vertexProperty: when labelType is vertex, the field is an ordinary property of the vertex
  * vertexSetProperty: when labelType is vertex, the field is a SET property of the vertex, and value is one of the values in that SET property
  * vertexJsonProperty: when labelType is vertex, the field is the vertex's json property; see the **json properties example** in the notes for the structure of value. A vertex configuration may contain at most one json property.
* Edge enum values:
  * srcPrimaryKey: when labelType is edge, the field is the primary-key id of the source vertex
@@ -305,6 +326,14 @@ GDBWriter obtains the protocol data generated by the Reader through the DataX framework and uses `g.addV/E(GD
> {"k":"age","t":"int","v":"20"},
> {"k":"sex","t":"string","v":"male"}
> ]}
>
> # The json format also supports adding SET properties to a vertex, in the following format
> {"properties":[
> {"k":"name","t":"string","v":"tom","c":"set"},
> {"k":"name","t":"string","v":"jack","c":"set"},
> {"k":"age","t":"int","v":"20"},
> {"k":"sex","t":"string","v":"male"}
> ]}
> ```
## 4 Performance Report
@@ -367,4 +396,5 @@ DataX stress-test machine
- The GDBWriter plugin uses the same GDB instance port as user query DSL, so imports may affect query performance
## FAQ
1. Using SET properties requires upgrading the GDB instance to version `1.0.20` or above.
2. Edges support only ordinary single-value properties; SET property data cannot be written to edges.
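
For reference, a SET property corresponds to TinkerPop's `set` cardinality: writing the same property key again adds another value instead of overwriting the existing one. Below is a minimal Gremlin-Java sketch of such a write, assuming a Gremlin-compatible endpoint; the host, port, label, and property values are illustrative placeholders, not values taken from this plugin:

```java
import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;

public class SetPropertySketch {
    public static void main(final String[] args) throws Exception {
        // Placeholder endpoint; replace with a real GDB instance address.
        final GraphTraversalSource g = AnonymousTraversalSource.traversal()
                .withRemote(DriverRemoteConnection.using("gdb-host", 8182, "g"));

        // With set cardinality, "tom" and "jack" are both kept under the "name" key.
        g.addV("person")
                .property(VertexProperty.Cardinality.set, "name", "tom")
                .property(VertexProperty.Cardinality.set, "name", "jack")
                .iterate();

        g.close();
    }
}
```

The plugin assembles the equivalent DSL internally from the `column` mapping; this snippet only illustrates the accumulate-on-write semantics of SET properties.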
@@ -1,10 +1,5 @@
package com.alibaba.datax.plugin.writer.gdbwriter;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Function;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
@@ -18,24 +13,33 @@
import com.alibaba.datax.plugin.writer.gdbwriter.mapping.MappingRuleFactory;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbElement;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbGraph;

import groovy.lang.Tuple2;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class GdbWriter extends Writer {
private static final Logger log = LoggerFactory.getLogger(GdbWriter.class);
private static final Logger log = LoggerFactory.getLogger(GdbWriter.class);

private static Function<Record, GdbElement> mapper = null;
private static GdbGraph globalGraph = null;
private static boolean session = false;
private static Function<Record, GdbElement> mapper = null;
private static GdbGraph globalGraph = null;
private static boolean session = false;

/**
* Methods in Job are executed only once; methods in Task are executed in parallel by multiple Task threads started by the framework.
* <p/>
* The overall Writer execution flow is:
*
* <pre>
* Job class: init-->prepare-->split
*
@@ -46,17 +50,16 @@ public class GdbWriter extends Writer {
* </pre>
*/
public static class Job extends Writer.Job {
private static final Logger LOG = LoggerFactory
.getLogger(Job.class);
private static final Logger LOG = LoggerFactory.getLogger(Job.class);

private Configuration jobConfig = null;

@Override
public void init() {
LOG.info("GDB datax plugin writer job init begin ...");
this.jobConfig = getPluginJobConf();
GdbWriterConfig.of(this.jobConfig);
LOG.info("GDB datax plugin writer job init end.");
LOG.info("GDB datax plugin writer job init begin ...");
this.jobConfig = getPluginJobConf();
GdbWriterConfig.of(this.jobConfig);
LOG.info("GDB datax plugin writer job init end.");

/**
* Note: this method is executed only once.
@@ -71,37 +74,37 @@ public void prepare() {
* Note: this method is executed only once.
* Best practice: if the Job needs any processing before the data sync, do it here; if not, this method can simply be removed.
*/
super.prepare();
super.prepare();

MappingRule rule = MappingRuleFactory.getInstance().createV2(jobConfig);
final MappingRule rule = MappingRuleFactory.getInstance().createV2(this.jobConfig);

mapper = new DefaultGdbMapper().getMapper(rule);
session = jobConfig.getBool(Key.SESSION_STATE, false);
mapper = new DefaultGdbMapper(this.jobConfig).getMapper(rule);
session = this.jobConfig.getBool(Key.SESSION_STATE, false);

/**
* client connect check before task
*/
try {
globalGraph = GdbGraphManager.instance().getGraph(jobConfig, false);
} catch (RuntimeException e) {
globalGraph = GdbGraphManager.instance().getGraph(this.jobConfig, false);
} catch (final RuntimeException e) {
throw DataXException.asDataXException(GdbWriterErrorCode.FAIL_CLIENT_CONNECT, e.getMessage());
}
}

@Override
public List<Configuration> split(int mandatoryNumber) {
public List<Configuration> split(final int mandatoryNumber) {
/**
* Note: this method is executed only once.
* Best practice: a static utility class is usually used to split the Job configuration into multiple Task configurations.
* Here mandatoryNumber is the number of splits that must be produced.
*/
LOG.info("split begin...");
List<Configuration> configurationList = new ArrayList<Configuration>();
for (int i = 0; i < mandatoryNumber; i++) {
configurationList.add(this.jobConfig.clone());
}
LOG.info("split end...");
return configurationList;
LOG.info("split begin...");
final List<Configuration> configurationList = new ArrayList<Configuration>();
for (int i = 0; i < mandatoryNumber; i++) {
configurationList.add(this.jobConfig.clone());
}
LOG.info("split end...");
return configurationList;
}

@Override
@@ -127,7 +130,7 @@ public void destroy() {
public static class Task extends Writer.Task {

private Configuration taskConfig;

private int failed = 0;
private int batchRecords;
private ExecutorService submitService = null;
@@ -139,24 +142,24 @@ public void init() {
* Note: this method is executed once per Task.
* Best practice: read the taskConfig here and initialize resources in preparation for startWrite().
*/
this.taskConfig = super.getPluginJobConf();
batchRecords = taskConfig.getInt(Key.MAX_RECORDS_IN_BATCH, GdbWriterConfig.DEFAULT_RECORD_NUM_IN_BATCH);
submitService = new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(), new DefaultThreadFactory("submit-dsl"));

if (!session) {
graph = globalGraph;
} else {
/**
* Create session clients in batches, due to the performance limits of server-side groovy compilation
*/
try {
Thread.sleep((getTaskId()/10)*10000);
} catch (Exception e) {
// ...
}
graph = GdbGraphManager.instance().getGraph(taskConfig, session);
}
this.taskConfig = super.getPluginJobConf();
this.batchRecords = this.taskConfig.getInt(Key.MAX_RECORDS_IN_BATCH, GdbWriterConfig.DEFAULT_RECORD_NUM_IN_BATCH);
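// Single worker thread with an unbounded queue: DSL batches are submitted and executed strictly one at a time.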
this.submitService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(),
new DefaultThreadFactory("submit-dsl"));

if (!session) {
this.graph = globalGraph;
} else {
/**
* Create session clients in batches, due to the performance limits of server-side groovy compilation
*/
try {
Thread.sleep((getTaskId() / 10) * 10000);
} catch (final Exception e) {
// ...
}
this.graph = GdbGraphManager.instance().getGraph(this.taskConfig, session);
}
}

@Override
@@ -165,64 +168,69 @@ public void prepare() {
* Note: this method is executed once per Task.
* Best practice: if the Task needs any processing before the data sync, do it here; if not, this method can simply be removed.
*/
super.prepare();
super.prepare();
}

@Override
public void startWrite(RecordReceiver recordReceiver) {
public void startWrite(final RecordReceiver recordReceiver) {
/**
* Note: this method is executed once per Task.
* Best practice: wrap the write logic appropriately here to keep it simple and clear.
*/
Record r;
Future<Boolean> future = null;
List<Tuple2<Record, GdbElement>> records = new ArrayList<>(batchRecords);

while ((r = recordReceiver.getFromReader()) != null) {
records.add(new Tuple2<>(r, mapper.apply(r)));

if (records.size() >= batchRecords) {
wait4Submit(future);

final List<Tuple2<Record, GdbElement>> batch = records;
future = submitService.submit(() -> batchCommitRecords(batch));
records = new ArrayList<>(batchRecords);
}
}

wait4Submit(future);
if (!records.isEmpty()) {
final List<Tuple2<Record, GdbElement>> batch = records;
future = submitService.submit(() -> batchCommitRecords(batch));
wait4Submit(future);
}
Record r;
Future<Boolean> future = null;
List<Tuple2<Record, GdbElement>> records = new ArrayList<>(this.batchRecords);

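// Map each record to a GDB element; records that fail to map are reported as dirty data and skipped.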
while ((r = recordReceiver.getFromReader()) != null) {
try {
records.add(new Tuple2<>(r, mapper.apply(r)));
} catch (final Exception ex) {
getTaskPluginCollector().collectDirtyRecord(r, ex);
continue;
}

if (records.size() >= this.batchRecords) {
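// Block until the previous batch has been committed, so at most one batch is in flight.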
wait4Submit(future);

final List<Tuple2<Record, GdbElement>> batch = records;
future = this.submitService.submit(() -> batchCommitRecords(batch));
records = new ArrayList<>(this.batchRecords);
}
}

wait4Submit(future);
if (!records.isEmpty()) {
final List<Tuple2<Record, GdbElement>> batch = records;
future = this.submitService.submit(() -> batchCommitRecords(batch));
wait4Submit(future);
}
}

private void wait4Submit(Future<Boolean> future) {
if (future == null) {
return;
}
private void wait4Submit(final Future<Boolean> future) {
if (future == null) {
return;
}

try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
try {
future.get();
} catch (final Exception e) {
e.printStackTrace();
}
}

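// Commit one batch to GDB; failed records are handed to the DataX dirty-record collector.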
private boolean batchCommitRecords(final List<Tuple2<Record, GdbElement>> records) {
TaskPluginCollector collector = getTaskPluginCollector();
try {
List<Tuple2<Record, Exception>> errors = graph.add(records);
errors.forEach(t -> collector.collectDirtyRecord(t.getFirst(), t.getSecond()));
failed += errors.size();
} catch (Exception e) {
records.forEach(t -> collector.collectDirtyRecord(t.getFirst(), e));
failed += records.size();
}

records.clear();
return true;
final TaskPluginCollector collector = getTaskPluginCollector();
try {
final List<Tuple2<Record, Exception>> errors = this.graph.add(records);
errors.forEach(t -> collector.collectDirtyRecord(t.getFirst(), t.getSecond()));
this.failed += errors.size();
} catch (final Exception e) {
records.forEach(t -> collector.collectDirtyRecord(t.getFirst(), e));
this.failed += records.size();
}

records.clear();
return true;
}

@Override
@@ -231,7 +239,7 @@ public void post() {
* Note: this method is executed once per Task.
* Best practice: if the Task needs any follow-up processing after the data sync, do it here.
*/
log.info("Task done, dirty record count - {}", failed);
log.info("Task done, dirty record count - {}", this.failed);
}

@Override
@@ -241,9 +249,9 @@ public void destroy() {
* Best practice: usually used together with the Task's post() method to release the Task's resources.
*/
if (session) {
graph.close();
this.graph.close();
}
submitService.shutdown();
this.submitService.shutdown();
}

}
@@ -27,7 +27,6 @@ public String getDescription() {

@Override
public String toString() {
return String.format("Code:[%s], Description:[%s]. ", this.code,
this.description);
return String.format("Code:[%s], Description:[%s]. ", this.code, this.description);
}
}