diff --git a/dinky-common/src/main/java/org/dinky/utils/SplitUtil.java b/dinky-common/src/main/java/org/dinky/utils/SplitUtil.java index ece0949db1..c11b61ce8c 100644 --- a/dinky-common/src/main/java/org/dinky/utils/SplitUtil.java +++ b/dinky-common/src/main/java/org/dinky/utils/SplitUtil.java @@ -33,18 +33,22 @@ */ @Slf4j public class SplitUtil { + public static final String ENABLE = "enable"; + public static final String MATCH_NUMBER_REGEX = "match_number_regex"; + public static final String MAX_MATCH_VALUE = "match_number_regex"; + public static final String MATCH_WAY = "match_way"; public static boolean contains(String regex, String sourceData) { return Pattern.matches(regex, sourceData); } public static boolean isSplit(String value, Map splitConfig) { - String matchNumberRegex = splitConfig.get("match_number_regex"); + String matchNumberRegex = splitConfig.get(MATCH_NUMBER_REGEX); Pattern pattern = Pattern.compile(matchNumberRegex); Matcher matcher = pattern.matcher(value); if (matcher.find()) { long splitNum = Long.parseLong(matcher.group(0).replaceFirst("_", "")); - long maxMatchValue = Long.parseLong(splitConfig.get("max_match_value")); + long maxMatchValue = Long.parseLong(splitConfig.get(MAX_MATCH_VALUE)); return splitNum <= maxMatchValue; } return false; @@ -53,8 +57,8 @@ public static boolean isSplit(String value, Map splitConfig) { public static String getReValue(String value, Map splitConfig) { if (isEnabled(splitConfig)) { try { - String matchNumberRegex = splitConfig.get("match_number_regex"); - String matchWay = splitConfig.get("match_way"); + String matchNumberRegex = splitConfig.get(MATCH_NUMBER_REGEX); + String matchWay = splitConfig.get(MATCH_WAY); Pattern pattern = Pattern.compile(matchNumberRegex); Matcher matcher = pattern.matcher(value); // Determine whether it is a prefix or a suffix @@ -62,7 +66,7 @@ public static String getReValue(String value, Map splitConfig) { if (matcher.find()) { String num = matcher.group(0); long splitNum = Long.parseLong(num.replaceFirst("_", "")); - long maxMatchValue = Long.parseLong(splitConfig.get("max_match_value")); + long maxMatchValue = Long.parseLong(splitConfig.get(MAX_MATCH_VALUE)); if (splitNum <= maxMatchValue) { return value.substring(0, value.lastIndexOf(num)); } @@ -76,7 +80,7 @@ public static String getReValue(String value, Map splitConfig) { return value; } long splitNum = Long.parseLong(num.replaceFirst("_", "")); - long maxMatchValue = Long.parseLong(splitConfig.get("max_match_value")); + long maxMatchValue = Long.parseLong(splitConfig.get(MAX_MATCH_VALUE)); if (splitNum <= maxMatchValue) { return value.substring(0, value.lastIndexOf(num)); } @@ -90,6 +94,6 @@ public static String getReValue(String value, Map splitConfig) { } public static boolean isEnabled(Map split) { - return Boolean.parseBoolean(split.get("enable")); + return Boolean.parseBoolean(split.get(ENABLE)); } } diff --git a/dinky-web/src/pages/Metrics/Server/index.tsx b/dinky-web/src/pages/Metrics/Server/index.tsx index 87b265ded3..a16ae227df 100644 --- a/dinky-web/src/pages/Metrics/Server/index.tsx +++ b/dinky-web/src/pages/Metrics/Server/index.tsx @@ -72,7 +72,7 @@ const Server: React.FC = (props) => { useEffect(() => { if (timeRange.isReal) { return subscribeTopic([`${SSE_TOPIC.METRICS}/local`], (data: SseData) => - setJvmData((prevState) => processData(prevState, [data.data])) + setJvmData((prevState) => [...processData(prevState, [data.data])]) ); } }, [timeRange]); diff --git a/docs/docs/practical_guide/cdcsource_practice/cdcsource_overview.md b/docs/docs/practical_guide/cdcsource_practice/cdcsource_overview.md index 77d2e8888a..7045eb22f1 100644 --- a/docs/docs/practical_guide/cdcsource_practice/cdcsource_overview.md +++ b/docs/docs/practical_guide/cdcsource_practice/cdcsource_overview.md @@ -39,6 +39,7 @@ sink,也可以使用 FlinkSQL 无需修改代码直接扩展新的 sink。 ![cdcsource_overview_sink_demo](http://pic.dinky.org.cn/dinky/docs/zh-CN/practical_guide/cdcsource_practice/cdcsource_overview/cdcsource_overview_sink_demo.png) ### 注意事项 + 一个 FlinkSQL 任务只能写一个 CDCSOURCE,CDCSOURCE 前可写 set、add customjar 和 ddl 语句。 配置项中的英文逗号前不能加空格,需要紧随右单引号。 @@ -51,7 +52,8 @@ sink,也可以使用 FlinkSQL 无需修改代码直接扩展新的 sink。 1. 禁用语句集 2. 禁用批模式 -3. 自 Dinky v1.0.0 开始可以支持 全局变量, 区分整库同步内部变量 #{schemaName} 和 #{tableName} ,全局变量则使用 ${varName} , 请注意区分 +3. 自 Dinky v1.0.0 开始可以支持 全局变量, 区分整库同步内部变量 #{schemaName} 和 #{tableName} ,全局变量则使用 + ${varName} , 请注意区分 ::: @@ -102,32 +104,35 @@ add customjar 'flink-sql-connector-mysql-cdc-2.3.0.jar' ## 配置参数 -| 配置项 | 是否必须 | 默认值 | 说明 | -|--------------------------------|------|---------------|------------------------------------------------------------------------------------------------------------------------------| -| connector | 是 | 无 | 指定要使用的连接器 | -| hostname | 是 | 无 | 数据库服务器的 IP 地址或主机名 | -| port | 是 | 无 | 数据库服务器的端口号 | -| username | 是 | 无 | 连接到数据库服务器时要使用的数据库的用户名 | -| password | 是 | 无 | 连接到数据库服务器时要使用的数据库的密码 | -| scan.startup.mode | 否 | latest-offset | 消费者的可选启动模式,有效枚举为“initial”和“latest-offset” | -| database-name | 否 | 无 | 此参数非必填 | -| table-name | 是 | 无 | 只支持正则,示例:"test\\.student,test\\.score",所有表示例:"test\\..*" | -| source.* | 否 | 无 | 指定个性化的 CDC 配置,如 source.server-time-zone 即为 server-time-zone 配置参数。 | -| debezium.* | 否 | 无 | 支持debezium参数,示例:`'debezium.skipped.operations'='d'` 即过滤源数据库删除操作日志。 | -| jdbc.properties.* | 否 | 无 | 连接jdbc的url参数,示例:'jdbc.properties.useSSL' = 'false' 连接url效果: jdbc:mysql://ip:3306/db?useSSL=false 数据库连接参数 | -| checkpoint | 否 | 无 | 单位 ms | -| parallelism | 否 | 无 | 任务并行度 | -| sink.connector | 是 | 无 | 指定 sink 的类型,如 datastream-kafka、datastream-doris、datastream-hudi、kafka、doris、hudi、jdbc 等等,以 datastream- 开头的为 DataStream 的实现方式 | -| sink.sink.db | 否 | 无 | 目标数据源的库名,不指定时默认使用源数据源的库名 | -| sink.table.prefix | 否 | 无 | 目标表的表名前缀,如 ODS_ 即为所有的表名前拼接 ODS_ | -| sink.table.suffix | 否 | 无 | 目标表的表名后缀 | -| sink.table.upper | 否 | false | 目标表的表名全大写 | -| sink.table.lower | 否 | false | 目标表的表名全小写 | -| sink.auto.create | 否 | false | 目标数据源自动建表,目前只支持 Mysql,其他可自行扩展 | -| sink.timezone | 否 | UTC | 指定目标数据源的时区,在数据类型转换时自动生效 | -| sink.column.replace.line-break | 否 | false | 指定是否去除换行符,即在数据转换中进行 REGEXP_REPLACE(column, '\\n', '') | -| sink.* | 否 | 无 | 目标数据源的配置信息,同 FlinkSQL,使用 ${schemaName} 和 ${tableName} 可注入经过处理的源表名 | -| sink[N].* | 否 | 无 | N代表为多数据源写入, 默认从0开始到N, 其他配置参数信息参考sink.*的配置. | +| 配置项 | 是否必须 | 默认值 | 说明 | +|--------------------------------|------|---------------|------------------------------------------------------------------------------------------------------------------------------------| +| connector | 是 | 无 | 指定要使用的连接器 | +| hostname | 是 | 无 | 数据库服务器的 IP 地址或主机名 | +| port | 是 | 无 | 数据库服务器的端口号 | +| username | 是 | 无 | 连接到数据库服务器时要使用的数据库的用户名 | +| password | 是 | 无 | 连接到数据库服务器时要使用的数据库的密码 | +| scan.startup.mode | 否 | latest-offset | 消费者的可选启动模式,有效枚举为“initial”和“latest-offset” | +| database-name | 否 | 无 | 此参数非必填 | +| table-name | 是 | 无 | 只支持正则,示例:"test\\.student,test\\.score",所有表示例:"test\\..*" | +| split.enable | 否 | false | 是否开启分库分表模式同步 | +| split.match_number_regex | 否 | 无 | 分库分表匹配正则,如果你是table_1,table_2这种切分策略,可以使用 `_[0-9]+` 进行同步,目前也只支持这种策略,单库多表,多库多表,多表单库都可以支持,最终写入的表则是去掉_,如 source: db_1,tb_2, sink: db,tb | +| split.max_match_value | 否 | 无 | 分库分表的最大匹配的最大上限值,比如table_1 ...table_300,往后的table_301并不属于切分策略,则可设置为 `301` | +| source.* | 否 | 无 | 指定个性化的 CDC 配置,如 source.server-time-zone 即为 server-time-zone 配置参数。 | +| debezium.* | 否 | 无 | 支持debezium参数,示例:`'debezium.skipped.operations'='d'` 即过滤源数据库删除操作日志。 | +| jdbc.properties.* | 否 | 无 | 连接jdbc的url参数,示例:'jdbc.properties.useSSL' = 'false' 连接url效果: jdbc:mysql://ip:3306/db?useSSL=false 数据库连接参数 | +| checkpoint | 否 | 无 | 单位 ms | +| parallelism | 否 | 无 | 任务并行度 | +| sink.connector | 是 | 无 | 指定 sink 的类型,如 datastream-kafka、datastream-doris、datastream-hudi、kafka、doris、hudi、jdbc 等等,以 datastream- 开头的为 DataStream 的实现方式 | +| sink.sink.db | 否 | 无 | 目标数据源的库名,不指定时默认使用源数据源的库名 | +| sink.table.prefix | 否 | 无 | 目标表的表名前缀,如 ODS_ 即为所有的表名前拼接 ODS_ | +| sink.table.suffix | 否 | 无 | 目标表的表名后缀 | +| sink.table.upper | 否 | false | 目标表的表名全大写 | +| sink.table.lower | 否 | false | 目标表的表名全小写 | +| sink.auto.create | 否 | false | 目标数据源自动建表,目前只支持 Mysql,其他可自行扩展 | +| sink.timezone | 否 | UTC | 指定目标数据源的时区,在数据类型转换时自动生效 | +| sink.column.replace.line-break | 否 | false | 指定是否去除换行符,即在数据转换中进行 REGEXP_REPLACE(column, '\\n', '') | +| sink.* | 否 | 无 | 目标数据源的配置信息,同 FlinkSQL,使用 ${schemaName} 和 ${tableName} 可注入经过处理的源表名 | +| sink[N].* | 否 | 无 | N代表为多数据源写入, 默认从0开始到N, 其他配置参数信息参考sink.*的配置. | ### 支持debezium参数