diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/AbstractSqlSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/AbstractSqlSinkBuilder.java index 92cea935d5..d95d4917f8 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/AbstractSqlSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/AbstractSqlSinkBuilder.java @@ -279,5 +279,17 @@ public int partition(String key, int numPartitions) { } protected void executeCatalogStatement(CustomTableEnvironment customTableEnvironment) {} - ; + + /** + * replace view name middle to under line for flink use view name + * @param viewName view name + * @return view name + */ + public static String replaceViewNameMiddleLineToUnderLine(String viewName) { + if (!viewName.isEmpty() && viewName.contains("-")) { + logger.warn("the view name [{}] contains '-', replace '-' to '_' for flink use view name", viewName); + return viewName.replaceAll("-", "_"); + } + return viewName; + } } diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java index 642279650d..b01ff11d06 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java @@ -68,8 +68,9 @@ protected void initTypeConverterList() { private String addSourceTableView( CustomTableEnvironment customTableEnvironment, DataStream rowDataDataStream, Table table) { - // 上游表名称 - String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline(); + // Because the name of the view on Flink is not allowed to have -, it needs to be replaced with - here_ + String viewName = replaceViewNameMiddleLineToUnderLine("VIEW_" + table.getSchemaTableNameWithUnderline()); + customTableEnvironment.createTemporaryView( viewName, customTableEnvironment.fromChangelogStream(rowDataDataStream)); logger.info("Create {} temporaryView successful...", viewName); diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/catalog/SQLCatalogSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/catalog/SQLCatalogSinkBuilder.java index 2f6e8913d8..f2eefe7c72 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/catalog/SQLCatalogSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/catalog/SQLCatalogSinkBuilder.java @@ -67,7 +67,8 @@ public void addTableSink( String sinkSchemaName = getSinkSchemaName(table); String tableName = getSinkTableName(table); String sinkTableName = catalogName + "." + sinkSchemaName + "." + tableName; - String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline(); + // Because the name of the view on Flink is not allowed to have -, it needs to be replaced with - here_ + String viewName = replaceViewNameMiddleLineToUnderLine("VIEW_" + table.getSchemaTableNameWithUnderline()); customTableEnvironment.createTemporaryView( viewName, customTableEnvironment.fromChangelogStream(rowDataDataStream));