diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java index 3f54db5415e7..b87a92806262 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java @@ -38,8 +38,10 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions; +import com.ververica.cdc.debezium.table.DebeziumOptions; import io.debezium.connector.AbstractSourceInfo; import io.debezium.relational.Column; +import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.Table; import io.debezium.relational.history.TableChanges; import org.apache.flink.api.common.functions.FlatMapFunction; @@ -74,7 +76,7 @@ public class MySqlRecordParser implements FlatMapFunction computedColumns; private final TypeMapping typeMapping; - + private final boolean isDebeziumSchemaCommentsEnabled; private DebeziumEvent root; // NOTE: current table name is not converted by tableNameConverter @@ -96,6 +98,12 @@ public MySqlRecordParser( .configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true) .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); String stringifyServerTimeZone = mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE); + + this.isDebeziumSchemaCommentsEnabled = + mySqlConfig.getBoolean( + DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX + + RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(), + false); this.serverTimeZone = stringifyServerTimeZone == null ? ZoneId.systemDefault() @@ -174,7 +182,12 @@ private List extractFields(Table table) { typeMapping); dataType = dataType.copy(typeMapping.containsMode(TO_NULLABLE) || column.isOptional()); - rowType.field(column.name(), dataType); + // add column comment when we upgrade flink cdc to 2.4 + if (isDebeziumSchemaCommentsEnabled) { + rowType.field(column.name(), dataType, column.comment()); + } else { + rowType.field(column.name(), dataType); + } } return rowType.build().getFields(); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index 9568b1c3b7fa..7e11b541de6a 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -325,6 +325,65 @@ private void testSchemaEvolutionMultipleImpl(Statement statement) throws Excepti waitForResult(expected, table, rowType, primaryKeys); } + @Test + @Timeout(60) + public void testSchemaEvolutionWithComment() throws Exception { + Map mySqlConfig = getBasicMySqlConfig(); + mySqlConfig.put("database-name", DATABASE_NAME); + mySqlConfig.put("table-name", "schema_evolution_comment"); + mySqlConfig.put("debezium.include.schema.comments", "true"); + + MySqlSyncTableAction action = + syncTableActionBuilder(mySqlConfig) + .withCatalogConfig( + Collections.singletonMap( + CatalogOptions.METASTORE.key(), "test-alter-table")) + .withTableConfig(getBasicTableConfig()) + .withPrimaryKeys("_id") + .build(); + runActionWithDefaultEnv(action); + + try (Statement statement = getStatement()) { + testSchemaEvolutionWithCommentImpl(statement); + } + } + + private void testSchemaEvolutionWithCommentImpl(Statement statement) throws Exception { + FileStoreTable table = getFileStoreTable(); + statement.executeUpdate("USE " + DATABASE_NAME); + statement.executeUpdate("INSERT INTO schema_evolution_comment VALUES (1, 'one')"); + + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"_id", "v1"}); + List primaryKeys = Collections.singletonList("_id"); + List expected = Collections.singletonList("+I[1, one]"); + waitForResult(expected, table, rowType, primaryKeys); + + statement.executeUpdate( + "ALTER TABLE schema_evolution_comment MODIFY COLUMN v1 VARCHAR(20) COMMENT 'v1-new'"); + statement.executeUpdate("INSERT INTO schema_evolution_comment VALUES (2, 'two')"); + + statement.executeUpdate( + "ALTER TABLE schema_evolution_comment ADD COLUMN v2 INT COMMENT 'v2'"); + + statement.executeUpdate("INSERT INTO schema_evolution_comment VALUES (3, 'three', 30)"); + rowType = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), DataTypes.VARCHAR(20), DataTypes.INT() + }, + new String[] {"_id", "v1", "v2"}); + expected = Arrays.asList("+I[1, one, NULL]", "+I[2, two, NULL]", "+I[3, three, 30]"); + waitForResult(expected, table, rowType, primaryKeys); + + checkTableSchema( + "[{\"id\":0,\"name\":\"_id\",\"type\":\"INT NOT NULL\",\"description\":\"primary\"}," + + "{\"id\":1,\"name\":\"v1\",\"type\":\"VARCHAR(20)\",\"description\":\"v1-new\"}," + + "{\"id\":2,\"name\":\"v2\",\"type\":\"INT\",\"description\":\"v2\"}]"); + } + @Test @Timeout(90) public void testAllTypes() throws Exception { diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql index 676185fb9291..965f884ec680 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql @@ -49,6 +49,20 @@ CREATE TABLE schema_evolution_multiple ( PRIMARY KEY (_id) ); +-- ################################################################################ +-- MySqlSyncTableActionITCase +-- ################################################################################ + +CREATE TABLE schema_evolution_comment ( + _id INT comment 'primary', + v1 VARCHAR(10) comment 'v1', + PRIMARY KEY (_id) +); + +-- ################################################################################ +-- testAllTypes +-- ################################################################################ + CREATE TABLE all_types_table ( _id INT, pt DECIMAL(2, 1),