Skip to content

Commit

Permalink
[Fix] Fix the issue where the table name has a middle line that preve…
Browse files Browse the repository at this point in the history
…nts task execution (#3833)

Signed-off-by: Zzm0809 <[email protected]>
Co-authored-by: Zzm0809 <[email protected]>
  • Loading branch information
Zzm0809 and Zzm0809 authored Sep 26, 2024
1 parent 1d7d2c2 commit cf7f9b2
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -279,5 +279,17 @@ public int partition(String key, int numPartitions) {
}

protected void executeCatalogStatement(CustomTableEnvironment customTableEnvironment) {}
;

/**
* replace view name middle to under line for flink use view name
* @param viewName view name
* @return view name
*/
public static String replaceViewNameMiddleLineToUnderLine(String viewName) {
if (!viewName.isEmpty() && viewName.contains("-")) {
logger.warn("the view name [{}] contains '-', replace '-' to '_' for flink use view name", viewName);
return viewName.replaceAll("-", "_");
}
return viewName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ protected void initTypeConverterList() {

private String addSourceTableView(
CustomTableEnvironment customTableEnvironment, DataStream<Row> rowDataDataStream, Table table) {
// 上游表名称
String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
// Because the name of the view on Flink is not allowed to have -, it needs to be replaced with - here_
String viewName = replaceViewNameMiddleLineToUnderLine("VIEW_" + table.getSchemaTableNameWithUnderline());

customTableEnvironment.createTemporaryView(
viewName, customTableEnvironment.fromChangelogStream(rowDataDataStream));
logger.info("Create {} temporaryView successful...", viewName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public void addTableSink(
String sinkSchemaName = getSinkSchemaName(table);
String tableName = getSinkTableName(table);
String sinkTableName = catalogName + "." + sinkSchemaName + "." + tableName;
String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
// Because the name of the view on Flink is not allowed to have -, it needs to be replaced with - here_
String viewName = replaceViewNameMiddleLineToUnderLine("VIEW_" + table.getSchemaTableNameWithUnderline());

customTableEnvironment.createTemporaryView(
viewName, customTableEnvironment.fromChangelogStream(rowDataDataStream));
Expand Down

0 comments on commit cf7f9b2

Please sign in to comment.