Skip to content

Commit

Permalink
Optimize_cdcsource_split_table_doc
Browse files Browse the repository at this point in the history
  • Loading branch information
zackyoungh committed Jan 24, 2024
1 parent 78a179b commit ea3ad81
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 35 deletions.
18 changes: 11 additions & 7 deletions dinky-common/src/main/java/org/dinky/utils/SplitUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,22 @@
*/
@Slf4j
public class SplitUtil {
public static final String ENABLE = "enable";
public static final String MATCH_NUMBER_REGEX = "match_number_regex";
public static final String MAX_MATCH_VALUE = "match_number_regex";
public static final String MATCH_WAY = "match_way";

public static boolean contains(String regex, String sourceData) {
return Pattern.matches(regex, sourceData);
}

public static boolean isSplit(String value, Map<String, String> splitConfig) {
String matchNumberRegex = splitConfig.get("match_number_regex");
String matchNumberRegex = splitConfig.get(MATCH_NUMBER_REGEX);
Pattern pattern = Pattern.compile(matchNumberRegex);
Matcher matcher = pattern.matcher(value);
if (matcher.find()) {
long splitNum = Long.parseLong(matcher.group(0).replaceFirst("_", ""));
long maxMatchValue = Long.parseLong(splitConfig.get("max_match_value"));
long maxMatchValue = Long.parseLong(splitConfig.get(MAX_MATCH_VALUE));
return splitNum <= maxMatchValue;
}
return false;
Expand All @@ -53,16 +57,16 @@ public static boolean isSplit(String value, Map<String, String> splitConfig) {
public static String getReValue(String value, Map<String, String> splitConfig) {
if (isEnabled(splitConfig)) {
try {
String matchNumberRegex = splitConfig.get("match_number_regex");
String matchWay = splitConfig.get("match_way");
String matchNumberRegex = splitConfig.get(MATCH_NUMBER_REGEX);
String matchWay = splitConfig.get(MATCH_WAY);
Pattern pattern = Pattern.compile(matchNumberRegex);
Matcher matcher = pattern.matcher(value);
// Determine whether it is a prefix or a suffix
if ("prefix".equalsIgnoreCase(matchWay)) {
if (matcher.find()) {
String num = matcher.group(0);
long splitNum = Long.parseLong(num.replaceFirst("_", ""));
long maxMatchValue = Long.parseLong(splitConfig.get("max_match_value"));
long maxMatchValue = Long.parseLong(splitConfig.get(MAX_MATCH_VALUE));
if (splitNum <= maxMatchValue) {
return value.substring(0, value.lastIndexOf(num));
}
Expand All @@ -76,7 +80,7 @@ public static String getReValue(String value, Map<String, String> splitConfig) {
return value;
}
long splitNum = Long.parseLong(num.replaceFirst("_", ""));
long maxMatchValue = Long.parseLong(splitConfig.get("max_match_value"));
long maxMatchValue = Long.parseLong(splitConfig.get(MAX_MATCH_VALUE));
if (splitNum <= maxMatchValue) {
return value.substring(0, value.lastIndexOf(num));
}
Expand All @@ -90,6 +94,6 @@ public static String getReValue(String value, Map<String, String> splitConfig) {
}

public static boolean isEnabled(Map<String, String> split) {
return Boolean.parseBoolean(split.get("enable"));
return Boolean.parseBoolean(split.get(ENABLE));
}
}
2 changes: 1 addition & 1 deletion dinky-web/src/pages/Metrics/Server/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ const Server: React.FC<ServerProp> = (props) => {
useEffect(() => {
if (timeRange.isReal) {
return subscribeTopic([`${SSE_TOPIC.METRICS}/local`], (data: SseData) =>
setJvmData((prevState) => processData(prevState, [data.data]))
setJvmData((prevState) => [...processData(prevState, [data.data])])
);
}
}, [timeRange]);
Expand Down
59 changes: 32 additions & 27 deletions docs/docs/practical_guide/cdcsource_practice/cdcsource_overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ sink,也可以使用 FlinkSQL 无需修改代码直接扩展新的 sink。
![cdcsource_overview_sink_demo](http://pic.dinky.org.cn/dinky/docs/zh-CN/practical_guide/cdcsource_practice/cdcsource_overview/cdcsource_overview_sink_demo.png)

### 注意事项

一个 FlinkSQL 任务只能写一个 CDCSOURCE,CDCSOURCE 前可写 set、add customjar 和 ddl 语句。

配置项中的英文逗号前不能加空格,需要紧随右单引号。
Expand All @@ -51,7 +52,8 @@ sink,也可以使用 FlinkSQL 无需修改代码直接扩展新的 sink。

1. 禁用语句集
2. 禁用批模式
3. 自 Dinky v1.0.0 开始可以支持 全局变量, 区分整库同步内部变量 #{schemaName} 和 #{tableName} ,全局变量则使用 ${varName} , 请注意区分
3. 自 Dinky v1.0.0 开始可以支持 全局变量, 区分整库同步内部变量 #{schemaName} 和 #{tableName} ,全局变量则使用
${varName} , 请注意区分

:::

Expand Down Expand Up @@ -102,32 +104,35 @@ add customjar 'flink-sql-connector-mysql-cdc-2.3.0.jar'

## 配置参数

| 配置项 | 是否必须 | 默认值 | 说明 |
|--------------------------------|------|---------------|------------------------------------------------------------------------------------------------------------------------------|
| connector ||| 指定要使用的连接器 |
| hostname ||| 数据库服务器的 IP 地址或主机名 |
| port ||| 数据库服务器的端口号 |
| username ||| 连接到数据库服务器时要使用的数据库的用户名 |
| password ||| 连接到数据库服务器时要使用的数据库的密码 |
| scan.startup.mode || latest-offset | 消费者的可选启动模式,有效枚举为“initial”和“latest-offset” |
| database-name ||| 此参数非必填 |
| table-name ||| 只支持正则,示例:"test\\.student,test\\.score",所有表示例:"test\\..*" |
| source.* ||| 指定个性化的 CDC 配置,如 source.server-time-zone 即为 server-time-zone 配置参数。 |
| debezium.* ||| 支持debezium参数,示例:`'debezium.skipped.operations'='d'` 即过滤源数据库删除操作日志。 |
| jdbc.properties.* ||| 连接jdbc的url参数,示例:'jdbc.properties.useSSL' = 'false' 连接url效果: jdbc:mysql://ip:3306/db?useSSL=false 数据库连接参数 |
| checkpoint ||| 单位 ms |
| parallelism ||| 任务并行度 |
| sink.connector ||| 指定 sink 的类型,如 datastream-kafka、datastream-doris、datastream-hudi、kafka、doris、hudi、jdbc 等等,以 datastream- 开头的为 DataStream 的实现方式 |
| sink.sink.db ||| 目标数据源的库名,不指定时默认使用源数据源的库名 |
| sink.table.prefix ||| 目标表的表名前缀,如 ODS_ 即为所有的表名前拼接 ODS_ |
| sink.table.suffix ||| 目标表的表名后缀 |
| sink.table.upper || false | 目标表的表名全大写 |
| sink.table.lower || false | 目标表的表名全小写 |
| sink.auto.create || false | 目标数据源自动建表,目前只支持 Mysql,其他可自行扩展 |
| sink.timezone || UTC | 指定目标数据源的时区,在数据类型转换时自动生效 |
| sink.column.replace.line-break || false | 指定是否去除换行符,即在数据转换中进行 REGEXP_REPLACE(column, '\\n', '') |
| sink.* ||| 目标数据源的配置信息,同 FlinkSQL,使用 ${schemaName} 和 ${tableName} 可注入经过处理的源表名 |
| sink[N].* ||| N代表为多数据源写入, 默认从0开始到N, 其他配置参数信息参考sink.*的配置. |
| 配置项 | 是否必须 | 默认值 | 说明 |
|--------------------------------|------|---------------|------------------------------------------------------------------------------------------------------------------------------------|
| connector ||| 指定要使用的连接器 |
| hostname ||| 数据库服务器的 IP 地址或主机名 |
| port ||| 数据库服务器的端口号 |
| username ||| 连接到数据库服务器时要使用的数据库的用户名 |
| password ||| 连接到数据库服务器时要使用的数据库的密码 |
| scan.startup.mode || latest-offset | 消费者的可选启动模式,有效枚举为“initial”和“latest-offset” |
| database-name ||| 此参数非必填 |
| table-name ||| 只支持正则,示例:"test\\.student,test\\.score",所有表示例:"test\\..*" |
| split.enable || false | 是否开启分库分表模式同步 |
| split.match_number_regex ||| 分库分表匹配正则,如果你是table_1,table_2这种切分策略,可以使用 `_[0-9]+` 进行同步,目前也只支持这种策略,单库多表,多库多表,多表单库都可以支持,最终写入的表则是去掉_,如 source: db_1,tb_2, sink: db,tb |
| split.max_match_value ||| 分库分表的最大匹配的最大上限值,比如table_1 ...table_300,往后的table_301并不属于切分策略,则可设置为 `301` |
| source.* ||| 指定个性化的 CDC 配置,如 source.server-time-zone 即为 server-time-zone 配置参数。 |
| debezium.* ||| 支持debezium参数,示例:`'debezium.skipped.operations'='d'` 即过滤源数据库删除操作日志。 |
| jdbc.properties.* ||| 连接jdbc的url参数,示例:'jdbc.properties.useSSL' = 'false' 连接url效果: jdbc:mysql://ip:3306/db?useSSL=false 数据库连接参数 |
| checkpoint ||| 单位 ms |
| parallelism ||| 任务并行度 |
| sink.connector ||| 指定 sink 的类型,如 datastream-kafka、datastream-doris、datastream-hudi、kafka、doris、hudi、jdbc 等等,以 datastream- 开头的为 DataStream 的实现方式 |
| sink.sink.db ||| 目标数据源的库名,不指定时默认使用源数据源的库名 |
| sink.table.prefix ||| 目标表的表名前缀,如 ODS_ 即为所有的表名前拼接 ODS_ |
| sink.table.suffix ||| 目标表的表名后缀 |
| sink.table.upper || false | 目标表的表名全大写 |
| sink.table.lower || false | 目标表的表名全小写 |
| sink.auto.create || false | 目标数据源自动建表,目前只支持 Mysql,其他可自行扩展 |
| sink.timezone || UTC | 指定目标数据源的时区,在数据类型转换时自动生效 |
| sink.column.replace.line-break || false | 指定是否去除换行符,即在数据转换中进行 REGEXP_REPLACE(column, '\\n', '') |
| sink.* ||| 目标数据源的配置信息,同 FlinkSQL,使用 ${schemaName} 和 ${tableName} 可注入经过处理的源表名 |
| sink[N].* ||| N代表为多数据源写入, 默认从0开始到N, 其他配置参数信息参考sink.*的配置. |

### 支持debezium参数

Expand Down

0 comments on commit ea3ad81

Please sign in to comment.