Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Doc] Optimize cdcsource split table #3055

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions dinky-common/src/main/java/org/dinky/utils/SplitUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,22 @@
*/
@Slf4j
public class SplitUtil {
public static final String ENABLE = "enable";
public static final String MATCH_NUMBER_REGEX = "match_number_regex";
public static final String MAX_MATCH_VALUE = "match_number_regex";
public static final String MATCH_WAY = "match_way";

public static boolean contains(String regex, String sourceData) {
return Pattern.matches(regex, sourceData);
}

public static boolean isSplit(String value, Map<String, String> splitConfig) {
String matchNumberRegex = splitConfig.get("match_number_regex");
String matchNumberRegex = splitConfig.get(MATCH_NUMBER_REGEX);
Pattern pattern = Pattern.compile(matchNumberRegex);
Matcher matcher = pattern.matcher(value);
if (matcher.find()) {
long splitNum = Long.parseLong(matcher.group(0).replaceFirst("_", ""));
long maxMatchValue = Long.parseLong(splitConfig.get("max_match_value"));
long maxMatchValue = Long.parseLong(splitConfig.get(MAX_MATCH_VALUE));
return splitNum <= maxMatchValue;
}
return false;
Expand All @@ -53,16 +57,16 @@ public static boolean isSplit(String value, Map<String, String> splitConfig) {
public static String getReValue(String value, Map<String, String> splitConfig) {
if (isEnabled(splitConfig)) {
try {
String matchNumberRegex = splitConfig.get("match_number_regex");
String matchWay = splitConfig.get("match_way");
String matchNumberRegex = splitConfig.get(MATCH_NUMBER_REGEX);
String matchWay = splitConfig.get(MATCH_WAY);
Pattern pattern = Pattern.compile(matchNumberRegex);
Matcher matcher = pattern.matcher(value);
// Determine whether it is a prefix or a suffix
if ("prefix".equalsIgnoreCase(matchWay)) {
if (matcher.find()) {
String num = matcher.group(0);
long splitNum = Long.parseLong(num.replaceFirst("_", ""));
long maxMatchValue = Long.parseLong(splitConfig.get("max_match_value"));
long maxMatchValue = Long.parseLong(splitConfig.get(MAX_MATCH_VALUE));
if (splitNum <= maxMatchValue) {
return value.substring(0, value.lastIndexOf(num));
}
Expand All @@ -76,7 +80,7 @@ public static String getReValue(String value, Map<String, String> splitConfig) {
return value;
}
long splitNum = Long.parseLong(num.replaceFirst("_", ""));
long maxMatchValue = Long.parseLong(splitConfig.get("max_match_value"));
long maxMatchValue = Long.parseLong(splitConfig.get(MAX_MATCH_VALUE));
if (splitNum <= maxMatchValue) {
return value.substring(0, value.lastIndexOf(num));
}
Expand All @@ -90,6 +94,6 @@ public static String getReValue(String value, Map<String, String> splitConfig) {
}

public static boolean isEnabled(Map<String, String> split) {
return Boolean.parseBoolean(split.get("enable"));
return Boolean.parseBoolean(split.get(ENABLE));
}
}
2 changes: 1 addition & 1 deletion dinky-web/src/pages/Metrics/Server/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ const Server: React.FC<ServerProp> = (props) => {
useEffect(() => {
if (timeRange.isReal) {
return subscribeTopic([`${SSE_TOPIC.METRICS}/local`], (data: SseData) =>
setJvmData((prevState) => processData(prevState, [data.data]))
setJvmData((prevState) => [...processData(prevState, [data.data])])
);
}
}, [timeRange]);
Expand Down
59 changes: 32 additions & 27 deletions docs/docs/practical_guide/cdcsource_practice/cdcsource_overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ sink,也可以使用 FlinkSQL 无需修改代码直接扩展新的 sink。
![cdcsource_overview_sink_demo](http://pic.dinky.org.cn/dinky/docs/zh-CN/practical_guide/cdcsource_practice/cdcsource_overview/cdcsource_overview_sink_demo.png)

### 注意事项

一个 FlinkSQL 任务只能写一个 CDCSOURCE,CDCSOURCE 前可写 set、add customjar 和 ddl 语句。

配置项中的英文逗号前不能加空格,需要紧随右单引号。
Expand All @@ -51,7 +52,8 @@ sink,也可以使用 FlinkSQL 无需修改代码直接扩展新的 sink。

1. 禁用语句集
2. 禁用批模式
3. 自 Dinky v1.0.0 开始可以支持 全局变量, 区分整库同步内部变量 #{schemaName} 和 #{tableName} ,全局变量则使用 ${varName} , 请注意区分
3. 自 Dinky v1.0.0 开始可以支持 全局变量, 区分整库同步内部变量 #{schemaName} 和 #{tableName} ,全局变量则使用
${varName} , 请注意区分

:::

Expand Down Expand Up @@ -102,32 +104,35 @@ add customjar 'flink-sql-connector-mysql-cdc-2.3.0.jar'

## 配置参数

| 配置项 | 是否必须 | 默认值 | 说明 |
|--------------------------------|------|---------------|------------------------------------------------------------------------------------------------------------------------------|
| connector | 是 | 无 | 指定要使用的连接器 |
| hostname | 是 | 无 | 数据库服务器的 IP 地址或主机名 |
| port | 是 | 无 | 数据库服务器的端口号 |
| username | 是 | 无 | 连接到数据库服务器时要使用的数据库的用户名 |
| password | 是 | 无 | 连接到数据库服务器时要使用的数据库的密码 |
| scan.startup.mode | 否 | latest-offset | 消费者的可选启动模式,有效枚举为“initial”和“latest-offset” |
| database-name | 否 | 无 | 此参数非必填 |
| table-name | 是 | 无 | 只支持正则,示例:"test\\.student,test\\.score",所有表示例:"test\\..*" |
| source.* | 否 | 无 | 指定个性化的 CDC 配置,如 source.server-time-zone 即为 server-time-zone 配置参数。 |
| debezium.* | 否 | 无 | 支持debezium参数,示例:`'debezium.skipped.operations'='d'` 即过滤源数据库删除操作日志。 |
| jdbc.properties.* | 否 | 无 | 连接jdbc的url参数,示例:'jdbc.properties.useSSL' = 'false' 连接url效果: jdbc:mysql://ip:3306/db?useSSL=false 数据库连接参数 |
| checkpoint | 否 | 无 | 单位 ms |
| parallelism | 否 | 无 | 任务并行度 |
| sink.connector | 是 | 无 | 指定 sink 的类型,如 datastream-kafka、datastream-doris、datastream-hudi、kafka、doris、hudi、jdbc 等等,以 datastream- 开头的为 DataStream 的实现方式 |
| sink.sink.db | 否 | 无 | 目标数据源的库名,不指定时默认使用源数据源的库名 |
| sink.table.prefix | 否 | 无 | 目标表的表名前缀,如 ODS_ 即为所有的表名前拼接 ODS_ |
| sink.table.suffix | 否 | 无 | 目标表的表名后缀 |
| sink.table.upper | 否 | false | 目标表的表名全大写 |
| sink.table.lower | 否 | false | 目标表的表名全小写 |
| sink.auto.create | 否 | false | 目标数据源自动建表,目前只支持 Mysql,其他可自行扩展 |
| sink.timezone | 否 | UTC | 指定目标数据源的时区,在数据类型转换时自动生效 |
| sink.column.replace.line-break | 否 | false | 指定是否去除换行符,即在数据转换中进行 REGEXP_REPLACE(column, '\\n', '') |
| sink.* | 否 | 无 | 目标数据源的配置信息,同 FlinkSQL,使用 ${schemaName} 和 ${tableName} 可注入经过处理的源表名 |
| sink[N].* | 否 | 无 | N代表为多数据源写入, 默认从0开始到N, 其他配置参数信息参考sink.*的配置. |
| 配置项 | 是否必须 | 默认值 | 说明 |
|--------------------------------|------|---------------|------------------------------------------------------------------------------------------------------------------------------------|
| connector | 是 | 无 | 指定要使用的连接器 |
| hostname | 是 | 无 | 数据库服务器的 IP 地址或主机名 |
| port | 是 | 无 | 数据库服务器的端口号 |
| username | 是 | 无 | 连接到数据库服务器时要使用的数据库的用户名 |
| password | 是 | 无 | 连接到数据库服务器时要使用的数据库的密码 |
| scan.startup.mode | 否 | latest-offset | 消费者的可选启动模式,有效枚举为“initial”和“latest-offset” |
| database-name | 否 | 无 | 此参数非必填 |
| table-name | 是 | 无 | 只支持正则,示例:"test\\.student,test\\.score",所有表示例:"test\\..*" |
| split.enable | 否 | false | 是否开启分库分表模式同步 |
| split.match_number_regex | 否 | 无 | 分库分表匹配正则,如果你是table_1,table_2这种切分策略,可以使用 `_[0-9]+` 进行同步,目前也只支持这种策略,单库多表,多库多表,多表单库都可以支持,最终写入的表则是去掉_,如 source: db_1,tb_2, sink: db,tb |
| split.max_match_value | 否 | 无 | 分库分表的最大匹配的最大上限值,比如table_1 ...table_300,往后的table_301并不属于切分策略,则可设置为 `301` |
| source.* | 否 | 无 | 指定个性化的 CDC 配置,如 source.server-time-zone 即为 server-time-zone 配置参数。 |
| debezium.* | 否 | 无 | 支持debezium参数,示例:`'debezium.skipped.operations'='d'` 即过滤源数据库删除操作日志。 |
| jdbc.properties.* | 否 | 无 | 连接jdbc的url参数,示例:'jdbc.properties.useSSL' = 'false' 连接url效果: jdbc:mysql://ip:3306/db?useSSL=false 数据库连接参数 |
| checkpoint | 否 | 无 | 单位 ms |
| parallelism | 否 | 无 | 任务并行度 |
| sink.connector | 是 | 无 | 指定 sink 的类型,如 datastream-kafka、datastream-doris、datastream-hudi、kafka、doris、hudi、jdbc 等等,以 datastream- 开头的为 DataStream 的实现方式 |
| sink.sink.db | 否 | 无 | 目标数据源的库名,不指定时默认使用源数据源的库名 |
| sink.table.prefix | 否 | 无 | 目标表的表名前缀,如 ODS_ 即为所有的表名前拼接 ODS_ |
| sink.table.suffix | 否 | 无 | 目标表的表名后缀 |
| sink.table.upper | 否 | false | 目标表的表名全大写 |
| sink.table.lower | 否 | false | 目标表的表名全小写 |
| sink.auto.create | 否 | false | 目标数据源自动建表,目前只支持 Mysql,其他可自行扩展 |
| sink.timezone | 否 | UTC | 指定目标数据源的时区,在数据类型转换时自动生效 |
| sink.column.replace.line-break | 否 | false | 指定是否去除换行符,即在数据转换中进行 REGEXP_REPLACE(column, '\\n', '') |
| sink.* | 否 | 无 | 目标数据源的配置信息,同 FlinkSQL,使用 ${schemaName} 和 ${tableName} 可注入经过处理的源表名 |
| sink[N].* | 否 | 无 | N代表为多数据源写入, 默认从0开始到N, 其他配置参数信息参考sink.*的配置. |

### 支持debezium参数

Expand Down
Loading