diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index c2081f304ffa4..4e039c7ff24b1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -188,7 +188,8 @@ public TableSchema commitChanges(String branchName, List<SchemaChange> changes)
                     Catalog.ColumnNotExistException {
         while (true) {
             TableSchema schema =
-                    latest().orElseThrow(
+                    latest(branchName)
+                            .orElseThrow(
                                     () ->
                                             new Catalog.TableNotExistException(
                                                     fromPath(tableRoot.toString(), true)));
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
index 7a621030217a6..3b79265c8544c 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
@@ -34,6 +34,7 @@
 import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BranchManager;
 
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -112,6 +113,16 @@ protected void assertTableNotExists(String... tableNames) throws Exception {
     protected void waitForResult(
             List<String> expected, FileStoreTable table, RowType rowType, List<String> primaryKeys)
             throws Exception {
+        waitForResult(expected, table, rowType, primaryKeys, BranchManager.DEFAULT_MAIN_BRANCH);
+    }
+
+    protected void waitForResult(
+            List<String> expected,
+            FileStoreTable table,
+            RowType rowType,
+            List<String> primaryKeys,
+            String branch)
+            throws Exception {
         assertThat(table.schema().primaryKeys()).isEqualTo(primaryKeys);
 
         // wait for table schema to become our expected schema
@@ -130,7 +141,7 @@ protected void waitForResult(
                     break;
                 }
             }
-            table = table.copyWithLatestSchema();
+            table = table.copyWithLatestSchema(branch);
             Thread.sleep(1000);
         }
 
@@ -138,7 +149,7 @@ protected void waitForResult(
         List<String> sortedExpected = new ArrayList<>(expected);
         Collections.sort(sortedExpected);
         while (true) {
-            ReadBuilder readBuilder = table.newReadBuilder();
+            ReadBuilder readBuilder = table.newReadBuilder().fromBranch(branch);
             TableScan.Plan plan = readBuilder.newScan().plan();
             List<String> result =
                     getResult(
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
index 7aba174d3dfcf..83c729ffd0899 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
@@ -32,6 +32,12 @@ public void testSchemaEvolution() throws Exception {
         runSingleTableSchemaEvolution("schemaevolution", DEBEZIUM);
     }
 
+    @Test
+    @Timeout(120)
+    public void testSchemaEvolutionToBranch() throws Exception {
+        runSingleTableSchemaEvolutionToBranch("schemaevolution", DEBEZIUM, "testBranch");
+    }
+
     @Test
     @Timeout(60)
     public void testNotSupportFormat() throws Exception {
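The three files above share one pattern: each entry point that used to assume the main branch gains a branch-aware overload, and the old signature delegates to it with the default branch. A minimal self-contained sketch of that delegation pattern; the `BranchAwareReader` class and its methods are illustrative, and the `"main"` constant only mirrors what `BranchManager.DEFAULT_MAIN_BRANCH` is assumed to hold:

```java
import java.util.Arrays;
import java.util.List;

public class BranchAwareReader {
    // Assumed to mirror BranchManager.DEFAULT_MAIN_BRANCH.
    static final String DEFAULT_MAIN_BRANCH = "main";

    // Existing call sites keep compiling: the old signature simply delegates.
    public List<String> read(String table) {
        return read(table, DEFAULT_MAIN_BRANCH);
    }

    // The branch-aware variant carries the extra parameter end to end.
    public List<String> read(String table, String branch) {
        return Arrays.asList(table + "@" + branch);
    }

    public static void main(String[] args) {
        BranchAwareReader reader = new BranchAwareReader();
        System.out.println(reader.read("t"));               // [t@main]
        System.out.println(reader.read("t", "testBranch")); // [t@testBranch]
    }
}
```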
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
index f2f8fc246682d..39a03f52e8fa1 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.action.cdc.kafka;
 
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.schema.Schema;
@@ -33,6 +34,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -81,6 +83,115 @@ protected void runSingleTableSchemaEvolution(String sourceDir, String format) th
         testSchemaEvolutionImpl(topic, sourceDir, format);
     }
 
+    protected void runSingleTableSchemaEvolutionToBranch(
+            String sourceDir, String format, String branch) throws Exception {
+        final String topic = "schema_evolution-branch";
+        createTestTopic(topic, 1, 1);
+        // ---------- Write the data into Kafka -------------------
+        List<String> lines =
+                readLines(
+                        String.format(
+                                "kafka/%s/table/%s/%s-data-1.txt", format, sourceDir, format));
+        try {
+            writeRecordsToKafka(topic, lines);
+        } catch (Exception e) {
+            throw new Exception(String.format("Failed to write %s data to Kafka.", format), e);
+        }
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
+        kafkaConfig.put(TOPIC.key(), topic);
+        Map<String, String> tableConfig = getBasicTableConfig();
+        tableConfig.put(FlinkConnectorOptions.BRANCH.key(), branch);
+        Map<String, String> catalogConfig = new HashMap<>();
+        catalogConfig.put(FlinkConnectorOptions.BRANCH.key(), branch);
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig)
+                        .withPrimaryKeys("id")
+                        .withTableConfig(tableConfig)
+                        .withCatalogConfig(catalogConfig)
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        testSchemaEvolutionImplWithBranch(topic, sourceDir, format, branch);
+    }
+
+    private void testSchemaEvolutionImplWithBranch(
+            String topic, String sourceDir, String format, String branch) throws Exception {
+        FileStoreTable table = getFileStoreTableWithBranch(tableName, branch);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight"});
+        List<String> primaryKeys = Collections.singletonList("id");
+        List<String> expected =
+                Arrays.asList(
+                        "+I[101, scooter, Small 2-wheel scooter, 3.14]",
+                        "+I[102, car battery, 12V car battery, 8.1]");
+        waitForResult(expected, table, rowType, primaryKeys, branch);
+
+        try {
+            writeRecordsToKafka(
+                    topic,
+                    readLines(
+                            String.format(
+                                    "kafka/%s/table/%s/%s-data-2.txt", format, sourceDir, format)));
+        } catch (Exception e) {
+            throw new Exception(String.format("Failed to write %s data to Kafka.", format), e);
+        }
+        rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight", "age"});
+        expected =
+                Arrays.asList(
"+I[101, scooter, Small 2-wheel scooter, 3.14, NULL]", + "+I[102, car battery, 12V car battery, 8.1, NULL]", + "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18]", + "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24]"); + waitForResult(expected, table, rowType, primaryKeys, branch); + + try { + writeRecordsToKafka( + topic, + readLines( + String.format( + "kafka/%s/table/%s/%s-data-3.txt", format, sourceDir, format))); + } catch (Exception e) { + throw new Exception(String.format("Failed to write %s data to Kafka.", format), e); + } + rowType = + RowType.of( + new DataType[] { + DataTypes.STRING().notNull(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] {"id", "name", "description", "weight", "age", "address"}); + expected = + Arrays.asList( + "+I[102, car battery, 12V car battery, 8.1, NULL, NULL]", + "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18, NULL]", + "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24, NULL]", + "+I[105, hammer, 14oz carpenter's hammer, 0.875, NULL, Beijing]", + "+I[107, rocks, box of assorted rocks, 5.3, NULL, NULL]"); + waitForResult(expected, table, rowType, primaryKeys, branch); + } + private void testSchemaEvolutionImpl(String topic, String sourceDir, String format) throws Exception { FileStoreTable table = getFileStoreTable(tableName); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index aee4c484f27dc..ce9869f60b8f4 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.action.cdc.mysql; import org.apache.paimon.catalog.FileSystemCatalogOptions; +import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; @@ -93,6 +94,50 @@ public void testSchemaEvolution() throws Exception { } } + @Test + @Timeout(120) + public void testSchemaEvolutionToBranch() throws Exception { + String branch = "testBranch"; + + Map mySqlConfig = getBasicMySqlConfig(); + mySqlConfig.put("database-name", DATABASE_NAME); + mySqlConfig.put("table-name", "schema_evolution_\\d+"); + + Map tableConfig = getBasicTableConfig(); + tableConfig.put(FlinkConnectorOptions.BRANCH.key(), branch); + Map catalogConfig = new HashMap<>(); + catalogConfig.put(FlinkConnectorOptions.BRANCH.key(), branch); + + MySqlSyncTableAction action = + syncTableActionBuilder(mySqlConfig) + .withCatalogConfig( + Collections.singletonMap( + CatalogOptions.METASTORE.key(), "test-alter-table")) + .withTableConfig(tableConfig) + .withCatalogConfig(catalogConfig) + .withPartitionKeys("pt") + .withPrimaryKeys("pt", "_id") + .build(); + runActionWithDefaultEnv(action); + + checkTableSchemaWithBranch( + "[{\"id\":0,\"name\":\"pt\",\"type\":\"INT NOT NULL\",\"description\":\"primary\"}," + + "{\"id\":1,\"name\":\"_id\",\"type\":\"INT NOT NULL\",\"description\":\"_id\"}," + + "{\"id\":2,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"}]", + branch); + + try (Statement 
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index aee4c484f27dc..ce9869f60b8f4 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.action.cdc.mysql;
 
 import org.apache.paimon.catalog.FileSystemCatalogOptions;
+import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
@@ -93,6 +94,50 @@ public void testSchemaEvolution() throws Exception {
         }
     }
 
+    @Test
+    @Timeout(120)
+    public void testSchemaEvolutionToBranch() throws Exception {
+        String branch = "testBranch";
+
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", DATABASE_NAME);
+        mySqlConfig.put("table-name", "schema_evolution_\\d+");
+
+        Map<String, String> tableConfig = getBasicTableConfig();
+        tableConfig.put(FlinkConnectorOptions.BRANCH.key(), branch);
+        Map<String, String> catalogConfig = new HashMap<>();
+        catalogConfig.put(FlinkConnectorOptions.BRANCH.key(), branch);
+
+        MySqlSyncTableAction action =
+                syncTableActionBuilder(mySqlConfig)
+                        .withCatalogConfig(
+                                Collections.singletonMap(
+                                        CatalogOptions.METASTORE.key(), "test-alter-table"))
+                        .withTableConfig(tableConfig)
+                        .withCatalogConfig(catalogConfig)
+                        .withPartitionKeys("pt")
+                        .withPrimaryKeys("pt", "_id")
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        checkTableSchemaWithBranch(
+                "[{\"id\":0,\"name\":\"pt\",\"type\":\"INT NOT NULL\",\"description\":\"primary\"},"
+                        + "{\"id\":1,\"name\":\"_id\",\"type\":\"INT NOT NULL\",\"description\":\"_id\"},"
+                        + "{\"id\":2,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"}]",
+                branch);
+
+        try (Statement statement = getStatement()) {
+            testSchemaEvolutionImplToBranch(statement, branch);
+        }
+    }
+
+    private void checkTableSchemaWithBranch(String excepted, String branch) throws Exception {
+
+        FileStoreTable table = getFileStoreTableWithBranch(tableName, branch);
+
+        assertThat(JsonSerdeUtil.toFlatJson(table.schema().fields())).isEqualTo(excepted);
+    }
+
     private void checkTableSchema(String excepted) throws Exception {
 
         FileStoreTable table = getFileStoreTable();
@@ -244,6 +289,148 @@ private void testSchemaEvolutionImpl(Statement statement) throws Exception {
         assertThat(getFileStoreTable().options()).containsEntry("alter-table-test", "true");
     }
 
+    private void testSchemaEvolutionImplToBranch(Statement statement, String branch)
+            throws Exception {
+        FileStoreTable table = getFileStoreTableWithBranch(tableName, branch);
+        statement.executeUpdate("USE " + DATABASE_NAME);
+
+        statement.executeUpdate("INSERT INTO schema_evolution_1 VALUES (1, 1, 'one')");
+        statement.executeUpdate(
+                "INSERT INTO schema_evolution_2 VALUES (1, 2, 'two'), (2, 4, 'four')");
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(10)
+                        },
+                        new String[] {"pt", "_id", "v1"});
+        List<String> primaryKeys = Arrays.asList("pt", "_id");
+        List<String> expected = Arrays.asList("+I[1, 1, one]", "+I[1, 2, two]", "+I[2, 4, four]");
+        waitForResult(expected, table, rowType, primaryKeys, branch);
+
+        statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v2 INT");
+        statement.executeUpdate(
+                "INSERT INTO schema_evolution_1 VALUES (2, 3, 'three', 30), (1, 5, 'five', 50)");
+        statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v2 INT");
+        statement.executeUpdate("INSERT INTO schema_evolution_2 VALUES (1, 6, 'six', 60)");
+        statement.executeUpdate("UPDATE schema_evolution_2 SET v1 = 'second' WHERE _id = 2");
+        rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(10),
+                            DataTypes.INT()
+                        },
+                        new String[] {"pt", "_id", "v1", "v2"});
+        expected =
+                Arrays.asList(
+                        "+I[1, 1, one, NULL]",
+                        "+I[1, 2, second, NULL]",
+                        "+I[2, 3, three, 30]",
+                        "+I[2, 4, four, NULL]",
+                        "+I[1, 5, five, 50]",
+                        "+I[1, 6, six, 60]");
+        waitForResult(expected, table, rowType, primaryKeys, branch);
+
+        statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN v2 BIGINT");
+        statement.executeUpdate(
+                "INSERT INTO schema_evolution_1 VALUES (2, 7, 'seven', 70000000000)");
+        statement.executeUpdate("DELETE FROM schema_evolution_1 WHERE _id = 5");
+        statement.executeUpdate("UPDATE schema_evolution_1 SET v2 = 30000000000 WHERE _id = 3");
+        statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN v2 BIGINT");
+        statement.executeUpdate(
+                "INSERT INTO schema_evolution_2 VALUES (2, 8, 'eight', 80000000000)");
+        rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(10),
+                            DataTypes.BIGINT()
+                        },
+                        new String[] {"pt", "_id", "v1", "v2"});
+        expected =
+                Arrays.asList(
+                        "+I[1, 1, one, NULL]",
+                        "+I[1, 2, second, NULL]",
+                        "+I[2, 3, three, 30000000000]",
+                        "+I[2, 4, four, NULL]",
+                        "+I[1, 6, six, 60]",
+                        "+I[2, 7, seven, 70000000000]",
+                        "+I[2, 8, eight, 80000000000]");
+        waitForResult(expected, table, rowType, primaryKeys, branch);
+
+        statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v3 NUMERIC(8, 3)");
+        statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v4 VARBINARY(10)");
statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v5 FLOAT"); + statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN v1 VARCHAR(20)"); + statement.executeUpdate( + "INSERT INTO schema_evolution_1 VALUES (1, 9, 'nine', 90000000000, 99999.999, 'nine.bin', 9.9)"); + statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v3 NUMERIC(8, 3)"); + statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v4 VARBINARY(10)"); + statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v5 FLOAT"); + statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN v1 VARCHAR(20)"); + statement.executeUpdate( + "UPDATE schema_evolution_2 SET v1 = 'very long string' WHERE _id = 8"); + rowType = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), + DataTypes.INT().notNull(), + DataTypes.VARCHAR(20), + DataTypes.BIGINT(), + DataTypes.DECIMAL(8, 3), + DataTypes.VARBINARY(10), + DataTypes.FLOAT() + }, + new String[] {"pt", "_id", "v1", "v2", "v3", "v4", "v5"}); + expected = + Arrays.asList( + "+I[1, 1, one, NULL, NULL, NULL, NULL]", + "+I[1, 2, second, NULL, NULL, NULL, NULL]", + "+I[2, 3, three, 30000000000, NULL, NULL, NULL]", + "+I[2, 4, four, NULL, NULL, NULL, NULL]", + "+I[1, 6, six, 60, NULL, NULL, NULL]", + "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]", + "+I[2, 8, very long string, 80000000000, NULL, NULL, NULL]", + "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]"); + waitForResult(expected, table, rowType, primaryKeys, branch); + + statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN v4 VARBINARY(20)"); + statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN v5 DOUBLE"); + statement.executeUpdate( + "UPDATE schema_evolution_1 SET v4 = 'nine.bin.long', v5 = 9.00000000009 WHERE _id = 9"); + statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN v4 VARBINARY(20)"); + statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN v5 DOUBLE"); + statement.executeUpdate( + "UPDATE schema_evolution_2 SET v4 = 'four.bin.long', v5 = 4.00000000004 WHERE _id = 4"); + rowType = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), + DataTypes.INT().notNull(), + DataTypes.VARCHAR(20), + DataTypes.BIGINT(), + DataTypes.DECIMAL(8, 3), + DataTypes.VARBINARY(20), + DataTypes.DOUBLE() + }, + new String[] {"pt", "_id", "v1", "v2", "v3", "v4", "v5"}); + expected = + Arrays.asList( + "+I[1, 1, one, NULL, NULL, NULL, NULL]", + "+I[1, 2, second, NULL, NULL, NULL, NULL]", + "+I[2, 3, three, 30000000000, NULL, NULL, NULL]", + "+I[2, 4, four, NULL, NULL, [102, 111, 117, 114, 46, 98, 105, 110, 46, 108, 111, 110, 103], 4.00000000004]", + "+I[1, 6, six, 60, NULL, NULL, NULL]", + "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]", + "+I[2, 8, very long string, 80000000000, NULL, NULL, NULL]", + "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110, 46, 108, 111, 110, 103], 9.00000000009]"); + waitForResult(expected, table, rowType, primaryKeys, branch); + } + @Test @Timeout(60) public void testMultipleSchemaEvolutions() throws Exception { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java index 80cf2cc767119..1c52c9d922dbd 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java +++ 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
index 80cf2cc767119..1c52c9d922dbd 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
@@ -27,6 +27,7 @@
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
@@ -57,6 +58,9 @@
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
+import static org.apache.paimon.options.CatalogOptions.BRANCH;
+import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
+
 /** {@link Action} test base. */
 public abstract class ActionITCaseBase extends AbstractTestBase {
 
@@ -121,6 +125,17 @@ protected FileStoreTable createFileStoreTable(
         return (FileStoreTable) catalog.getTable(identifier);
     }
 
+    protected FileStoreTable getFileStoreTableWithBranch(String tableName, String branch)
+            throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(WAREHOUSE.key(), new Path(warehouse).toUri().toString());
+        options.put(BRANCH.key(), branch);
+        Catalog catalogBranch =
+                CatalogFactory.createCatalog(CatalogContext.create(Options.fromMap(options)));
+        Identifier identifier = Identifier.create(database, tableName);
+        return (FileStoreTable) catalogBranch.getTable(identifier);
+    }
+
     protected FileStoreTable getFileStoreTable(String tableName) throws Exception {
         Identifier identifier = Identifier.create(database, tableName);
         return (FileStoreTable) catalog.getTable(identifier);
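The new helper builds a second catalog pinned to the branch instead of reusing the test's default catalog, so every table lookup through it resolves against that branch. A hedged usage sketch that mirrors `getFileStoreTableWithBranch`, assuming a Paimon test classpath; the literal `"warehouse"` and `"branch"` keys are assumed values of `CatalogOptions.WAREHOUSE` and `CatalogOptions.BRANCH`:

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;

import java.util.HashMap;
import java.util.Map;

public class BranchCatalogSketch {
    // Mirrors getFileStoreTableWithBranch above: a catalog created with the
    // branch option resolves every table lookup against that branch.
    static FileStoreTable tableOnBranch(
            String warehouse, String database, String tableName, String branch) throws Exception {
        Map<String, String> options = new HashMap<>();
        options.put("warehouse", warehouse); // assumed key of CatalogOptions.WAREHOUSE
        options.put("branch", branch); // assumed key of CatalogOptions.BRANCH
        Catalog catalog =
                CatalogFactory.createCatalog(CatalogContext.create(Options.fromMap(options)));
        return (FileStoreTable) catalog.getTable(Identifier.create(database, tableName));
    }
}
```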