diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index 20740d1a7f2b..47b28f571f8c 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -83,18 +83,20 @@ public static boolean schemaCompatible(
         for (DataField field : sourceTableFields) {
             int idx = paimonSchema.fieldNames().indexOf(field.name());
             if (idx < 0) {
-                LOG.info("Cannot find field '{}' in Paimon table.", field.name());
-                return false;
-            }
-            DataType type = paimonSchema.fields().get(idx).type();
-            if (UpdatedDataFieldsProcessFunction.canConvert(field.type(), type)
-                    != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) {
                 LOG.info(
-                        "Cannot convert field '{}' from source table type '{}' to Paimon type '{}'.",
-                        field.name(),
-                        field.type(),
-                        type);
-                return false;
+                        "New fields '{}' found in source table, will be synchronized to Paimon table.",
+                        field.name());
+            } else {
+                DataType type = paimonSchema.fields().get(idx).type();
+                if (UpdatedDataFieldsProcessFunction.canConvert(type, field.type())
+                        != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) {
+                    LOG.info(
+                            "Cannot convert field '{}' from source table type '{}' to Paimon type '{}'.",
+                            field.name(),
+                            field.type(),
+                            type);
+                    return false;
+                }
             }
         }
         return true;
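The schemaCompatible change above relaxes the check in two ways: a source column that does not yet exist in the Paimon table no longer fails compatibility (it is only logged and left for schema evolution to add), and the convertibility test now passes the existing Paimon type as the old type and the incoming source type as the new type, matching the canConvert(oldType, newType) signature. A minimal sketch of the intended direction, using only classes this patch itself references (the wrapper class and method name are illustrative, not part of the patch):

    import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
    import org.apache.paimon.types.DataType;

    class ConvertDirectionSketch {
        // "Can the existing Paimon column accept values of the incoming source type?"
        static boolean canEvolve(DataType paimonType, DataType sourceType) {
            return UpdatedDataFieldsProcessFunction.canConvert(paimonType, sourceType)
                    == UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT;
        }
    }

For example, canEvolve(DataTypes.VARCHAR(10), DataTypes.VARCHAR(20)) is expected to hold, since widening a VARCHAR is a safe conversion, while the reverse (narrowing) is not.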
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
index c674e560b11f..72844d8418ac 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
@@ -27,6 +27,7 @@
 import org.apache.paimon.flink.action.cdc.postgres.PostgresRecordParser;
 import org.apache.paimon.flink.action.cdc.pulsar.PulsarActionUtils;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.schema.TableSchema;
 
 import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
 import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
@@ -197,6 +198,16 @@ public FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> provideRecordPar
             List<ComputedColumn> computedColumns,
             TypeMapping typeMapping,
             CdcMetadataConverter[] metadataConverters) {
+        return this.provideRecordParser(
+                caseSensitive, computedColumns, typeMapping, metadataConverters, null);
+    }
+
+    public FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> provideRecordParser(
+            boolean caseSensitive,
+            List<ComputedColumn> computedColumns,
+            TypeMapping typeMapping,
+            CdcMetadataConverter[] metadataConverters,
+            TableSchema paimonSchema) {
         switch (sourceType) {
             case MYSQL:
                 return new MySqlRecordParser(
@@ -211,7 +222,8 @@ public FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> provideRecordPar
                         caseSensitive,
                         computedColumns,
                         typeMapping,
-                        metadataConverters);
+                        metadataConverters,
+                        paimonSchema);
             case KAFKA:
             case PULSAR:
                 DataFormat dataFormat = provideDataFormat();
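The extra overload above keeps the old four-argument entry point source-compatible: existing callers fall through with a null schema, and only the Postgres parser currently consumes the new parameter. A sketch of the two call sites, assuming a SyncJobHandler instance in scope:

    // Old call sites keep working; schema-aware parsing is opt-in.
    syncJobHandler.provideRecordParser(
            caseSensitive, computedColumns, typeMapping, metadataConverters);
    // ...which now simply forwards to:
    syncJobHandler.provideRecordParser(
            caseSensitive, computedColumns, typeMapping, metadataConverters, null);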
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
index f4e5bfe6abe0..f77940bed390 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
@@ -123,13 +123,13 @@ protected void beforeBuildingSourceSink() throws Exception {
         // Check if table exists before trying to get or create it
         if (catalog.tableExists(identifier)) {
             fileStoreTable = (FileStoreTable) catalog.getTable(identifier);
-            fileStoreTable = alterTableOptions(identifier, fileStoreTable);
             try {
                 Schema retrievedSchema = retrieveSchema();
                 computedColumns =
                         buildComputedColumns(computedColumnArgs, retrievedSchema.fields());
                 Schema paimonSchema = buildPaimonSchema(retrievedSchema);
                 assertSchemaCompatible(fileStoreTable.schema(), paimonSchema.fields());
+                fileStoreTable = alterTableSchema(identifier, fileStoreTable, paimonSchema);
             } catch (SchemaRetrievalException e) {
                 LOG.info(
                         "Failed to retrieve schema from record data but there exists specified Paimon table. "
@@ -157,7 +157,11 @@ protected void beforeBuildingSourceSink() throws Exception {
     @Override
     protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> recordParse() {
         return syncJobHandler.provideRecordParser(
-                caseSensitive, computedColumns, typeMapping, metadataConverters);
+                caseSensitive,
+                computedColumns,
+                typeMapping,
+                metadataConverters,
+                fileStoreTable.schema());
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
index 944185347c35..45da47e1741a 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
@@ -27,8 +27,11 @@
 import org.apache.paimon.flink.action.cdc.watermark.CdcWatermarkStrategy;
 import org.apache.paimon.flink.sink.cdc.EventParser;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -79,7 +82,6 @@ public SynchronizationActionBase(
         this.cdcSourceConfig = Configuration.fromMap(cdcSourceConfig);
         this.syncJobHandler = syncJobHandler;
         this.caseSensitive = catalog.caseSensitive();
-
         this.syncJobHandler.registerJdbcDriver();
     }
 
@@ -177,7 +179,8 @@ protected abstract void buildSink(
             DataStream<RichCdcMultiplexRecord> input,
             EventParser.Factory<RichCdcMultiplexRecord> parserFactory);
 
-    protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable table) {
+    protected FileStoreTable alterTableSchema(
+            Identifier identifier, FileStoreTable table, Schema paimonSchema) {
         // doesn't support altering bucket here
         Map<String, String> dynamicOptions = new HashMap<>(tableConfig);
         dynamicOptions.remove(CoreOptions.BUCKET.key());
@@ -194,13 +197,18 @@ protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable
                                     oldOptions.get(entry.getKey()),
                                     entry.getValue()));
         // alter the table dynamic options
-        List<SchemaChange> optionChanges =
+        List<SchemaChange> changes =
                 dynamicOptions.entrySet().stream()
                         .map(entry -> SchemaChange.setOption(entry.getKey(), entry.getValue()))
                         .collect(Collectors.toList());
 
+        List<SchemaChange> columnChanges =
+                UpdatedDataFieldsProcessFunction.extractSchemaChanges(
+                        new SchemaManager(table.fileIO(), table.location()), paimonSchema.fields());
+
+        changes.addAll(columnChanges);
         try {
-            catalog.alterTable(identifier, optionChanges, false);
+            catalog.alterTable(identifier, changes, false);
         } catch (Catalog.TableNotExistException
                 | Catalog.ColumnAlreadyExistException
                 | Catalog.ColumnNotExistException e) {
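The renamed alterTableSchema above widens the old alterTableOptions: besides dynamic option changes it now also derives column changes from the freshly retrieved source schema and applies everything in a single catalog.alterTable call. A condensed sketch of the pattern, assuming the method's locals (table, catalog, identifier, dynamicOptions, paimonSchema) are in scope as in the hunk:

    // Collect option changes and column changes into one alter request.
    List<SchemaChange> changes =
            dynamicOptions.entrySet().stream()
                    .map(e -> SchemaChange.setOption(e.getKey(), e.getValue()))
                    .collect(Collectors.toList());
    changes.addAll(
            UpdatedDataFieldsProcessFunction.extractSchemaChanges(
                    new SchemaManager(table.fileIO(), table.location()),
                    paimonSchema.fields()));
    catalog.alterTable(identifier, changes, false);

Applying both kinds of change in one call keeps the "alter at job start" step atomic from the catalog's point of view, rather than issuing separate option and column alterations.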
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 80a7c8b84674..0aec02815c50 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -159,7 +159,7 @@ protected void beforeBuildingSourceSink() throws Exception {
                 Supplier<String> errMsg =
                         incompatibleMessage(table.schema(), tableInfo, identifier);
                 if (shouldMonitorTable(table.schema(), fromMySql, errMsg)) {
-                    table = alterTableOptions(identifier, table);
+                    table = alterTableSchema(identifier, table, fromMySql);
                     tables.add(table);
                     monitoredTables.addAll(tableInfo.identifiers());
                 } else {
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
index 0299d57f7741..f99a8781049f 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
@@ -25,11 +25,14 @@
 import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEvent;
 import org.apache.paimon.flink.sink.cdc.CdcRecord;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.CharType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StringUtils;
@@ -70,6 +73,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck;
 import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
@@ -101,18 +105,21 @@ public class PostgresRecordParser
     private String currentTable;
     private String databaseName;
     private final CdcMetadataConverter[] metadataConverters;
+    private TableSchema paimonSchema;
 
     public PostgresRecordParser(
             Configuration postgresConfig,
             boolean caseSensitive,
             TypeMapping typeMapping,
-            CdcMetadataConverter[] metadataConverters) {
+            CdcMetadataConverter[] metadataConverters,
+            TableSchema schema) {
         this(
                 postgresConfig,
                 caseSensitive,
                 Collections.emptyList(),
                 typeMapping,
-                metadataConverters);
+                metadataConverters,
+                schema);
     }
 
     public PostgresRecordParser(
@@ -120,7 +127,8 @@ public PostgresRecordParser(
             boolean caseSensitive,
             List<ComputedColumn> computedColumns,
             TypeMapping typeMapping,
-            CdcMetadataConverter[] metadataConverters) {
+            CdcMetadataConverter[] metadataConverters,
+            TableSchema paimonSchema) {
         this.caseSensitive = caseSensitive;
         this.computedColumns = computedColumns;
         this.typeMapping = typeMapping;
@@ -133,6 +141,7 @@ public PostgresRecordParser(
                 stringifyServerTimeZone == null
                         ? ZoneId.systemDefault()
                         : ZoneId.of(stringifyServerTimeZone);
+        this.paimonSchema = paimonSchema;
     }
 
     @Override
@@ -146,7 +155,7 @@ public void flatMap(CdcSourceRecord rawEvent, Collector<RichCdcMultiplexRecord>
         extractRecords().forEach(out::collect);
     }
 
-    private List<DataField> extractFields(DebeziumEvent.Field schema) {
+    private List<DataField> extractFields(DebeziumEvent.Field schema, JsonNode afterData) {
         Map<String, DebeziumEvent.Field> afterFields = schema.afterFields();
         Preconditions.checkArgument(
                 !afterFields.isEmpty(),
@@ -157,16 +166,22 @@ private List<DataField> extractFields(DebeziumEvent.Field schema) {
         RowType.Builder rowType = RowType.builder();
         Set<String> existedFields = new HashSet<>();
         Function<String, String> columnDuplicateErrMsg = columnDuplicateErrMsg(currentTable);
+
+        Map<String, DataField> paimonFields =
+                paimonSchema.fields().stream()
+                        .collect(Collectors.toMap(DataField::name, Function.identity()));
+
         afterFields.forEach(
-                (key, value) -> {
+                (key, afterField) -> {
                     String columnName =
                             columnCaseConvertAndDuplicateCheck(
                                     key, existedFields, caseSensitive, columnDuplicateErrMsg);
 
-                    DataType dataType = extractFieldType(value);
+                    DataType dataType =
+                            extractFieldType(afterField, paimonFields.get(key), afterData);
                     dataType =
                             dataType.copy(
-                                    typeMapping.containsMode(TO_NULLABLE) || value.optional());
+                                    typeMapping.containsMode(TO_NULLABLE) || afterField.optional());
 
                     rowType.field(columnName, dataType);
                 });
@@ -177,7 +192,8 @@ private List<DataField> extractFields(DebeziumEvent.Field schema) {
     /**
      * Extract fields from json records, see postgresql-data-types.
      */
-    private DataType extractFieldType(DebeziumEvent.Field field) {
+    private DataType extractFieldType(
+            DebeziumEvent.Field field, DataField paimonField, JsonNode afterData) {
         switch (field.type()) {
             case "array":
                 return DataTypes.ARRAY(DataTypes.STRING());
@@ -209,6 +225,16 @@ private DataType extractFieldType(DebeziumEvent.Field field) {
             case "boolean":
                 return DataTypes.BOOLEAN();
             case "string":
+                int newLength = afterData.get(field.field()).asText().length();
+                if (paimonField == null) {
+                    return DataTypes.VARCHAR(newLength);
+                } else if (paimonField.type() instanceof VarCharType) {
+                    int oldLength = ((VarCharType) paimonField.type()).getLength();
+                    return DataTypes.VARCHAR(Math.max(oldLength, newLength));
+                } else if (paimonField.type() instanceof CharType) {
+                    int oldLength = ((CharType) paimonField.type()).getLength();
+                    return DataTypes.CHAR(Math.max(oldLength, newLength));
+                }
                 return DataTypes.STRING();
             case "bytes":
                 if (decimalLogicalName().equals(field.name())) {
@@ -248,7 +274,7 @@ private List<RichCdcMultiplexRecord> extractRecords() {
         Map<String, String> after = extractRow(root.payload().after());
         if (!after.isEmpty()) {
             after = mapKeyCaseConvert(after, caseSensitive, recordKeyDuplicateErrMsg(after));
-            List<DataField> fields = extractFields(root.schema());
+            List<DataField> fields = extractFields(root.schema(), root.payload().after());
             records.add(
                     new RichCdcMultiplexRecord(
                             databaseName,
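The new "string" branch above sizes (VAR)CHAR columns from the actual payload instead of falling back to STRING: when the Paimon table has no such column yet, the incoming value's length is used; otherwise the larger of the declared length and the incoming length wins, so columns are only ever widened, never narrowed. Restated as a standalone helper (a sketch, not part of the patch):

    import org.apache.paimon.types.CharType;
    import org.apache.paimon.types.DataField;
    import org.apache.paimon.types.DataType;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.VarCharType;

    class StringTypeWidening {
        // Same decision table as the patched "string" branch: grow, never shrink.
        static DataType widen(DataField paimonField, int newLength) {
            if (paimonField == null) {
                // Column is new to the Paimon table: size it from the payload.
                return DataTypes.VARCHAR(newLength);
            } else if (paimonField.type() instanceof VarCharType) {
                int oldLength = ((VarCharType) paimonField.type()).getLength();
                return DataTypes.VARCHAR(Math.max(oldLength, newLength));
            } else if (paimonField.type() instanceof CharType) {
                int oldLength = ((CharType) paimonField.type()).getLength();
                return DataTypes.CHAR(Math.max(oldLength, newLength));
            }
            // Existing column is STRING (or another type): leave it unbounded.
            return DataTypes.STRING();
        }
    }

Note that afterData.get(field.field()) presumably relies on Debezium always materializing the full after image; the Postgres test setup below configures REPLICA IDENTITY FULL, which is consistent with that assumption.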
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index 8fefcab6319c..44f46427e282 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -188,7 +188,7 @@ public static ConvertAction canConvert(DataType oldType, DataType newType) {
         return ConvertAction.EXCEPTION;
     }
 
-    protected List<SchemaChange> extractSchemaChanges(
+    public static List<SchemaChange> extractSchemaChanges(
             SchemaManager schemaManager, List<DataField> updatedDataFields) {
         RowType oldRowType = schemaManager.latest().get().logicalRowType();
         Map<String, DataField> oldFields = new HashMap<>();
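Making extractSchemaChanges public static above is what allows SynchronizationActionBase to call it once at job start, outside any running process function. Usage stays a one-liner (schemaManager and newFields assumed in scope):

    List<SchemaChange> changes =
            UpdatedDataFieldsProcessFunction.extractSchemaChanges(schemaManager, newFields);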
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index fb78d16eb4b2..ff138ec38349 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -1355,8 +1355,6 @@ public void testColumnCommentChangeInExistingTable() throws Exception {
         mySqlConfig.put("database-name", DATABASE_NAME);
         mySqlConfig.put("table-name", "test_exist_column_comment_change");
 
-        // Flink cdc 2.3 does not support collecting field comments, and existing paimon table field
-        // comments will not be changed.
         MySqlSyncTableAction action =
                 syncTableActionBuilder(mySqlConfig)
                         .withPrimaryKeys("pk")
                         .withTableConfig(getBasicTableConfig())
                         .build();
         runActionWithDefaultEnv(action);
 
         FileStoreTable table = getFileStoreTable();
 
@@ -1368,13 +1366,56 @@ public void testColumnCommentChangeInExistingTable() throws Exception {
         Map<String, DataField> actual =
                 table.schema().fields().stream()
                         .collect(Collectors.toMap(DataField::name, Function.identity()));
-        assertThat(actual.get("pk").description()).isEqualTo("pk comment");
-        assertThat(actual.get("c1").description()).isEqualTo("c1 comment");
+        assertThat(actual.get("pk").description()).isEqualTo("pk new_comment");
+        assertThat(actual.get("c1").description()).isEqualTo("c1 new_comment");
         assertThat(actual.get("c2").description()).isEqualTo("c2 comment");
     }
 
     @Test
     @Timeout(60)
+    public void testColumnAlterInExistingTableWhenStartJob() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put("bucket", "1");
+        options.put("sink.parallelism", "1");
+
+        RowType rowType =
+                RowType.builder()
+                        .field("pk", DataTypes.INT().notNull())
+                        .field("a", DataTypes.BIGINT())
+                        .field("b", DataTypes.VARCHAR(20))
+                        .build();
+
+        createFileStoreTable(
+                rowType,
+                Collections.emptyList(),
+                Collections.singletonList("pk"),
+                Collections.emptyList(),
+                options);
+
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", DATABASE_NAME);
+        mySqlConfig.put("table-name", "test_exist_column_alter");
+
+        MySqlSyncTableAction action =
+                syncTableActionBuilder(mySqlConfig)
+                        .withPrimaryKeys("pk")
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+
+        runActionWithDefaultEnv(action);
+
+        FileStoreTable table = getFileStoreTable();
+
+        Map<String, DataField> actual =
+                table.schema().fields().stream()
+                        .collect(Collectors.toMap(DataField::name, Function.identity()));
+
+        assertThat(actual.get("pk").type()).isEqualTo(DataTypes.INT().notNull());
+        assertThat(actual.get("a").type()).isEqualTo(DataTypes.BIGINT());
+        assertThat(actual.get("b").type()).isEqualTo(DataTypes.VARCHAR(30));
+        assertThat(actual.get("c").type()).isEqualTo(DataTypes.INT());
+    }
+
     public void testWriteOnlyAndSchemaEvolution() throws Exception {
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
         mySqlConfig.put("database-name", "write_only_and_schema_evolution");
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java
index be67cd20b662..a68026488e90 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java
@@ -20,6 +20,7 @@
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -38,6 +39,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -100,7 +103,9 @@ private void testSchemaEvolutionImpl(Statement statement) throws Exception {
         RowType rowType =
                 RowType.of(
                         new DataType[] {
-                            DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.STRING()
+                            DataTypes.INT().notNull(),
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(10)
                         },
                         new String[] {"pt", "_id", "v1"});
         List<String> primaryKeys = Arrays.asList("pt", "_id");
@@ -118,7 +123,7 @@ private void testSchemaEvolutionImpl(Statement statement) throws Exception {
                         new DataType[] {
                             DataTypes.INT().notNull(),
                             DataTypes.INT().notNull(),
-                            DataTypes.STRING(),
+                            DataTypes.VARCHAR(10),
                             DataTypes.INT()
                         },
                         new String[] {"pt", "_id", "v1", "v2"});
@@ -145,7 +150,7 @@ private void testSchemaEvolutionImpl(Statement statement) throws Exception {
                         new DataType[] {
                             DataTypes.INT().notNull(),
                             DataTypes.INT().notNull(),
-                            DataTypes.STRING(),
+                            DataTypes.VARCHAR(10),
                             DataTypes.BIGINT()
                         },
                         new String[] {"pt", "_id", "v1", "v2"});
@@ -170,14 +175,14 @@ private void testSchemaEvolutionImpl(Statement statement) throws Exception {
         statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v4 BYTEA");
         statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v5 FLOAT");
         statement.executeUpdate("ALTER TABLE schema_evolution_2 ALTER COLUMN v1 TYPE VARCHAR(20)");
-        statement.executeUpdate(
-                "UPDATE schema_evolution_2 SET v1 = 'very long string' WHERE _id = 8");
+        String v1 = "very long string";
+        statement.executeUpdate("UPDATE schema_evolution_2 SET v1 = '" + v1 + "' WHERE _id = 8");
         rowType =
                 RowType.of(
                         new DataType[] {
                             DataTypes.INT().notNull(),
                             DataTypes.INT().notNull(),
-                            DataTypes.STRING(),
+                            DataTypes.VARCHAR(Math.max(v1.length(), 10)),
                             DataTypes.BIGINT(),
                             DataTypes.DECIMAL(8, 3),
                             DataTypes.BYTES(),
@@ -209,7 +214,7 @@ private void testSchemaEvolutionImpl(Statement statement) throws Exception {
                         new DataType[] {
                             DataTypes.INT().notNull(),
                             DataTypes.INT().notNull(),
-                            DataTypes.STRING(),
+                            DataTypes.VARCHAR(Math.max(v1.length(), 10)),
                             DataTypes.BIGINT(),
                             DataTypes.DECIMAL(8, 3),
                             DataTypes.BYTES(),
@@ -263,9 +268,9 @@ private void testSchemaEvolutionMultipleImpl(Statement statement) throws Excepti
                 RowType.of(
                         new DataType[] {
                             DataTypes.INT().notNull(),
-                            DataTypes.STRING(),
+                            DataTypes.VARCHAR(10),
                             DataTypes.INT(),
-                            DataTypes.STRING()
+                            DataTypes.VARCHAR(10)
                         },
                         new String[] {"_id", "v1", "v2", "v3"});
         List<String> primaryKeys = Collections.singletonList("_id");
@@ -280,20 +285,30 @@ private void testSchemaEvolutionMultipleImpl(Statement statement) throws Excepti
                         + "ADD COLUMN v6 DECIMAL(5, 3),"
                         + "ADD COLUMN \"$% ^,& *(\" VARCHAR(10),"
                         + "ALTER COLUMN v2 TYPE BIGINT");
+        String v1 = "long_string_two";
+        String v3 = "string_2";
+        String v7 = "test_2";
+
         statement.executeUpdate(
                 "INSERT INTO schema_evolution_multiple VALUES "
-                        + "(2, 'long_string_two', 2000000000000, 'string_2', 20, 20.5, 20.002, 'test_2')");
+                        + "(2, '"
+                        + v1
+                        + "', 2000000000000, '"
+                        + v3
+                        + "', 20, 20.5, 20.002, '"
+                        + v7
+                        + "')");
         rowType =
                 RowType.of(
                         new DataType[] {
                             DataTypes.INT().notNull(),
-                            DataTypes.STRING(),
+                            DataTypes.VARCHAR(v1.length()),
                             DataTypes.BIGINT(),
-                            DataTypes.STRING(),
+                            DataTypes.VARCHAR(10),
                             DataTypes.INT(),
                             DataTypes.DOUBLE(),
                             DataTypes.DECIMAL(5, 3),
-                            DataTypes.STRING()
+                            DataTypes.VARCHAR(v7.length())
                         },
                         new String[] {"_id", "v1", "v2", "v3", "v4", "v5", "v6", "$% ^,& *("});
         expected =
@@ -362,8 +377,8 @@ private void testAllTypesImpl(Statement statement) throws Exception {
                         DataTypes.TIMESTAMP(6), // _timestamp0
                         DataTypes.TIME(6), // _time
                         DataTypes.TIME(6), // _time0
-                        DataTypes.STRING(), // _char
-                        DataTypes.STRING(), // _varchar
+                        DataTypes.CHAR(10), // _char
+                        DataTypes.VARCHAR(20), // _varchar
                         DataTypes.STRING(), // _text
                         DataTypes.BYTES(), // _bin
                         DataTypes.STRING(), // _json
@@ -660,7 +675,7 @@ public void testSyncShards() throws Exception {
                 RowType.of(
                         new DataType[] {
                             DataTypes.INT().notNull(),
-                            DataTypes.STRING(),
+                            DataTypes.VARCHAR(10),
                             DataTypes.STRING().notNull()
                         },
                         new String[] {"pk", "_date", "pt"});
@@ -753,7 +768,7 @@ public void testMetadataColumns() throws Exception {
                 RowType.of(
                         new DataType[] {
                             DataTypes.INT().notNull(),
-                            DataTypes.STRING(),
+                            DataTypes.VARCHAR(10),
                             DataTypes.STRING().notNull(),
                             DataTypes.STRING().notNull(),
                             DataTypes.STRING().notNull()
@@ -785,6 +800,50 @@ public void testCatalogAndTableConfig() {
                 .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
     }
 
+    @Test
+    @Timeout(60)
+    public void testColumnAlterInExistingTableWhenStartJob() throws Exception {
+        String tableName = "test_exist_column_alter";
+        Map<String, String> options = new HashMap<>();
+        options.put("bucket", "1");
+        options.put("sink.parallelism", "1");
+
+        RowType rowType =
+                RowType.builder()
+                        .field("pk", DataTypes.INT().notNull())
+                        .field("a", DataTypes.BIGINT())
+                        .field("b", DataTypes.VARCHAR(20))
+                        .build();
+
+        createFileStoreTable(
+                rowType,
+                Collections.emptyList(),
+                Collections.singletonList("pk"),
+                Collections.emptyList(),
+                options);
+
+        Map<String, String> postgresConfig = getBasicPostgresConfig();
+        postgresConfig.put(PostgresSourceOptions.DATABASE_NAME.key(), DATABASE_NAME);
+        postgresConfig.put(PostgresSourceOptions.SCHEMA_NAME.key(), SCHEMA_NAME);
+        postgresConfig.put(PostgresSourceOptions.TABLE_NAME.key(), tableName);
+
+        PostgresSyncTableAction action =
+                syncTableActionBuilder(postgresConfig).withPrimaryKeys("pk").build();
+
+        runActionWithDefaultEnv(action);
+
+        FileStoreTable table = getFileStoreTable();
+
+        Map<String, DataField> actual =
+                table.schema().fields().stream()
+                        .collect(Collectors.toMap(DataField::name, Function.identity()));
+
+        assertThat(actual.get("pk").type()).isEqualTo(DataTypes.INT().notNull());
+        assertThat(actual.get("a").type()).isEqualTo(DataTypes.BIGINT());
+        assertThat(actual.get("b").type()).isEqualTo(DataTypes.VARCHAR(30));
+        assertThat(actual.get("c").type()).isEqualTo(DataTypes.INT());
+    }
+
     private FileStoreTable getFileStoreTable() throws Exception {
         return getFileStoreTable(tableName);
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
index 676185fb9291..87db716ca127 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
@@ -317,12 +317,21 @@ CREATE TABLE test_exist_options_change (
 );
 
 CREATE TABLE test_exist_column_comment_change (
-    pk INT,
-    c1 DATE,
+    pk INT comment 'pk new_comment',
+    c1 DATE comment 'c1 new_comment',
     c2 VARCHAR(10) not null comment 'c2 comment',
     PRIMARY KEY (pk)
 );
 
+CREATE TABLE test_exist_column_alter (
+    pk INT,
+    a BIGINT,
+    b VARCHAR(30),
+    c INT,
+    PRIMARY KEY (pk)
+);
+
+
 -- ################################################################################
 -- testSyncShard
 -- ################################################################################
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/postgres/sync_table_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/postgres/sync_table_setup.sql
index 373eb3880f40..a451cb1c7b6a 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/resources/postgres/sync_table_setup.sql
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/postgres/sync_table_setup.sql
@@ -229,6 +229,15 @@ CREATE TABLE test_options_change (
 
 ALTER TABLE test_options_change REPLICA IDENTITY FULL;
 
+CREATE TABLE test_exist_column_alter (
+    pk INT,
+    a BIGINT,
+    b VARCHAR(30),
+    c INT,
+    PRIMARY KEY (pk)
+);
+
+
 -- ################################################################################
 -- testMetadataColumns
 -- ################################################################################