diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 5f8e58537e64f..bbd28cb93230d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -79,13 +79,9 @@ public Path snapshotDirectory() {
     }
 
     public Path snapshotDirectory(String branchName) {
-        return new Path(getBranchPath(tablePath, branchName) + "/snapshot");
-    }
-
-    public Path snapshotDirectoryByBranch(String branchName) {
         return branchName.equals(DEFAULT_MAIN_BRANCH)
                 ? snapshotDirectory()
-                : snapshotDirectory(branchName);
+                : new Path(getBranchPath(tablePath, branchName) + "/snapshot");
     }
 
     public Path snapshotPath(long snapshotId) {
@@ -93,14 +89,13 @@ public Path snapshotPath(long snapshotId) {
     }
 
     public Path snapshotPath(String branchName, long snapshotId) {
-        return new Path(
-                getBranchPath(tablePath, branchName) + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId);
-    }
-
-    public Path snapshotPathByBranch(String branchName, long snapshotId) {
         return branchName.equals(DEFAULT_MAIN_BRANCH)
                 ? snapshotPath(snapshotId)
-                : snapshotPath(branchName, snapshotId);
+                : new Path(
+                        getBranchPath(tablePath, branchName)
+                                + "/snapshot/"
+                                + SNAPSHOT_PREFIX
+                                + snapshotId);
     }
 
     public Snapshot snapshot(long snapshotId) {
@@ -108,7 +103,7 @@ public Snapshot snapshot(long snapshotId) {
     }
 
     public Snapshot snapshot(String branchName, long snapshotId) {
-        Path snapshotPath = snapshotPathByBranch(branchName, snapshotId);
+        Path snapshotPath = snapshotPath(branchName, snapshotId);
         return Snapshot.fromPath(fileIO, snapshotPath);
     }
 
@@ -117,7 +112,7 @@ public boolean snapshotExists(long snapshotId) {
     }
 
     public boolean snapshotExists(String branchName, long snapshotId) {
-        Path path = snapshotPathByBranch(branchName, snapshotId);
+        Path path = snapshotPath(branchName, snapshotId);
         try {
             return fileIO.exists(path);
         } catch (IOException e) {
@@ -265,8 +260,7 @@ public long snapshotCount() throws IOException {
     }
 
     public long snapshotCount(String branch) throws IOException {
-        return listVersionedFiles(fileIO, snapshotDirectoryByBranch(branch), SNAPSHOT_PREFIX)
-                .count();
+        return listVersionedFiles(fileIO, snapshotDirectory(branch), SNAPSHOT_PREFIX).count();
     }
 
     public Iterator<Snapshot> snapshots() throws IOException {
@@ -429,7 +423,7 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter<Snapshot> checker) {
     }
 
     private @Nullable Long findLatest(String branchName) throws IOException {
-        Path snapshotDir = snapshotDirectoryByBranch(branchName);
+        Path snapshotDir = snapshotDirectory(branchName);
         if (!fileIO.exists(snapshotDir)) {
             return null;
         }
@@ -447,7 +441,7 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter<Snapshot> checker) {
     }
 
     private @Nullable Long findEarliest(String branchName) throws IOException {
-        Path snapshotDir = snapshotDirectoryByBranch(branchName);
+        Path snapshotDir = snapshotDirectory(branchName);
         if (!fileIO.exists(snapshotDir)) {
             return null;
         }
@@ -466,7 +460,7 @@ public Long readHint(String fileName) {
     }
 
     public Long readHint(String fileName, String branchName) {
-        Path snapshotDir = snapshotDirectoryByBranch(branchName);
+        Path snapshotDir = snapshotDirectory(branchName);
         Path path = new Path(snapshotDir, fileName);
         int retryNumber = 0;
         while (retryNumber++ < READ_HINT_RETRY_NUM) {
@@ -486,7 +480,7 @@ public Long readHint(String fileName, String branchName) {
 
     private Long findByListFiles(BinaryOperator<Long> reducer, String branchName)
             throws IOException {
-        Path snapshotDir = snapshotDirectoryByBranch(branchName);
+        Path snapshotDir = snapshotDirectory(branchName);
         return listVersionedFiles(fileIO, snapshotDir, SNAPSHOT_PREFIX)
                 .reduce(reducer)
                 .orElse(null);
@@ -510,7 +504,8 @@ public void commitEarliestHint(long snapshotId, String branchName) throws IOExce
 
     private void commitHint(long snapshotId, String fileName, String branchName)
             throws IOException {
-        Path snapshotDir = snapshotDirectoryByBranch(branchName);
+        Path snapshotDir = snapshotDirectory(branchName);
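+        // The hint file is written directly into the snapshot directory of the resolved branch.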
         Path hintFile = new Path(snapshotDir, fileName);
         fileIO.overwriteFileUtf8(hintFile, String.valueOf(snapshotId));
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
index cf3c03854516a..bcd90583bea94 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
@@ -18,9 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc;
 
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.ActionBase;
 import org.apache.paimon.flink.action.ActionITCaseBase;
 import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncDatabaseActionFactory;
@@ -32,14 +30,11 @@
 import org.apache.paimon.flink.action.cdc.postgres.PostgresSyncTableActionFactory;
 import org.apache.paimon.flink.action.cdc.pulsar.PulsarSyncDatabaseActionFactory;
 import org.apache.paimon.flink.action.cdc.pulsar.PulsarSyncTableActionFactory;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.BranchManager;
 
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -61,7 +56,6 @@
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.paimon.options.CatalogOptions.BRANCH;
-import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** CDC IT case base. */
@@ -89,22 +83,6 @@ protected void waitingTables(String... tables) throws Exception {
         waitingTables(Arrays.asList(tables));
     }
 
-    protected void waitingTables(List<String> tables, String branch) throws Exception {
-        LOG.info("Waiting for tables '{}'", tables);
-        Map<String, String> options = new HashMap<>();
-        options.put(WAREHOUSE.key(), new Path(warehouse).toUri().toString());
-        options.put(BRANCH.key(), branch);
-        Catalog catalogBranch =
-                CatalogFactory.createCatalog(CatalogContext.create(Options.fromMap(options)));
-        while (true) {
-            List<String> actualTables = catalogBranch.listTables(database);
-            if (actualTables.containsAll(tables)) {
-                break;
-            }
-            Thread.sleep(100);
-        }
-    }
-
     protected void waitingTables(List<String> tables) throws Exception {
         LOG.info("Waiting for tables '{}'", tables);
 
@@ -136,16 +114,6 @@ protected void assertTableNotExists(String... tableNames) throws Exception {
     protected void waitForResult(
             List<String> expected, FileStoreTable table, RowType rowType, List<String> primaryKeys)
             throws Exception {
-        waitForResult(expected, table, rowType, primaryKeys, BranchManager.DEFAULT_MAIN_BRANCH);
-    }
-
-    protected void waitForResult(
-            List<String> expected,
-            FileStoreTable table,
-            RowType rowType,
-            List<String> primaryKeys,
-            String branch)
-            throws Exception {
         assertThat(table.schema().primaryKeys()).isEqualTo(primaryKeys);
 
         // wait for table schema to become our expected schema
@@ -188,6 +156,18 @@ protected void waitForResult(
         }
     }
 
+    protected Map<String, String> getCatalogOptions(Map<String, String> catalogOptions) {
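+        // Route catalog access in branch tests to the branch under test.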
+        catalogOptions.put(BRANCH.key(), branch);
+        return catalogOptions;
+    }
+
+    protected Map<String, String> getTableConfig(Map<String, String> tableConfig) {
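+        // Route table reads and writes in branch tests to the branch under test.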
+        tableConfig.put(FlinkConnectorOptions.BRANCH.key(), branch);
+        return tableConfig;
+    }
+
     protected Map<String, String> getBasicTableConfig() {
         Map<String, String> config = new HashMap<>();
         ThreadLocalRandom random = ThreadLocalRandom.current();
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncDatabaseActionITCase.java
index de5226fba5b91..94efa60c71e40 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncDatabaseActionITCase.java
@@ -37,12 +37,6 @@ public void testSchemaEvolutionOneTopic() throws Exception {
         testSchemaEvolutionOneTopic(DEBEZIUM);
     }
 
-    @Test
-    @Timeout(240)
-    public void testSchemaEvolutionOneTopicWithBranch() throws Exception {
-        testSchemaEvolutionOneTopicWithBranch(DEBEZIUM, "testBranch");
-    }
-
     @Test
     public void testTopicIsEmpty() {
         testTopicIsEmpty(DEBEZIUM);
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncDatabaseToBranchActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncDatabaseToBranchActionITCase.java
new file mode 100644
index 0000000000000..53246624744f2
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncDatabaseToBranchActionITCase.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.kafka;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.IOException;
+
+/** IT cases for {@link KafkaSyncDatabaseAction} running against a non-default branch. */
+public class KafkaDebeziumSyncDatabaseToBranchActionITCase
+        extends KafkaDebeziumSyncDatabaseActionITCase {
+    @BeforeEach
+    public void before() throws IOException {
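+        // Set the branch before the shared setup so the inherited database-sync cases run against it.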
+        branch = "testKafkaDebeziumSyncDatabaseBranch";
+        super.before();
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
index 83c729ffd0899..7aba174d3dfcf 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
@@ -32,12 +32,6 @@ public void testSchemaEvolution() throws Exception {
         runSingleTableSchemaEvolution("schemaevolution", DEBEZIUM);
     }
 
-    @Test
-    @Timeout(120)
-    public void testSchemaEvolutionToBranch() throws Exception {
-        runSingleTableSchemaEvolutionToBranch("schemaevolution", DEBEZIUM, "testBranch");
-    }
-
     @Test
     @Timeout(60)
     public void testNotSupportFormat() throws Exception {
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableToBranchActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableToBranchActionITCase.java
new file mode 100644
index 0000000000000..77860bbb37e8b
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableToBranchActionITCase.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.kafka;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.IOException;
+
+/** IT cases for {@link KafkaSyncTableAction} running against a non-default branch. */
+public class KafkaDebeziumSyncTableToBranchActionITCase extends KafkaDebeziumSyncTableActionITCase {
+
+    @BeforeEach
+    public void before() throws IOException {
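+        // Set the branch before the shared setup so the inherited table-sync cases run against it.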
+        branch = "testKafkaDebeziumSyncTableBranch";
+        super.before();
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java
index a7bee685c7b12..cf474ea60f318 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java
@@ -19,14 +19,10 @@
 package org.apache.paimon.flink.action.cdc.kafka;
 
 import org.apache.paimon.catalog.FileSystemCatalogOptions;
-import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.BranchManager;
-
-import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
 
 import javax.annotation.Nullable;
 
@@ -74,7 +70,8 @@ protected void testSchemaEvolutionMultiTopic(String format) throws Exception {
         kafkaConfig.put(TOPIC.key(), String.join(";", topics));
         KafkaSyncDatabaseAction action =
                 syncDatabaseActionBuilder(kafkaConfig)
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .build();
         runActionWithDefaultEnv(action);
 
@@ -108,69 +105,21 @@ protected void testSchemaEvolutionOneTopic(String format) throws Exception {
         kafkaConfig.put(TOPIC.key(), String.join(";", topics));
         KafkaSyncDatabaseAction action =
                 syncDatabaseActionBuilder(kafkaConfig)
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .build();
         runActionWithDefaultEnv(action);
 
         testSchemaEvolutionImpl(topics, writeOne, fileCount, format);
     }
 
-    protected void testSchemaEvolutionOneTopicWithBranch(String format, String branch)
-            throws Exception {
-        final String topic = "schema_evolution";
-        boolean writeOne = true;
-        int fileCount = 2;
-        List<String> topics = Collections.singletonList(topic);
-        topics.forEach(t -> createTestTopic(t, 1, 1));
-
-        // ---------- Write the maxwell json into Kafka -------------------
-
-        for (int i = 0; i < fileCount; i++) {
-            try {
-                writeRecordsToKafka(
-                        topics.get(0),
-                        readLines(
-                                String.format(
-                                        "kafka/%s/database/schemaevolution/topic%s/%s-data-1.txt",
-                                        format, i, format)));
-            } catch (Exception e) {
-                throw new Exception(String.format("Failed to write %s data to Kafka.", format), e);
-            }
-        }
-
-        Map<String, String> kafkaConfig = getBasicKafkaConfig();
-        kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
-        kafkaConfig.put(TOPIC.key(), String.join(";", topics));
-
-        Map<String, String> tableConfig = getBasicTableConfig();
-        tableConfig.put(FlinkConnectorOptions.BRANCH.key(), branch);
-        Map<String, String> catalogConfig = new HashMap<>();
-        catalogConfig.put(FlinkConnectorOptions.BRANCH.key(), branch);
-
-        KafkaSyncDatabaseAction action =
-                syncDatabaseActionBuilder(kafkaConfig)
-                        .withTableConfig(tableConfig)
-                        .withCatalogConfig(catalogConfig)
-                        .build();
-        runActionWithDefaultEnv(action);
-
-        testSchemaEvolutionImpl(topics, writeOne, fileCount, format, branch);
-    }
-
     private void testSchemaEvolutionImpl(
             List<String> topics, boolean writeOne, int fileCount, String format) throws Exception {
-        testSchemaEvolutionImpl(
-                topics, writeOne, fileCount, format, BranchManager.DEFAULT_MAIN_BRANCH);
-    }
-
-    private void testSchemaEvolutionImpl(
-            List<String> topics, boolean writeOne, int fileCount, String format, String branch)
-            throws Exception {
 
-        waitingTables(Lists.newArrayList("t1", "t2"), branch);
+        waitingTables("t1", "t2");
 
-        FileStoreTable table1 = getFileStoreTable("t1", branch);
-        FileStoreTable table2 = getFileStoreTable("t2", branch);
+        FileStoreTable table1 = getFileStoreTable("t1");
+        FileStoreTable table2 = getFileStoreTable("t2");
 
         RowType rowType1 =
                 RowType.of(
@@ -186,7 +135,7 @@ private void testSchemaEvolutionImpl(
                 Arrays.asList(
                         "+I[101, scooter, Small 2-wheel scooter, 3.14]",
                         "+I[102, car battery, 12V car battery, 8.1]");
-        waitForResult(expected, table1, rowType1, getPrimaryKey(format), branch);
+        waitForResult(expected, table1, rowType1, getPrimaryKey(format));
 
         RowType rowType2 =
                 RowType.of(
@@ -202,7 +151,7 @@ private void testSchemaEvolutionImpl(
                 Arrays.asList(
                         "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]",
                         "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
-        waitForResult(expected2, table2, rowType2, getPrimaryKey(format), branch);
+        waitForResult(expected2, table2, rowType2, getPrimaryKey(format));
 
         for (int i = 0; i < fileCount; i++) {
             try {
@@ -233,7 +182,7 @@ private void testSchemaEvolutionImpl(
                         "+I[102, car battery, 12V car battery, 8.1, NULL]",
                         "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 19]",
                         "+I[104, hammer, 12oz carpenter's hammer, 0.75, 25]");
-        waitForResult(expected, table1, rowType1, getPrimaryKey(format), branch);
+        waitForResult(expected, table1, rowType1, getPrimaryKey(format));
 
         rowType2 =
                 RowType.of(
@@ -260,14 +209,18 @@ private void testSchemaEvolutionImpl(
                             "+I[104, hammer, 12oz carpenter's hammer, 0.75, Shanghai]");
         }
 
-        waitForResult(expected, table2, rowType2, getPrimaryKey(format), branch);
+        waitForResult(expected, table2, rowType2, getPrimaryKey(format));
     }
 
     protected void testTopicIsEmpty(String format) {
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
 
-        KafkaSyncDatabaseAction action = syncDatabaseActionBuilder(kafkaConfig).build();
+        KafkaSyncDatabaseAction action =
+                syncDatabaseActionBuilder(kafkaConfig)
+                        .withTableConfig(getTableConfig(new HashMap<>()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
+                        .build();
 
         assertThatThrownBy(action::run)
                 .satisfies(
@@ -322,7 +275,8 @@ protected void testTableAffixMultiTopic(String format) throws Exception {
                 syncDatabaseActionBuilder(kafkaConfig)
                         .withTablePrefix("test_prefix_")
                         .withTableSuffix("_test_suffix")
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         // test including check with affix
                         .includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*")
                         .build();
@@ -376,7 +330,8 @@ protected void testTableAffixOneTopic(String format) throws Exception {
                 syncDatabaseActionBuilder(kafkaConfig)
                         .withTablePrefix("test_prefix_")
                         .withTableSuffix("_test_suffix")
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         // test including check with affix
                         .includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*")
                         .build();
@@ -545,7 +500,8 @@ private void includingAndExcludingTablesImpl(
                 syncDatabaseActionBuilder(kafkaConfig)
                         .includingTables(includingTables)
                         .excludingTables(excludingTables)
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .build();
         runActionWithDefaultEnv(action);
 
@@ -571,12 +527,13 @@ protected void testCaseInsensitive(String format) throws Exception {
         kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
         kafkaConfig.put(TOPIC.key(), topic);
 
+        Map<String, String> catalogConfig = new HashMap<>();
+        catalogConfig.put(FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false");
+
         KafkaSyncDatabaseAction action =
                 syncDatabaseActionBuilder(kafkaConfig)
-                        .withTableConfig(getBasicTableConfig())
-                        .withCatalogConfig(
-                                Collections.singletonMap(
-                                        FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(catalogConfig))
                         .build();
         runActionWithDefaultEnv(action);
 
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
index 398d68998d797..e6e6310b5130f 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action.cdc.kafka;
 
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.schema.Schema;
@@ -76,122 +75,14 @@ protected void runSingleTableSchemaEvolution(String sourceDir, String format) th
         KafkaSyncTableAction action =
                 syncTableActionBuilder(kafkaConfig)
                         .withPrimaryKeys("id")
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .build();
         runActionWithDefaultEnv(action);
 
         testSchemaEvolutionImpl(topic, sourceDir, format);
     }
 
-    protected void runSingleTableSchemaEvolutionToBranch(
-            String sourceDir, String format, String branch) throws Exception {
-        final String topic = "schema_evolution-branch";
-        createTestTopic(topic, 1, 1);
-        // ---------- Write the data into Kafka -------------------
-        List<String> lines =
-                readLines(
-                        String.format(
-                                "kafka/%s/table/%s/%s-data-1.txt", format, sourceDir, format));
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to Kafka.", format), e);
-        }
-        Map<String, String> kafkaConfig = getBasicKafkaConfig();
-        kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
-        kafkaConfig.put(TOPIC.key(), topic);
-        Map<String, String> tableConfig = getBasicTableConfig();
-        tableConfig.put(FlinkConnectorOptions.BRANCH.key(), branch);
-        Map<String, String> catalogConfig = new HashMap<>();
-        catalogConfig.put(FlinkConnectorOptions.BRANCH.key(), branch);
-        KafkaSyncTableAction action =
-                syncTableActionBuilder(kafkaConfig)
-                        .withPrimaryKeys("id")
-                        .withTableConfig(tableConfig)
-                        .withCatalogConfig(catalogConfig)
-                        .build();
-        runActionWithDefaultEnv(action);
-
-        testSchemaEvolutionImplWithBranch(topic, sourceDir, format, branch);
-    }
-
-    private void testSchemaEvolutionImplWithBranch(
-            String topic, String sourceDir, String format, String branch) throws Exception {
-        FileStoreTable table = getFileStoreTable(tableName, branch);
-
-        RowType rowType =
-                RowType.of(
-                        new DataType[] {
-                            DataTypes.STRING().notNull(),
-                            DataTypes.STRING(),
-                            DataTypes.STRING(),
-                            DataTypes.STRING()
-                        },
-                        new String[] {"id", "name", "description", "weight"});
-        List<String> primaryKeys = Collections.singletonList("id");
-        List<String> expected =
-                Arrays.asList(
-                        "+I[101, scooter, Small 2-wheel scooter, 3.14]",
-                        "+I[102, car battery, 12V car battery, 8.1]");
-        waitForResult(expected, table, rowType, primaryKeys, branch);
-
-        try {
-            writeRecordsToKafka(
-                    topic,
-                    readLines(
-                            String.format(
-                                    "kafka/%s/table/%s/%s-data-2.txt", format, sourceDir, format)));
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to Kafka.", format), e);
-        }
-        rowType =
-                RowType.of(
-                        new DataType[] {
-                            DataTypes.STRING().notNull(),
-                            DataTypes.STRING(),
-                            DataTypes.STRING(),
-                            DataTypes.STRING(),
-                            DataTypes.STRING()
-                        },
-                        new String[] {"id", "name", "description", "weight", "age"});
-        expected =
-                Arrays.asList(
-                        "+I[101, scooter, Small 2-wheel scooter, 3.14, NULL]",
-                        "+I[102, car battery, 12V car battery, 8.1, NULL]",
-                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18]",
-                        "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24]");
-        waitForResult(expected, table, rowType, primaryKeys, branch);
-
-        try {
-            writeRecordsToKafka(
-                    topic,
-                    readLines(
-                            String.format(
-                                    "kafka/%s/table/%s/%s-data-3.txt", format, sourceDir, format)));
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to Kafka.", format), e);
-        }
-        rowType =
-                RowType.of(
-                        new DataType[] {
-                            DataTypes.STRING().notNull(),
-                            DataTypes.STRING(),
-                            DataTypes.STRING(),
-                            DataTypes.STRING(),
-                            DataTypes.STRING(),
-                            DataTypes.STRING()
-                        },
-                        new String[] {"id", "name", "description", "weight", "age", "address"});
-        expected =
-                Arrays.asList(
-                        "+I[102, car battery, 12V car battery, 8.1, NULL, NULL]",
-                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18, NULL]",
-                        "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24, NULL]",
-                        "+I[105, hammer, 14oz carpenter's hammer, 0.875, NULL, Beijing]",
-                        "+I[107, rocks, box of assorted rocks, 5.3, NULL, NULL]");
-        waitForResult(expected, table, rowType, primaryKeys, branch);
-    }
-
     private void testSchemaEvolutionImpl(String topic, String sourceDir, String format)
             throws Exception {
         FileStoreTable table = getFileStoreTable(tableName);
@@ -288,7 +179,8 @@ public void testNotSupportFormat(String format) throws Exception {
         KafkaSyncTableAction action =
                 syncTableActionBuilder(kafkaConfig)
                         .withPrimaryKeys("id")
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .build();
 
         assertThatThrownBy(action::run)
@@ -327,7 +219,8 @@ protected void testAssertSchemaCompatible(String format) throws Exception {
         KafkaSyncTableAction action =
                 syncTableActionBuilder(kafkaConfig)
                         .withPrimaryKeys("id")
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .build();
 
         assertThatThrownBy(action::run)
@@ -359,7 +252,8 @@ protected void testStarUpOptionSpecific(String format) throws Exception {
         KafkaSyncTableAction action =
                 syncTableActionBuilder(kafkaConfig)
                         .withPrimaryKeys("id")
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .build();
         runActionWithDefaultEnv(action);
 
@@ -400,7 +294,8 @@ protected void testStarUpOptionLatest(String format) throws Exception {
         KafkaSyncTableAction action =
                 syncTableActionBuilder(kafkaConfig)
                         .withPrimaryKeys("id")
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .build();
         runActionWithDefaultEnv(action);
 
@@ -455,7 +350,8 @@ public void testStarUpOptionTimestamp(String format) throws Exception {
         KafkaSyncTableAction action =
                 syncTableActionBuilder(kafkaConfig)
                         .withPrimaryKeys("id")
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .build();
         runActionWithDefaultEnv(action);
 
@@ -507,7 +403,8 @@ public void testStarUpOptionEarliest(String format) throws Exception {
         KafkaSyncTableAction action =
                 syncTableActionBuilder(kafkaConfig)
                         .withPrimaryKeys("id")
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .build();
         runActionWithDefaultEnv(action);
 
@@ -561,7 +458,8 @@ public void testStarUpOptionGroup(String format) throws Exception {
         KafkaSyncTableAction action =
                 syncTableActionBuilder(kafkaConfig)
                         .withPrimaryKeys("id")
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .build();
         runActionWithDefaultEnv(action);
 
@@ -617,7 +515,8 @@ public void testComputedColumn(String format) throws Exception {
                         .withPartitionKeys("_year")
                         .withPrimaryKeys("_id", "_year")
                         .withComputedColumnArgs("_year=year(_date)")
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .build();
         runActionWithDefaultEnv(action);
 
@@ -653,7 +552,8 @@ protected void testCDCOperations(String format) throws Exception {
         KafkaSyncTableAction action =
                 syncTableActionBuilder(kafkaConfig)
                         .withPrimaryKeys("id")
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .build();
         runActionWithDefaultEnv(action);
 
@@ -759,14 +659,17 @@ public void testWaterMarkSyncTable(String format) throws Exception {
         config.put("scan.watermark.alignment.update-interval", "1 s");
 
         KafkaSyncTableAction action =
-                syncTableActionBuilder(kafkaConfig).withTableConfig(config).build();
+                syncTableActionBuilder(kafkaConfig)
+                        .withTableConfig(getTableConfig(config))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
+                        .build();
         runActionWithDefaultEnv(action);
 
         FileStoreTable table =
                 (FileStoreTable) catalog.getTable(new Identifier(database, tableName));
         while (true) {
-            if (table.snapshotManager().snapshotCount() > 0
-                    && table.snapshotManager().latestSnapshot().watermark()
+            if (table.snapshotManager().snapshotCount(branch) > 0
+                    && table.snapshotManager().latestSnapshot(branch).watermark()
                             != -9223372036854775808L) {
                 return;
             }
@@ -791,7 +694,8 @@ public void testSchemaIncludeRecord(String format) throws Exception {
         KafkaSyncTableAction action =
                 syncTableActionBuilder(kafkaConfig)
                         .withPrimaryKeys("id")
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .build();
         runActionWithDefaultEnv(action);
 
@@ -828,7 +732,8 @@ public void testAllTypesWithSchemaImpl(String format) throws Exception {
                 syncTableActionBuilder(kafkaConfig)
                         .withPartitionKeys("pt")
                         .withPrimaryKeys("pt", "_id")
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .build();
         runActionWithDefaultEnv(action);
 
@@ -1080,7 +985,8 @@ protected void testTableFiledValNull(String format) throws Exception {
         KafkaSyncTableAction action =
                 syncTableActionBuilder(kafkaConfig)
                         .withPrimaryKeys("id")
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .build();
         runActionWithDefaultEnv(action);
 
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 790dd969dd9fc..a00abd48369f1 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -21,7 +21,6 @@
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.FileSystemCatalogOptions;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.MultiTablesSinkMode;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.table.FileStoreTable;
@@ -30,8 +29,6 @@
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.JsonSerdeUtil;
 
-import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
-
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.SavepointFormatType;
@@ -82,7 +79,8 @@ public void testSchemaEvolution() throws Exception {
 
         MySqlSyncDatabaseAction action =
                 syncDatabaseActionBuilder(mySqlConfig)
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .build();
         runActionWithDefaultEnv(action);
 
@@ -210,7 +208,11 @@ public void testSpecifiedMySqlTable() {
         mySqlConfig.put("database-name", "paimon_sync_database");
         mySqlConfig.put("table-name", "my_table");
 
-        MySqlSyncDatabaseAction action = syncDatabaseActionBuilder(mySqlConfig).build();
+        MySqlSyncDatabaseAction action =
+                syncDatabaseActionBuilder(mySqlConfig)
+                        .withTableConfig(getTableConfig(new HashMap<>()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
+                        .build();
 
         assertThatThrownBy(action::run)
                 .isInstanceOf(IllegalArgumentException.class)
@@ -225,7 +227,11 @@ public void testInvalidDatabase() {
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
         mySqlConfig.put("database-name", "invalid");
 
-        MySqlSyncDatabaseAction action = syncDatabaseActionBuilder(mySqlConfig).build();
+        MySqlSyncDatabaseAction action =
+                syncDatabaseActionBuilder(mySqlConfig)
+                        .withTableConfig(getTableConfig(new HashMap<>()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
+                        .build();
 
         assertThatThrownBy(action::run)
                 .isInstanceOf(IllegalArgumentException.class)
@@ -252,7 +258,8 @@ public void testIgnoreIncompatibleTables() throws Exception {
 
         MySqlSyncDatabaseAction action =
                 syncDatabaseActionBuilder(mySqlConfig)
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .ignoreIncompatible(true)
                         .build();
         runActionWithDefaultEnv(action);
@@ -299,7 +306,8 @@ public void testTableAffix() throws Exception {
 
         MySqlSyncDatabaseAction action =
                 syncDatabaseActionBuilder(mySqlConfig)
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .withTablePrefix("test_prefix_")
                         .withTableSuffix("_test_suffix")
                         // test including check with affix
@@ -456,7 +464,8 @@ private void includingAndExcludingTablesImpl(
 
         MySqlSyncDatabaseAction action =
                 syncDatabaseActionBuilder(mySqlConfig)
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .includingTables(includingTables)
                         .excludingTables(excludingTables)
                         .build();
@@ -473,12 +482,13 @@ public void testIgnoreCase() throws Exception {
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
         mySqlConfig.put("database-name", "paimon_ignore_CASE");
 
+        Map<String, String> catalogConfig = new HashMap<>();
+        catalogConfig.put(FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false");
+
         MySqlSyncDatabaseAction action =
                 syncDatabaseActionBuilder(mySqlConfig)
-                        .withCatalogConfig(
-                                Collections.singletonMap(
-                                        FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(catalogConfig))
                         .build();
         runActionWithDefaultEnv(action);
 
@@ -572,7 +582,8 @@ public void testAddIgnoredTable() throws Exception {
         MySqlSyncDatabaseAction action =
                 syncDatabaseActionBuilder(mySqlConfig)
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .includingTables("t.+")
                         .excludingTables(".*a$")
                         .withMode(COMBINED.configString())
@@ -849,16 +861,15 @@ private JobClient buildSyncDatabaseActionWithNewlyAddedTables(
         mySqlConfig.put("database-name", databaseName);
         mySqlConfig.put("scan.incremental.snapshot.chunk.size", "1");
 
-        Map<String, String> catalogConfig =
-                testSchemaChange
-                        ? Collections.singletonMap(
-                                CatalogOptions.METASTORE.key(), "test-alter-table")
-                        : Collections.emptyMap();
+        Map<String, String> catalogConfig = new HashMap<>();
+        if (testSchemaChange) {
+            catalogConfig.put(CatalogOptions.METASTORE.key(), "test-alter-table");
+        }
 
         MySqlSyncDatabaseAction action =
                 syncDatabaseActionBuilder(mySqlConfig)
-                        .withCatalogConfig(catalogConfig)
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(catalogConfig))
                         .includingTables("t.+")
                         .withMode(COMBINED.configString())
                         .build();
@@ -898,7 +909,8 @@ public void testSyncManyTableWithLimitedMemory() throws Exception {
 
         MySqlSyncDatabaseAction action =
                 syncDatabaseActionBuilder(mySqlConfig)
-                        .withTableConfig(tableConfig)
+                        .withTableConfig(getTableConfig(tableConfig))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .withMode(COMBINED.configString())
                         .build();
         runActionWithDefaultEnv(action);
@@ -945,7 +957,8 @@ public void testSyncMultipleShards() throws Exception {
         MultiTablesSinkMode mode = ThreadLocalRandom.current().nextBoolean() ? DIVIDED : COMBINED;
         MySqlSyncDatabaseAction action =
                 syncDatabaseActionBuilder(mySqlConfig)
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .withMode(mode.configString())
                         .build();
         runActionWithDefaultEnv(action);
@@ -1044,139 +1057,6 @@ public void testSyncMultipleShards() throws Exception {
         }
     }
 
-    @Test
-    @Timeout(120)
-    public void testSyncMultipleShardsWithBranch() throws Exception {
-
-        String branch = "testBranch";
-        Map<String, String> mySqlConfig = getBasicMySqlConfig();
-
-        // test table list
-        mySqlConfig.put(
-                "database-name",
-                ThreadLocalRandom.current().nextBoolean()
-                        ? "database_branch_shard_.*"
-                        : "database_branch_shard_1|database_branch_shard_2");
-
-        Map<String, String> tableConfig = getBasicTableConfig();
-        tableConfig.put(FlinkConnectorOptions.BRANCH.key(), branch);
-        Map<String, String> catalogConfig = new HashMap<>();
-        catalogConfig.put(FlinkConnectorOptions.BRANCH.key(), branch);
-
-        MultiTablesSinkMode mode = ThreadLocalRandom.current().nextBoolean() ? DIVIDED : COMBINED;
-        MySqlSyncDatabaseAction action =
-                syncDatabaseActionBuilder(mySqlConfig)
-                        .withTableConfig(tableConfig)
-                        .withCatalogConfig(catalogConfig)
-                        .withMode(mode.configString())
-                        .build();
-        runActionWithDefaultEnv(action);
-
-        try (Statement statement = getStatement()) {
-            // test insert into t1
-            statement.executeUpdate("INSERT INTO database_branch_shard_1.t1 VALUES (1, 'db1_1')");
-            statement.executeUpdate("INSERT INTO database_branch_shard_1.t1 VALUES (2, 'db1_2')");
-
-            statement.executeUpdate(
-                    "INSERT INTO database_branch_shard_2.t1 VALUES (3, 'db2_3', 300)");
-            statement.executeUpdate(
-                    "INSERT INTO database_branch_shard_2.t1 VALUES (4, 'db2_4', 400)");
-
-            FileStoreTable table = getFileStoreTable("t1", branch);
-            RowType rowType =
-                    RowType.of(
-                            new DataType[] {
-                                DataTypes.INT().notNull(), DataTypes.VARCHAR(20), DataTypes.BIGINT()
-                            },
-                            new String[] {"k", "v1", "v2"});
-            waitForResult(
-                    Arrays.asList(
-                            "+I[1, db1_1, NULL]",
-                            "+I[2, db1_2, NULL]",
-                            "+I[3, db2_3, 300]",
-                            "+I[4, db2_4, 400]"),
-                    table,
-                    rowType,
-                    Collections.singletonList("k"),
-                    branch);
-
-            // test schema evolution of t2
-            statement.executeUpdate("ALTER TABLE database_branch_shard_1.t2 ADD COLUMN v2 INT");
-            statement.executeUpdate(
-                    "ALTER TABLE database_branch_shard_2.t2 ADD COLUMN v3 VARCHAR(10)");
-            statement.executeUpdate("INSERT INTO database_branch_shard_1.t2 VALUES (1, 1.1, 1)");
-            statement.executeUpdate("INSERT INTO database_branch_shard_1.t2 VALUES (2, 2.2, 2)");
-            statement.executeUpdate(
-                    "INSERT INTO database_branch_shard_2.t2 VALUES (3, 3.3, 'db2_3')");
-            statement.executeUpdate(
-                    "INSERT INTO database_branch_shard_2.t2 VALUES (4, 4.4, 'db2_4')");
-            table = getFileStoreTable("t2", branch);
-            rowType =
-                    RowType.of(
-                            new DataType[] {
-                                DataTypes.BIGINT().notNull(),
-                                DataTypes.DOUBLE(),
-                                DataTypes.INT(),
-                                DataTypes.VARCHAR(10)
-                            },
-                            new String[] {"k", "v1", "v2", "v3"});
-            waitForResult(
-                    Arrays.asList(
-                            "+I[1, 1.1, 1, NULL]",
-                            "+I[2, 2.2, 2, NULL]",
-                            "+I[3, 3.3, NULL, db2_3]",
-                            "+I[4, 4.4, NULL, db2_4]"),
-                    table,
-                    rowType,
-                    Collections.singletonList("k"),
-                    branch);
-
-            // test that database_shard_2.t3 won't be synchronized
-            statement.executeUpdate(
-                    "INSERT INTO database_branch_shard_2.t3 VALUES (1, 'db2_1'), (2, 'db2_2')");
-            statement.executeUpdate(
-                    "INSERT INTO database_branch_shard_1.t3 VALUES (3, 'db1_3'), (4, 'db1_4')");
-            table = getFileStoreTable("t3", branch);
-            rowType =
-                    RowType.of(
-                            new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
-                            new String[] {"k", "v1"});
-            waitForResult(
-                    Arrays.asList("+I[3, db1_3]", "+I[4, db1_4]"),
-                    table,
-                    rowType,
-                    Collections.singletonList("k"),
-                    branch);
-
-            // test newly created table
-            if (mode == COMBINED) {
-                statement.executeUpdate(
-                        "CREATE TABLE database_branch_shard_1.t4 (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
-                statement.executeUpdate(
-                        "INSERT INTO database_branch_shard_1.t4 VALUES (1, 'db1_1')");
-
-                statement.executeUpdate(
-                        "CREATE TABLE database_branch_shard_2.t4 (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
-                statement.executeUpdate(
-                        "INSERT INTO database_branch_shard_2.t4 VALUES (2, 'db2_2')");
-
-                waitingTables(Lists.newArrayList("t4"), branch);
-
-                table = getFileStoreTable("t4", branch);
-                rowType =
-                        RowType.of(
-                                new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
-                                new String[] {"k", "v1"});
-                waitForResult(
-                        Arrays.asList("+I[1, db1_1]", "+I[2, db2_2]"),
-                        table,
-                        rowType,
-                        Collections.singletonList("k"),
-                        branch);
-            }
-        }
-    }
-
     @Test
     @Timeout(60)
     public void testSyncMultipleShardsWithoutMerging() throws Exception {
@@ -1186,7 +1066,8 @@ public void testSyncMultipleShardsWithoutMerging() throws Exception {
         MultiTablesSinkMode mode = ThreadLocalRandom.current().nextBoolean() ? DIVIDED : COMBINED;
         MySqlSyncDatabaseAction action =
                 syncDatabaseActionBuilder(mySqlConfig)
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .mergeShards(false)
                         .withMode(mode.configString())
                         .build();
@@ -1301,6 +1182,8 @@ public void testMonitoredAndExcludedTablesWithMering() throws Exception {
 
         MySqlSyncDatabaseAction action =
                 syncDatabaseActionBuilder(mySqlConfig)
+                        .withTableConfig(getTableConfig(new HashMap<>()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .ignoreIncompatible(true)
                         .withMode(COMBINED.configString())
                         .build();
@@ -1338,7 +1221,8 @@ public void testNewlyAddedTablesOptionsChange() throws Exception {
 
         MySqlSyncDatabaseAction action1 =
                 syncDatabaseActionBuilder(mySqlConfig)
-                        .withTableConfig(tableConfig)
+                        .withTableConfig(getTableConfig(tableConfig))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .withMode(COMBINED.configString())
                         .build();
 
@@ -1364,7 +1248,10 @@ public void testNewlyAddedTablesOptionsChange() throws Exception {
         }
 
         MySqlSyncDatabaseAction action2 =
-                syncDatabaseActionBuilder(mySqlConfig).withTableConfig(tableConfig).build();
+                syncDatabaseActionBuilder(mySqlConfig)
+                        .withTableConfig(getTableConfig(tableConfig))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
+                        .build();
         runActionWithDefaultEnv(action2);
         waitingTables("t2");
 
@@ -1374,15 +1261,20 @@ public void testNewlyAddedTablesOptionsChange() throws Exception {
 
     @Test
     public void testCatalogAndTableConfig() {
+        Map<String, String> catalogConfig = new HashMap<>();
+        catalogConfig.put("catalog-key", "catalog-value");
+
+        Map<String, String> tableConfig = new HashMap<>();
+        tableConfig.put("table-key", "table-value");
+
         MySqlSyncDatabaseAction action =
                 syncDatabaseActionBuilder(getBasicMySqlConfig())
-                        .withCatalogConfig(Collections.singletonMap("catalog-key", "catalog-value"))
-                        .withTableConfig(Collections.singletonMap("table-key", "table-value"))
+                        .withTableConfig(getTableConfig(tableConfig))
+                        .withCatalogConfig(getCatalogOptions(catalogConfig))
                         .build();
 
         assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value");
-        assertThat(action.tableConfig())
-                .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
+        assertThat(action.tableConfig()).containsExactlyEntriesOf(getTableConfig(tableConfig));
     }
 
     @Test
@@ -1394,7 +1286,8 @@ public void testMetadataColumns() throws Exception {
         MultiTablesSinkMode mode = ThreadLocalRandom.current().nextBoolean() ? DIVIDED : COMBINED;
         MySqlSyncDatabaseAction action =
                 syncDatabaseActionBuilder(mySqlConfig)
-                        .withTableConfig(getBasicTableConfig())
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .withMode(mode.configString())
                         .withMetadataColumn(Arrays.asList("table_name", "database_name"))
                         .build();
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionToBranchITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionToBranchITCase.java
new file mode 100644
index 0000000000000..e6cf68455bb10
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionToBranchITCase.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.IOException;
+
+/** IT cases for {@link MySqlSyncDatabaseAction} when synchronizing to a non-main branch. */
+public class MySqlSyncDatabaseActionToBranchITCase extends MySqlSyncDatabaseActionITCase {
+
+    @BeforeEach
+    @Override
+    public void before() throws IOException {
+        this.branch = "testMySqlSyncDatabaseActionBranch";
+        super.before();
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 0dbeb0aeff8e2..1f09f51f8f4c9 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action.cdc.mysql;
 
 import org.apache.paimon.catalog.FileSystemCatalogOptions;
-import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
@@ -73,12 +72,15 @@ public void testSchemaEvolution() throws Exception {
         mySqlConfig.put("database-name", DATABASE_NAME);
         mySqlConfig.put("table-name", "schema_evolution_\\d+");
 
+        Map<String, String> catalogConfig = new HashMap<>();
+        catalogConfig.put(CatalogOptions.METASTORE.key(), "test-alter-table");
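+        // "test-alter-table" is a test metastore used to verify that schema
+        // changes are propagated (see the "alter-table-test" assertion below).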
+
         MySqlSyncTableAction action =
                 syncTableActionBuilder(mySqlConfig)
-                        .withCatalogConfig(
-                                Collections.singletonMap(
-                                        CatalogOptions.METASTORE.key(), "test-alter-table"))
-                        .withTableConfig(getBasicTableConfig())
+                        .withCatalogConfig(getCatalogOptions(catalogConfig))
+                        .withTableConfig(getTableConfig(getBasicTableConfig()))
                         .withPartitionKeys("pt")
                         .withPrimaryKeys("pt", "_id")
                         .build();
@@ -94,50 +94,6 @@ public void testSchemaEvolution() throws Exception {
         }
     }
 
-    @Test
-    @Timeout(120)
-    public void testSchemaEvolutionToBranch() throws Exception {
-        String branch = "testBranch";
-
-        Map<String, String> mySqlConfig = getBasicMySqlConfig();
-        mySqlConfig.put("database-name", DATABASE_NAME);
-        mySqlConfig.put("table-name", "schema_evolution_\\d+");
-
-        Map<String, String> tableConfig = getBasicTableConfig();
-        tableConfig.put(FlinkConnectorOptions.BRANCH.key(), branch);
-        Map<String, String> catalogConfig = new HashMap<>();
-        catalogConfig.put(FlinkConnectorOptions.BRANCH.key(), branch);
-
-        MySqlSyncTableAction action =
-                syncTableActionBuilder(mySqlConfig)
-                        .withCatalogConfig(
-                                Collections.singletonMap(
-                                        CatalogOptions.METASTORE.key(), "test-alter-table"))
-                        .withTableConfig(tableConfig)
-                        .withCatalogConfig(catalogConfig)
-                        .withPartitionKeys("pt")
-                        .withPrimaryKeys("pt", "_id")
-                        .build();
-        runActionWithDefaultEnv(action);
-
-        checkTableSchemaWithBranch(
-                "[{\"id\":0,\"name\":\"pt\",\"type\":\"INT NOT NULL\",\"description\":\"primary\"},"
-                        + "{\"id\":1,\"name\":\"_id\",\"type\":\"INT NOT NULL\",\"description\":\"_id\"},"
-                        + "{\"id\":2,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"}]",
-                branch);
-
-        try (Statement statement = getStatement()) {
-            testSchemaEvolutionImplToBranch(statement, branch);
-        }
-    }
-
-    private void checkTableSchemaWithBranch(String excepted, String branch) throws Exception {
-
-        FileStoreTable table = getFileStoreTable(tableName, branch);
-
-        assertThat(JsonSerdeUtil.toFlatJson(table.schema().fields())).isEqualTo(excepted);
-    }
-
     private void checkTableSchema(String excepted) throws Exception {
 
         FileStoreTable table = getFileStoreTable();
@@ -289,148 +245,6 @@ private void testSchemaEvolutionImpl(Statement statement) throws Exception {
         assertThat(getFileStoreTable().options()).containsEntry("alter-table-test", "true");
     }
 
-    private void testSchemaEvolutionImplToBranch(Statement statement, String branch)
-            throws Exception {
-        FileStoreTable table = getFileStoreTable(tableName, branch);
-        statement.executeUpdate("USE " + DATABASE_NAME);
-
-        statement.executeUpdate("INSERT INTO schema_evolution_1 VALUES (1, 1, 'one')");
-        statement.executeUpdate(
-                "INSERT INTO schema_evolution_2 VALUES (1, 2, 'two'), (2, 4, 'four')");
-        RowType rowType =
-                RowType.of(
-                        new DataType[] {
-                            DataTypes.INT().notNull(),
-                            DataTypes.INT().notNull(),
-                            DataTypes.VARCHAR(10)
-                        },
-                        new String[] {"pt", "_id", "v1"});
-        List<String> primaryKeys = Arrays.asList("pt", "_id");
-        List<String> expected = Arrays.asList("+I[1, 1, one]", "+I[1, 2, two]", "+I[2, 4, four]");
-        waitForResult(expected, table, rowType, primaryKeys, branch);
-
-        statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v2 INT");
-        statement.executeUpdate(
-                "INSERT INTO schema_evolution_1 VALUES (2, 3, 'three', 30), (1, 5, 'five', 50)");
-        statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v2 INT");
-        statement.executeUpdate("INSERT INTO schema_evolution_2 VALUES (1, 6, 'six', 60)");
-        statement.executeUpdate("UPDATE schema_evolution_2 SET v1 = 'second' WHERE _id = 2");
-        rowType =
-                RowType.of(
-                        new DataType[] {
-                            DataTypes.INT().notNull(),
-                            DataTypes.INT().notNull(),
-                            DataTypes.VARCHAR(10),
-                            DataTypes.INT()
-                        },
-                        new String[] {"pt", "_id", "v1", "v2"});
-        expected =
-                Arrays.asList(
-                        "+I[1, 1, one, NULL]",
-                        "+I[1, 2, second, NULL]",
-                        "+I[2, 3, three, 30]",
-                        "+I[2, 4, four, NULL]",
-                        "+I[1, 5, five, 50]",
-                        "+I[1, 6, six, 60]");
-        waitForResult(expected, table, rowType, primaryKeys, branch);
-
-        statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN v2 BIGINT");
-        statement.executeUpdate(
-                "INSERT INTO schema_evolution_1 VALUES (2, 7, 'seven', 70000000000)");
-        statement.executeUpdate("DELETE FROM schema_evolution_1 WHERE _id = 5");
-        statement.executeUpdate("UPDATE schema_evolution_1 SET v2 = 30000000000 WHERE _id = 3");
-        statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN v2 BIGINT");
-        statement.executeUpdate(
-                "INSERT INTO schema_evolution_2 VALUES (2, 8, 'eight', 80000000000)");
-        rowType =
-                RowType.of(
-                        new DataType[] {
-                            DataTypes.INT().notNull(),
-                            DataTypes.INT().notNull(),
-                            DataTypes.VARCHAR(10),
-                            DataTypes.BIGINT()
-                        },
-                        new String[] {"pt", "_id", "v1", "v2"});
-        expected =
-                Arrays.asList(
-                        "+I[1, 1, one, NULL]",
-                        "+I[1, 2, second, NULL]",
-                        "+I[2, 3, three, 30000000000]",
-                        "+I[2, 4, four, NULL]",
-                        "+I[1, 6, six, 60]",
-                        "+I[2, 7, seven, 70000000000]",
-                        "+I[2, 8, eight, 80000000000]");
-        waitForResult(expected, table, rowType, primaryKeys, branch);
-
-        statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v3 NUMERIC(8, 3)");
-        statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v4 VARBINARY(10)");
-        statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v5 FLOAT");
-        statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN v1 VARCHAR(20)");
-        statement.executeUpdate(
-                "INSERT INTO schema_evolution_1 VALUES (1, 9, 'nine', 90000000000, 99999.999, 'nine.bin', 9.9)");
-        statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v3 NUMERIC(8, 3)");
-        statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v4 VARBINARY(10)");
-        statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v5 FLOAT");
-        statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN v1 VARCHAR(20)");
-        statement.executeUpdate(
-                "UPDATE schema_evolution_2 SET v1 = 'very long string' WHERE _id = 8");
-        rowType =
-                RowType.of(
-                        new DataType[] {
-                            DataTypes.INT().notNull(),
-                            DataTypes.INT().notNull(),
-                            DataTypes.VARCHAR(20),
-                            DataTypes.BIGINT(),
-                            DataTypes.DECIMAL(8, 3),
-                            DataTypes.VARBINARY(10),
-                            DataTypes.FLOAT()
-                        },
-                        new String[] {"pt", "_id", "v1", "v2", "v3", "v4", "v5"});
-        expected =
-                Arrays.asList(
-                        "+I[1, 1, one, NULL, NULL, NULL, NULL]",
-                        "+I[1, 2, second, NULL, NULL, NULL, NULL]",
-                        "+I[2, 3, three, 30000000000, NULL, NULL, NULL]",
-                        "+I[2, 4, four, NULL, NULL, NULL, NULL]",
-                        "+I[1, 6, six, 60, NULL, NULL, NULL]",
-                        "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]",
-                        "+I[2, 8, very long string, 80000000000, NULL, NULL, NULL]",
-                        "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]");
-        waitForResult(expected, table, rowType, primaryKeys, branch);
-
-        statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN v4 VARBINARY(20)");
-        statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN v5 DOUBLE");
-        statement.executeUpdate(
-                "UPDATE schema_evolution_1 SET v4 = 'nine.bin.long', v5 = 9.00000000009 WHERE _id = 9");
-        statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN v4 VARBINARY(20)");
-        statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN v5 DOUBLE");
-        statement.executeUpdate(
-                "UPDATE schema_evolution_2 SET v4 = 'four.bin.long', v5 = 4.00000000004 WHERE _id = 4");
-        rowType =
-                RowType.of(
-                        new DataType[] {
-                            DataTypes.INT().notNull(),
-                            DataTypes.INT().notNull(),
-                            DataTypes.VARCHAR(20),
-                            DataTypes.BIGINT(),
-                            DataTypes.DECIMAL(8, 3),
-                            DataTypes.VARBINARY(20),
-                            DataTypes.DOUBLE()
-                        },
-                        new String[] {"pt", "_id", "v1", "v2", "v3", "v4", "v5"});
-        expected =
-                Arrays.asList(
-                        "+I[1, 1, one, NULL, NULL, NULL, NULL]",
-                        "+I[1, 2, second, NULL, NULL, NULL, NULL]",
-                        "+I[2, 3, three, 30000000000, NULL, NULL, NULL]",
-                        "+I[2, 4, four, NULL, NULL, [102, 111, 117, 114, 46, 98, 105, 110, 46, 108, 111, 110, 103], 4.00000000004]",
-                        "+I[1, 6, six, 60, NULL, NULL, NULL]",
-                        "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]",
-                        "+I[2, 8, very long string, 80000000000, NULL, NULL, NULL]",
-                        "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110, 46, 108, 111, 110, 103], 9.00000000009]");
-        waitForResult(expected, table, rowType, primaryKeys, branch);
-    }
-
     @Test
     @Timeout(60)
     public void testMultipleSchemaEvolutions() throws Exception {
@@ -438,7 +252,11 @@ public void testMultipleSchemaEvolutions() throws Exception {
         mySqlConfig.put("database-name", DATABASE_NAME);
         mySqlConfig.put("table-name", "schema_evolution_multiple");
 
-        MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig).build();
+        MySqlSyncTableAction action =
+                syncTableActionBuilder(mySqlConfig)
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
+                        .withTableConfig(getTableConfig(new HashMap<>()))
+                        .build();
         runActionWithDefaultEnv(action);
 
         checkTableSchema(
@@ -503,7 +321,7 @@ private void testSchemaEvolutionMultipleImpl(Statement statement) throws Excepti
     }
 
     @Test
-    @Timeout(90)
+    @Timeout(180)
     public void testAllTypes() throws Exception {
         // the first round checks for table creation
         // the second round checks for running the action on an existing table
@@ -512,13 +330,15 @@ public void testAllTypes() throws Exception {
         }
     }
 
-    private void testAllTypesOnce() throws Exception {
+    protected void testAllTypesOnce() throws Exception {
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
         mySqlConfig.put("database-name", DATABASE_NAME);
         mySqlConfig.put("table-name", "all_types_table");
 
         MySqlSyncTableAction action =
                 syncTableActionBuilder(mySqlConfig)
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
+                        .withTableConfig(getTableConfig(new HashMap<>()))
                         .withPartitionKeys("pt")
                         .withPrimaryKeys("pt", "_id")
                         .build();
@@ -798,7 +618,7 @@ private void testAllTypesImpl(Statement statement) throws Exception {
         } finally {
             statement.executeUpdate("ALTER TABLE all_types_table DROP COLUMN v");
             SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
-            schemaManager.commitChanges(SchemaChange.dropColumn("v"));
+            schemaManager.commitChanges(branch, SchemaChange.dropColumn("v"));
         }
     }
 
@@ -808,7 +628,11 @@ public void testIncompatibleMySqlTable() {
         mySqlConfig.put("database-name", DATABASE_NAME);
         mySqlConfig.put("table-name", "incompatible_field_\\d+");
 
-        MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig).build();
+        MySqlSyncTableAction action =
+                syncTableActionBuilder(mySqlConfig)
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
+                        .withTableConfig(getTableConfig(new HashMap<>()))
+                        .build();
 
         assertThatThrownBy(action::run)
                 .satisfies(
@@ -834,7 +658,11 @@ public void testIncompatiblePaimonTable() throws Exception {
                 new HashMap<>());
 
         MySqlSyncTableAction action =
-                syncTableActionBuilder(mySqlConfig).withPrimaryKeys("a").build();
+                syncTableActionBuilder(mySqlConfig)
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
+                        .withTableConfig(getTableConfig(new HashMap<>()))
+                        .withPrimaryKeys("a")
+                        .build();
 
         assertThatThrownBy(action::run)
                 .satisfies(
@@ -850,7 +678,11 @@ public void testInvalidPrimaryKey() {
         mySqlConfig.put("table-name", "schema_evolution_\\d+");
 
         MySqlSyncTableAction action =
-                syncTableActionBuilder(mySqlConfig).withPrimaryKeys("pk").build();
+                syncTableActionBuilder(mySqlConfig)
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
+                        .withTableConfig(getTableConfig(new HashMap<>()))
+                        .withPrimaryKeys("pk")
+                        .build();
 
         assertThatThrownBy(action::run)
                 .satisfies(
@@ -865,7 +697,11 @@ public void testNoPrimaryKey() {
         mySqlConfig.put("database-name", DATABASE_NAME);
         mySqlConfig.put("table-name", "incompatible_pk_\\d+");
 
-        MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig).build();
+        MySqlSyncTableAction action =
+                syncTableActionBuilder(mySqlConfig)
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
+                        .withTableConfig(getTableConfig(new HashMap<>()))
+                        .build();
 
         assertThatThrownBy(action::run)
                 .satisfies(
@@ -877,7 +713,7 @@ public void testNoPrimaryKey() {
     }
 
     @Test
-    @Timeout(60)
+    @Timeout(240)
     public void testComputedColumn() throws Exception {
         // the first round checks for table creation
         // the second round checks for running the action on an existing table
@@ -920,6 +756,8 @@ private void innerTestComputedColumn(boolean executeMysql) throws Exception {
 
         MySqlSyncTableAction action =
                 syncTableActionBuilder(mySqlConfig)
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
+                        .withTableConfig(getTableConfig(new HashMap<>()))
                         .withPartitionKeys("_year_date")
                         .withPrimaryKeys("pk", "_year_date")
                         .withComputedColumnArgs(computedColumnDefs)
@@ -1020,6 +858,8 @@ public void testSyncShards() throws Exception {
 
         MySqlSyncTableAction action =
                 syncTableActionBuilder(mySqlConfig)
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
+                        .withTableConfig(getTableConfig(new HashMap<>()))
                         .withPartitionKeys("pt")
                         .withPrimaryKeys("pk", "pt")
                         .withComputedColumnArgs("pt=substring(_date,5)")
@@ -1071,7 +911,8 @@ public void testOptionsChange() throws Exception {
                         .withPartitionKeys("pt")
                         .withPrimaryKeys("pk", "pt")
                         .withComputedColumnArgs("pt=substring(_date,5)")
-                        .withTableConfig(tableConfig)
+                        .withTableConfig(getTableConfig(tableConfig))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .build();
         JobClient jobClient = runActionWithDefaultEnv(action1);
         try (Statement statement = getStatement()) {
@@ -1099,7 +940,8 @@ public void testOptionsChange() throws Exception {
                         .withPartitionKeys("pt")
                         .withPrimaryKeys("pk", "pt")
                         .withComputedColumnArgs("pt=substring(_date,5)")
-                        .withTableConfig(tableConfig)
+                        .withTableConfig(getTableConfig(tableConfig))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .build();
         runActionWithDefaultEnv(action2);
 
@@ -1121,6 +963,8 @@ public void testMetadataColumns() throws Exception {
 
         MySqlSyncTableAction action =
                 syncTableActionBuilder(mySqlConfig)
+                        .withTableConfig(getTableConfig(new HashMap<>()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .withPrimaryKeys("pk")
                         .withMetadataColumns("table_name", "database_name", "op_ts")
                         .build();
@@ -1158,15 +1002,20 @@ public void testMetadataColumns() throws Exception {
 
     @Test
     public void testCatalogAndTableConfig() {
+        Map<String, String> catalogOptions = new HashMap<>();
+        catalogOptions.put("catalog-key", "catalog-value");
+
+        Map<String, String> tableConfig = new HashMap<>();
+        tableConfig.put("table-key", "table-value");
+
         MySqlSyncTableAction action =
                 syncTableActionBuilder(getBasicMySqlConfig())
-                        .withCatalogConfig(Collections.singletonMap("catalog-key", "catalog-value"))
-                        .withTableConfig(Collections.singletonMap("table-key", "table-value"))
+                        .withTableConfig(getTableConfig(tableConfig))
+                        .withCatalogConfig(getCatalogOptions(catalogOptions))
                         .build();
 
         assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value");
-        assertThat(action.tableConfig())
-                .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
+        assertThat(action.tableConfig()).containsExactlyEntriesOf(getTableConfig(tableConfig));
     }
 
     private FileStoreTable getFileStoreTable() throws Exception {
@@ -1183,7 +1032,11 @@ public void testDefaultCheckpointInterval() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRestartStrategy(RestartStrategies.noRestart());
 
-        MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig).build();
+        MySqlSyncTableAction action =
+                syncTableActionBuilder(mySqlConfig)
+                        .withTableConfig(getTableConfig(new HashMap<>()))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
+                        .build();
         action.withStreamExecutionEnvironment(env);
 
         Thread thread =
@@ -1214,11 +1067,14 @@ public void testComputedColumnWithCaseInsensitive() throws Exception {
         mySqlConfig.put("database-name", "computed_column_with_case_insensitive");
         mySqlConfig.put("table-name", "t");
 
+        Map<String, String> catalogConfig = new HashMap<>();
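+        // Disable case sensitivity so the upper-case MySQL column names can be used.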
+        catalogConfig.put(FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false");
+
         MySqlSyncTableAction action =
                 syncTableActionBuilder(mySqlConfig)
-                        .withCatalogConfig(
-                                Collections.singletonMap(
-                                        FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+                        .withCatalogConfig(getCatalogOptions(catalogConfig))
+                        .withTableConfig(getTableConfig(new HashMap<>()))
                         .withComputedColumnArgs("SUBSTRING=substring(UPPERCASE_STRING,2)")
                         .build();
         runActionWithDefaultEnv(action);
@@ -1250,11 +1105,13 @@ public void testSpecifyKeysWithCaseInsensitive() throws Exception {
         mySqlConfig.put("database-name", "specify_key_with_case_insensitive");
         mySqlConfig.put("table-name", "t");
 
+        Map<String, String> catalogConfig = new HashMap<>();
+        catalogConfig.put(FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false");
+
         MySqlSyncTableAction action =
                 syncTableActionBuilder(mySqlConfig)
-                        .withCatalogConfig(
-                                Collections.singletonMap(
-                                        FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+                        .withCatalogConfig(getCatalogOptions(catalogConfig))
+                        .withTableConfig(getTableConfig(new HashMap<>()))
                         .withPrimaryKeys("ID1", "PART")
                         .withPartitionKeys("PART")
                         .build();
@@ -1278,9 +1135,13 @@ public void testInvalidAlterBucket() throws Exception {
         mySqlConfig.put("database-name", "invalid_alter_bucket");
         mySqlConfig.put("table-name", "t");
 
+        Map<String, String> tableConfig = new HashMap<>();
+        tableConfig.put(BUCKET.key(), "2");
+
         MySqlSyncTableAction action =
                 syncTableActionBuilder(mySqlConfig)
-                        .withTableConfig(Collections.singletonMap(BUCKET.key(), "2"))
+                        .withTableConfig(getTableConfig(tableConfig))
+                        .withCatalogConfig(getCatalogOptions(new HashMap<>()))
                         .build();
 
         assertThatCode(action::build).doesNotThrowAnyException();
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionToBranchITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionToBranchITCase.java
new file mode 100644
index 0000000000000..f3020303423db
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionToBranchITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+
+/** IT cases for {@link MySqlSyncTableAction} when synchronizing to a non-main branch. */
+public class MySqlSyncTableActionToBranchITCase extends MySqlSyncTableActionITCase {
+
+    @BeforeEach
+    @Override
+    public void before() throws IOException {
+        this.branch = "testMySqlSyncTableActionBranch";
+        super.before();
+    }
+
+    @Test
+    @Timeout(360)
+    public void testAllTypes() throws Exception {
+        // Run only the first round (table creation); the parent's second round,
+        // which reruns the action on an existing table, is intentionally skipped
+        // for the branch variant.
+        testAllTypesOnce();
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
index 3a01e4768334e..54827158f96b2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
@@ -36,6 +36,7 @@
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BranchManager;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -71,7 +72,9 @@ public abstract class ActionITCaseBase extends AbstractTestBase {
     protected StreamTableWrite write;
     protected StreamTableCommit commit;
     protected Catalog catalog;
-    private long incrementalIdentifier;
+    protected long incrementalIdentifier;
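+    // Branch that the tests run against; *ToBranchITCase subclasses override this.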
+    protected String branch = BranchManager.DEFAULT_MAIN_BRANCH;
 
     @BeforeEach
     public void before() throws IOException {
@@ -80,7 +82,14 @@ public void before() throws IOException {
         tableName = "test_table_" + UUID.randomUUID();
         commitUser = UUID.randomUUID().toString();
         incrementalIdentifier = 0;
-        catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
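+        // Build the catalog with the configured branch so that all table lookups
+        // in the tests are resolved against that branch.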
+        Map<String, String> options = new HashMap<>();
+        options.put(WAREHOUSE.key(), new Path(warehouse).toUri().toString());
+        if (!branch.equals(BranchManager.DEFAULT_MAIN_BRANCH)) {
+            options.put(BRANCH.key(), branch);
+        }
+        catalog = CatalogFactory.createCatalog(CatalogContext.create(Options.fromMap(options)));
     }
 
     @AfterEach
@@ -125,16 +132,6 @@ protected FileStoreTable createFileStoreTable(
         return (FileStoreTable) catalog.getTable(identifier);
     }
 
-    protected FileStoreTable getFileStoreTable(String tableName, String branch) throws Exception {
-        Map<String, String> options = new HashMap<>();
-        options.put(WAREHOUSE.key(), new Path(warehouse).toUri().toString());
-        options.put(BRANCH.key(), branch);
-        Catalog catalogBranch =
-                CatalogFactory.createCatalog(CatalogContext.create(Options.fromMap(options)));
-        Identifier identifier = Identifier.create(database, tableName);
-        return (FileStoreTable) catalogBranch.getTable(identifier);
-    }
-
     protected FileStoreTable getFileStoreTable(String tableName) throws Exception {
         Identifier identifier = Identifier.create(database, tableName);
         return (FileStoreTable) catalog.getTable(identifier);