Skip to content

Commit

Permalink
[cdc] cdc schema change should add column comment (apache#3667)
Browse files Browse the repository at this point in the history
  • Loading branch information
hadoopkandy authored Aug 12, 2024
1 parent 4a24868 commit 32ff204
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import com.ververica.cdc.debezium.table.DebeziumOptions;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.relational.Column;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Table;
import io.debezium.relational.history.TableChanges;
import org.apache.flink.api.common.functions.FlatMapFunction;
Expand Down Expand Up @@ -74,7 +76,7 @@ public class MySqlRecordParser implements FlatMapFunction<CdcSourceRecord, RichC
private final ZoneId serverTimeZone;
private final List<ComputedColumn> computedColumns;
private final TypeMapping typeMapping;

private final boolean isDebeziumSchemaCommentsEnabled;
private DebeziumEvent root;

// NOTE: current table name is not converted by tableNameConverter
Expand All @@ -96,6 +98,12 @@ public MySqlRecordParser(
.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true)
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
String stringifyServerTimeZone = mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);

this.isDebeziumSchemaCommentsEnabled =
mySqlConfig.getBoolean(
DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX
+ RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(),
false);
this.serverTimeZone =
stringifyServerTimeZone == null
? ZoneId.systemDefault()
Expand Down Expand Up @@ -174,7 +182,12 @@ private List<DataField> extractFields(Table table) {
typeMapping);
dataType = dataType.copy(typeMapping.containsMode(TO_NULLABLE) || column.isOptional());

rowType.field(column.name(), dataType);
// add column comment when we upgrade flink cdc to 2.4
if (isDebeziumSchemaCommentsEnabled) {
rowType.field(column.name(), dataType, column.comment());
} else {
rowType.field(column.name(), dataType);
}
}
return rowType.build().getFields();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,65 @@ private void testSchemaEvolutionMultipleImpl(Statement statement) throws Excepti
waitForResult(expected, table, rowType, primaryKeys);
}

@Test
@Timeout(60)
public void testSchemaEvolutionWithComment() throws Exception {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", DATABASE_NAME);
mySqlConfig.put("table-name", "schema_evolution_comment");
mySqlConfig.put("debezium.include.schema.comments", "true");

MySqlSyncTableAction action =
syncTableActionBuilder(mySqlConfig)
.withCatalogConfig(
Collections.singletonMap(
CatalogOptions.METASTORE.key(), "test-alter-table"))
.withTableConfig(getBasicTableConfig())
.withPrimaryKeys("_id")
.build();
runActionWithDefaultEnv(action);

try (Statement statement = getStatement()) {
testSchemaEvolutionWithCommentImpl(statement);
}
}

private void testSchemaEvolutionWithCommentImpl(Statement statement) throws Exception {
FileStoreTable table = getFileStoreTable();
statement.executeUpdate("USE " + DATABASE_NAME);
statement.executeUpdate("INSERT INTO schema_evolution_comment VALUES (1, 'one')");

RowType rowType =
RowType.of(
new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
new String[] {"_id", "v1"});
List<String> primaryKeys = Collections.singletonList("_id");
List<String> expected = Collections.singletonList("+I[1, one]");
waitForResult(expected, table, rowType, primaryKeys);

statement.executeUpdate(
"ALTER TABLE schema_evolution_comment MODIFY COLUMN v1 VARCHAR(20) COMMENT 'v1-new'");
statement.executeUpdate("INSERT INTO schema_evolution_comment VALUES (2, 'two')");

statement.executeUpdate(
"ALTER TABLE schema_evolution_comment ADD COLUMN v2 INT COMMENT 'v2'");

statement.executeUpdate("INSERT INTO schema_evolution_comment VALUES (3, 'three', 30)");
rowType =
RowType.of(
new DataType[] {
DataTypes.INT().notNull(), DataTypes.VARCHAR(20), DataTypes.INT()
},
new String[] {"_id", "v1", "v2"});
expected = Arrays.asList("+I[1, one, NULL]", "+I[2, two, NULL]", "+I[3, three, 30]");
waitForResult(expected, table, rowType, primaryKeys);

checkTableSchema(
"[{\"id\":0,\"name\":\"_id\",\"type\":\"INT NOT NULL\",\"description\":\"primary\"},"
+ "{\"id\":1,\"name\":\"v1\",\"type\":\"VARCHAR(20)\",\"description\":\"v1-new\"},"
+ "{\"id\":2,\"name\":\"v2\",\"type\":\"INT\",\"description\":\"v2\"}]");
}

@Test
@Timeout(90)
public void testAllTypes() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,20 @@ CREATE TABLE schema_evolution_multiple (
PRIMARY KEY (_id)
);

-- ################################################################################
-- MySqlSyncTableActionITCase
-- ################################################################################

CREATE TABLE schema_evolution_comment (
_id INT comment 'primary',
v1 VARCHAR(10) comment 'v1',
PRIMARY KEY (_id)
);

-- ################################################################################
-- testAllTypes
-- ################################################################################

CREATE TABLE all_types_table (
_id INT,
pt DECIMAL(2, 1),
Expand Down

0 comments on commit 32ff204

Please sign in to comment.