From c70a3de3c359d1a7e9b13ff56bb43282e9615fec Mon Sep 17 00:00:00 2001 From: MOBIN-F <18814118038@163.com> Date: Wed, 10 Jul 2024 22:41:58 +0800 Subject: [PATCH 1/3] supported_schemaEvolution_when_restarting_the_paimon_cdc_job --- .../action/cdc/CdcActionCommonUtils.java | 29 +++--- .../flink/action/cdc/SyncTableActionBase.java | 2 +- .../action/cdc/SynchronizationActionBase.java | 15 +++- .../cdc/mysql/MySqlSyncDatabaseAction.java | 2 +- .../cdc/postgres/PostgresTypeUtils.java | 10 +-- .../UpdatedDataFieldsProcessFunctionBase.java | 7 +- .../cdc/mysql/MySqlSyncTableActionITCase.java | 89 ++++++++++++++++++- .../PostgresSyncTableActionITCase.java | 53 ++++++++++- .../test/resources/mysql/sync_table_setup.sql | 22 ++++- .../resources/postgres/sync_table_setup.sql | 9 ++ 10 files changed, 204 insertions(+), 34 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java index 3e85c7c88bf8..e2052162c539 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java @@ -81,18 +81,23 @@ public static boolean schemaCompatible( for (DataField field : sourceTableFields) { int idx = paimonSchema.fieldNames().indexOf(field.name()); if (idx < 0) { - LOG.info("Cannot find field '{}' in Paimon table.", field.name()); - return false; - } - DataType type = paimonSchema.fields().get(idx).type(); - if (UpdatedDataFieldsProcessFunction.canConvert(field.type(), type) - != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) { - LOG.info( - "Cannot convert field '{}' from source table type '{}' to Paimon type '{}'.", - field.name(), - field.type(), - type); - return false; + if (!field.type().isNullable()) { + LOG.info( + "Add column '{}' cannot specify NOT NULL in the Paimon table.", + field.name()); + return false; + } + } else { + DataType type = paimonSchema.fields().get(idx).type(); + if (UpdatedDataFieldsProcessFunction.canConvert(type, field.type()) + != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) { + LOG.info( + "Cannot convert field '{}' from source table type '{}' to Paimon type '{}'.", + field.name(), + field.type(), + type); + return false; + } } } return true; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java index e335fc2be348..d25cb2b8eb38 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java @@ -124,13 +124,13 @@ protected void beforeBuildingSourceSink() throws Exception { // Check if table exists before trying to get or create it if (catalog.tableExists(identifier)) { fileStoreTable = (FileStoreTable) catalog.getTable(identifier); - fileStoreTable = alterTableOptions(identifier, fileStoreTable); try { Schema retrievedSchema = retrieveSchema(); computedColumns = buildComputedColumns(computedColumnArgs, retrievedSchema.fields()); Schema paimonSchema = buildPaimonSchema(retrievedSchema); assertSchemaCompatible(fileStoreTable.schema(), paimonSchema.fields()); + fileStoreTable = alterTableSchema(identifier, fileStoreTable, paimonSchema); } catch (SchemaRetrievalException e) { LOG.info( "Failed to retrieve schema from record data but there exists specified Paimon table. " diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java index 944185347c35..9f99ad35e67c 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java @@ -27,8 +27,11 @@ import org.apache.paimon.flink.action.cdc.watermark.CdcWatermarkStrategy; import org.apache.paimon.flink.sink.cdc.EventParser; import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; +import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction; import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; import org.apache.flink.api.common.eventtime.WatermarkStrategy; @@ -79,7 +82,6 @@ public SynchronizationActionBase( this.cdcSourceConfig = Configuration.fromMap(cdcSourceConfig); this.syncJobHandler = syncJobHandler; this.caseSensitive = catalog.caseSensitive(); - this.syncJobHandler.registerJdbcDriver(); } @@ -177,7 +179,8 @@ protected abstract void buildSink( DataStream input, EventParser.Factory parserFactory); - protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable table) { + protected FileStoreTable alterTableSchema( + Identifier identifier, FileStoreTable table, Schema paimonSchema) { // doesn't support altering bucket here Map dynamicOptions = new HashMap<>(tableConfig); dynamicOptions.remove(CoreOptions.BUCKET.key()); @@ -199,6 +202,14 @@ protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable .map(entry -> SchemaChange.setOption(entry.getKey(), entry.getValue())) .collect(Collectors.toList()); + // alter the table schema + List columnChanges = + UpdatedDataFieldsProcessFunction.extractSchemaChanges( + new SchemaManager(table.fileIO(), table.location()), + paimonSchema.fields(), + caseSensitive); + + optionChanges.addAll(columnChanges); try { catalog.alterTable(identifier, optionChanges, false); } catch (Catalog.TableNotExistException diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index a33f2c978321..2059293385fb 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -160,7 +160,7 @@ protected void beforeBuildingSourceSink() throws Exception { Supplier errMsg = incompatibleMessage(table.schema(), tableInfo, identifier); if (shouldMonitorTable(table.schema(), fromMySql, errMsg)) { - table = alterTableOptions(identifier, table); + table = alterTableSchema(identifier, table, fromMySql); tables.add(table); monitoredTables.addAll(tableInfo.identifiers()); } else { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresTypeUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresTypeUtils.java index 1f3ca3035975..d677b0e481d1 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresTypeUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresTypeUtils.java @@ -136,18 +136,14 @@ public static DataType toDataType( return DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18)); case PG_CHAR: case PG_CHARACTER: - return DataTypes.CHAR(precision); - case PG_CHAR_ARRAY: - case PG_CHARACTER_ARRAY: - return DataTypes.ARRAY(DataTypes.CHAR(precision)); case PG_CHARACTER_VARYING: - return DataTypes.VARCHAR(precision); - case PG_CHARACTER_VARYING_ARRAY: - return DataTypes.ARRAY(DataTypes.VARCHAR(precision)); case PG_TEXT: case PG_JSON: case PG_ENUM: return DataTypes.STRING(); + case PG_CHAR_ARRAY: + case PG_CHARACTER_ARRAY: + case PG_CHARACTER_VARYING_ARRAY: case PG_TEXT_ARRAY: return DataTypes.ARRAY(DataTypes.STRING()); case PG_TIMESTAMP: diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java index 3d832d33949b..7cf5fb2aea3c 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java @@ -192,8 +192,13 @@ public static ConvertAction canConvert(DataType oldType, DataType newType) { return ConvertAction.EXCEPTION; } - protected List extractSchemaChanges( + public List extractSchemaChanges( SchemaManager schemaManager, List updatedDataFields) { + return extractSchemaChanges(schemaManager, updatedDataFields, caseSensitive); + } + + public static List extractSchemaChanges( + SchemaManager schemaManager, List updatedDataFields, boolean caseSensitive) { RowType oldRowType = schemaManager.latest().get().logicalRowType(); Map oldFields = new HashMap<>(); for (DataField oldField : oldRowType.getFields()) { diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index 9568b1c3b7fa..b9fd80cd842b 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -1361,8 +1361,6 @@ public void testColumnCommentChangeInExistingTable() throws Exception { mySqlConfig.put("database-name", DATABASE_NAME); mySqlConfig.put("table-name", "test_exist_column_comment_change"); - // Flink cdc 2.3 does not support collecting field comments, and existing paimon table field - // comments will not be changed. MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig) .withPrimaryKeys("pk") @@ -1374,13 +1372,96 @@ public void testColumnCommentChangeInExistingTable() throws Exception { Map actual = table.schema().fields().stream() .collect(Collectors.toMap(DataField::name, Function.identity())); - assertThat(actual.get("pk").description()).isEqualTo("pk comment"); - assertThat(actual.get("c1").description()).isEqualTo("c1 comment"); + assertThat(actual.get("pk").description()).isEqualTo("pk new_comment"); + assertThat(actual.get("c1").description()).isEqualTo("c1 new_comment"); assertThat(actual.get("c2").description()).isEqualTo("c2 comment"); } @Test @Timeout(60) + public void testColumnAlterInExistingTableBeforeStartJob() throws Exception { + Map options = new HashMap<>(); + options.put("bucket", "1"); + options.put("sink.parallelism", "1"); + + RowType rowType = + RowType.builder() + .field("pk", DataTypes.INT().notNull()) + .field("a", DataTypes.BIGINT()) + .field("b", DataTypes.VARCHAR(20)) + .build(); + + createFileStoreTable( + rowType, + Collections.emptyList(), + Collections.singletonList("pk"), + Collections.emptyList(), + options); + + Map mySqlConfig = getBasicMySqlConfig(); + mySqlConfig.put("database-name", DATABASE_NAME); + mySqlConfig.put("table-name", "test_exist_column_alter"); + + MySqlSyncTableAction action = + syncTableActionBuilder(mySqlConfig) + .withPrimaryKeys("pk") + .withTableConfig(getBasicTableConfig()) + .build(); + + runActionWithDefaultEnv(action); + + FileStoreTable table = getFileStoreTable(); + + Map actual = + table.schema().fields().stream() + .collect(Collectors.toMap(DataField::name, Function.identity())); + + assertThat(actual.get("pk").type()).isEqualTo(DataTypes.INT().notNull()); + assertThat(actual.get("a").type()).isEqualTo(DataTypes.BIGINT()); + assertThat(actual.get("b").type()).isEqualTo(DataTypes.VARCHAR(30)); + assertThat(actual.get("c").type()).isEqualTo(DataTypes.INT()); + } + + @Test + @Timeout(60) + public void testAssertSchemaCompatibleWithAddColumnISNOTNULL() throws Exception { + Map options = new HashMap<>(); + options.put("bucket", "1"); + options.put("sink.parallelism", "1"); + + RowType rowType = + RowType.builder() + .field("pk", DataTypes.INT().notNull()) + .field("a", DataTypes.BIGINT()) + .field("b", DataTypes.VARCHAR(20)) + .build(); + + createFileStoreTable( + rowType, + Collections.emptyList(), + Collections.singletonList("pk"), + Collections.emptyList(), + options); + + Map mySqlConfig = getBasicMySqlConfig(); + mySqlConfig.put("database-name", DATABASE_NAME); + mySqlConfig.put("table-name", "assert_schema_compatible"); + + MySqlSyncTableAction action = + syncTableActionBuilder(mySqlConfig) + .withPrimaryKeys("pk") + .withTableConfig(getBasicTableConfig()) + .build(); + + assertThatThrownBy(action::run) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Paimon schema and source table schema are not compatible.\n" + + "Paimon fields are: [`pk` INT NOT NULL, `a` BIGINT, `b` VARCHAR(20)].\n" + + "Source table fields are: [`pk` INT NOT NULL '', `a` BIGINT '', `b` VARCHAR(30) '', `c` INT NOT NULL 'Add column cannot specify NOT NULL in the Paimon table']")); + } + public void testWriteOnlyAndSchemaEvolution() throws Exception { Map mySqlConfig = getBasicMySqlConfig(); mySqlConfig.put("database-name", "write_only_and_schema_evolution"); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java index b5b36888ebe0..8dd9bb7d92fa 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java @@ -20,6 +20,7 @@ import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; @@ -38,6 +39,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import java.util.stream.Collectors; import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches; import static org.assertj.core.api.Assertions.assertThat; @@ -77,7 +80,7 @@ public void testSchemaEvolution() throws Exception { checkTableSchema( "[{\"id\":0,\"name\":\"pt\",\"type\":\"INT NOT NULL\"}," + "{\"id\":1,\"name\":\"_id\",\"type\":\"INT NOT NULL\"}," - + "{\"id\":2,\"name\":\"v1\",\"type\":\"VARCHAR(10)\"}]"); + + "{\"id\":2,\"name\":\"v1\",\"type\":\"STRING\"}]"); try (Statement statement = getStatement(DATABASE_NAME)) { testSchemaEvolutionImpl(statement); @@ -245,9 +248,9 @@ public void testMultipleSchemaEvolutions() throws Exception { checkTableSchema( "[{\"id\":0,\"name\":\"_id\",\"type\":\"INT NOT NULL\"}," - + "{\"id\":1,\"name\":\"v1\",\"type\":\"VARCHAR(10)\"}," + + "{\"id\":1,\"name\":\"v1\",\"type\":\"STRING\"}," + "{\"id\":2,\"name\":\"v2\",\"type\":\"INT\"}," - + "{\"id\":3,\"name\":\"v3\",\"type\":\"VARCHAR(10)\"}]"); + + "{\"id\":3,\"name\":\"v3\",\"type\":\"STRING\"}]"); try (Statement statement = getStatement(DATABASE_NAME)) { testSchemaEvolutionMultipleImpl(statement); @@ -786,6 +789,50 @@ public void testCatalogAndTableConfig() { .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value")); } + @Test + @Timeout(60) + public void testColumnAlterInExistingTableWhenStartJob() throws Exception { + String tableName = "test_exist_column_alter"; + Map options = new HashMap<>(); + options.put("bucket", "1"); + options.put("sink.parallelism", "1"); + + RowType rowType = + RowType.builder() + .field("pk", DataTypes.INT().notNull()) + .field("a", DataTypes.BIGINT()) + .field("b", DataTypes.STRING()) + .build(); + + createFileStoreTable( + rowType, + Collections.emptyList(), + Collections.singletonList("pk"), + Collections.emptyList(), + options); + + Map postgresConfig = getBasicPostgresConfig(); + postgresConfig.put(PostgresSourceOptions.DATABASE_NAME.key(), DATABASE_NAME); + postgresConfig.put(PostgresSourceOptions.SCHEMA_NAME.key(), SCHEMA_NAME); + postgresConfig.put(PostgresSourceOptions.TABLE_NAME.key(), tableName); + + PostgresSyncTableAction action = + syncTableActionBuilder(postgresConfig).withPrimaryKeys("pk").build(); + + runActionWithDefaultEnv(action); + + FileStoreTable table = getFileStoreTable(); + + Map actual = + table.schema().fields().stream() + .collect(Collectors.toMap(DataField::name, Function.identity())); + + assertThat(actual.get("pk").type()).isEqualTo(DataTypes.INT().notNull()); + assertThat(actual.get("a").type()).isEqualTo(DataTypes.BIGINT()); + assertThat(actual.get("b").type()).isEqualTo(DataTypes.STRING()); + assertThat(actual.get("c").type()).isEqualTo(DataTypes.INT()); + } + private FileStoreTable getFileStoreTable() throws Exception { return getFileStoreTable(tableName); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql index 676185fb9291..c073eea3a37f 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql @@ -317,12 +317,28 @@ CREATE TABLE test_exist_options_change ( ); CREATE TABLE test_exist_column_comment_change ( - pk INT, - c1 DATE, - c2 VARCHAR(10) not null comment 'c2 comment', + pk INT comment 'pk new_comment', + c1 DATE comment 'c1 new_comment', + c2 VARCHAR(10) NOT NULL comment 'c2 comment', PRIMARY KEY (pk) ); +CREATE TABLE test_exist_column_alter ( + pk INT, + a BIGINT, + b VARCHAR(30), + c INT, + PRIMARY KEY (pk) +); + +CREATE TABLE assert_schema_compatible ( + pk INT, + a BIGINT, + b VARCHAR(30), + c INT NOT NULL comment 'Add column cannot specify NOT NULL in the Paimon table', + PRIMARY KEY (pk) +); + -- ################################################################################ -- testSyncShard -- ################################################################################ diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/postgres/sync_table_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/postgres/sync_table_setup.sql index 373eb3880f40..a451cb1c7b6a 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/postgres/sync_table_setup.sql +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/postgres/sync_table_setup.sql @@ -229,6 +229,15 @@ CREATE TABLE test_options_change ( ALTER TABLE test_options_change REPLICA IDENTITY FULL; +CREATE TABLE test_exist_column_alter ( + pk INT, + a BIGINT, + b VARCHAR(30), + c INT, + PRIMARY KEY (pk) +); + + -- ################################################################################ -- testMetadataColumns -- ################################################################################ From da515fd14841f113e344afdb634439e3aaa93aa8 Mon Sep 17 00:00:00 2001 From: MOBIN-F <18814118038@163.com> Date: Wed, 11 Sep 2024 15:35:03 +0800 Subject: [PATCH 2/3] resolved conflicts --- .../paimon/flink/action/cdc/SynchronizationActionBase.java | 2 +- .../sink/cdc/UpdatedDataFieldsProcessFunctionBase.java | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java index c5e385481e30..aa436ca4b294 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java @@ -213,7 +213,7 @@ protected FileStoreTable alterTableSchema( UpdatedDataFieldsProcessFunction.extractSchemaChanges( new SchemaManager(table.fileIO(), table.location()), paimonSchema.fields(), - caseSensitive); + allowUpperCase); optionChanges.addAll(columnChanges); try { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java index 6d16f41848cf..7d0ee9edd532 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java @@ -194,11 +194,13 @@ public static ConvertAction canConvert(DataType oldType, DataType newType) { public List extractSchemaChanges( SchemaManager schemaManager, List updatedDataFields) { - return extractSchemaChanges(schemaManager, updatedDataFields, caseSensitive); + return extractSchemaChanges(schemaManager, updatedDataFields, allowUpperCase); } public static List extractSchemaChanges( - SchemaManager schemaManager, List updatedDataFields, boolean caseSensitive) { + SchemaManager schemaManager, + List updatedDataFields, + boolean allowUpperCase) { RowType oldRowType = schemaManager.latest().get().logicalRowType(); Map oldFields = new HashMap<>(); for (DataField oldField : oldRowType.getFields()) { From 30d50594a9539e4e1e62e12b37c23b98fc478f8d Mon Sep 17 00:00:00 2001 From: MOBIN-F <18814118038@163.com> Date: Wed, 25 Sep 2024 18:09:39 +0800 Subject: [PATCH 3/3] fix --- .../action/cdc/SynchronizationActionBase.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java index 1b7a576a6f8f..0002b5df10e1 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java @@ -44,6 +44,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.time.Duration; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -201,10 +202,7 @@ protected FileStoreTable alterTableSchema( immutableOptionKeys.contains(entry.getKey()) || Objects.equals( oldOptions.get(entry.getKey()), entry.getValue())); - - if (dynamicOptions.isEmpty()) { - return table; - } + List tableSchemaChanges = new ArrayList<>(); // alter the table dynamic options List optionChanges = @@ -219,9 +217,15 @@ protected FileStoreTable alterTableSchema( paimonSchema.fields(), allowUpperCase); - optionChanges.addAll(columnChanges); + tableSchemaChanges.addAll(optionChanges); + tableSchemaChanges.addAll(columnChanges); + + if (tableSchemaChanges.isEmpty()) { + return table; + } + try { - catalog.alterTable(identifier, optionChanges, false); + catalog.alterTable(identifier, tableSchemaChanges, false); } catch (Catalog.TableNotExistException | Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) {