diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java index ed5ecc581ee5d..294d73651d853 100644 --- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java @@ -106,5 +106,8 @@ public class CatalogOptions { .build()); public static final ConfigOption BRANCH = - key("branch").stringType().defaultValue("main").withDescription("Specify the branch submitted by the schema."); + key("branch") + .stringType() + .defaultValue("main") + .withDescription("Specify the branch submitted by the schema."); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 320874fab9fac..eedf757708d24 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -72,8 +72,6 @@ import java.util.concurrent.Callable; import java.util.stream.Collectors; -import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; - /** * Default implementation of {@link FileStoreCommit}. * @@ -185,7 +183,8 @@ public Set filterCommitted(Set commitIdentifiers) { return commitIdentifiers; } - Optional latestSnapshot = snapshotManager.latestSnapshotOfUser(commitUser); + Optional latestSnapshot = + snapshotManager.latestSnapshotOfUser(branchName, commitUser); if (latestSnapshot.isPresent()) { Set result = new HashSet<>(); for (Long identifier : commitIdentifiers) { @@ -643,7 +642,7 @@ private int tryOverwrite( Map logOffsets) { int cnt = 0; while (true) { - Snapshot latestSnapshot = snapshotManager.latestSnapshot(); + Snapshot latestSnapshot = snapshotManager.latestSnapshot(branchName); cnt++; List changesWithOverwrite = new ArrayList<>(); @@ -711,10 +710,7 @@ public boolean tryCommitOnce( @Nullable String newStatsFileName) { long newSnapshotId = latestSnapshot == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshot.id() + 1; - Path newSnapshotPath = - branchName.equals(DEFAULT_MAIN_BRANCH) - ? 
snapshotManager.snapshotPath(newSnapshotId) - : snapshotManager.snapshotPath(branchName, newSnapshotId); + Path newSnapshotPath = snapshotManager.snapshotPath(branchName, newSnapshotId); if (LOG.isDebugEnabled()) { LOG.debug("Ready to commit table files to snapshot #" + newSnapshotId); diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index c2081f304ffa4..4e039c7ff24b1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -188,7 +188,8 @@ public TableSchema commitChanges(String branchName, List changes) Catalog.ColumnNotExistException { while (true) { TableSchema schema = - latest().orElseThrow( + latest(branchName) + .orElseThrow( () -> new Catalog.TableNotExistException( fromPath(tableRoot.toString(), true))); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index dd3ba712ad980..c4b99546bf8d6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -39,8 +39,10 @@ import org.apache.paimon.table.sink.CommitCallback; import org.apache.paimon.table.sink.DynamicBucketRowKeyExtractor; import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor; +import org.apache.paimon.table.sink.InnerTableWrite; import org.apache.paimon.table.sink.RowKeyExtractor; import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.table.sink.UnawareBucketRowKeyExtractor; import org.apache.paimon.table.source.InnerStreamTableScan; import org.apache.paimon.table.source.InnerStreamTableScanImpl; @@ -184,7 +186,8 @@ public InnerStreamTableScan newStreamScan(String branch) { newSnapshotReader(branch), snapshotManager(), supportStreamingReadOverwrite(), - DefaultValueAssigner.create(tableSchema)); + DefaultValueAssigner.create(tableSchema), + branch); } protected abstract SplitGenerator splitGenerator(); @@ -333,6 +336,16 @@ public TableCommitImpl newCommit(String commitUser) { return newCommit(commitUser, DEFAULT_MAIN_BRANCH); } + @Override + public TableWriteImpl newWrite(String commitUser) { + return newWrite(commitUser, null, BranchManager.DEFAULT_MAIN_BRANCH); + } + + @Override + public InnerTableWrite newWrite(String commitUser, String branch) { + return newWrite(commitUser, null, branch); + } + public TableCommitImpl newCommit(String commitUser, String branchName) { CoreOptions options = coreOptions(); Runnable snapshotExpire = null; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java index 3009d8baf29fc..3a3a3253fc63e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java @@ -33,6 +33,7 @@ import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.query.LocalTableQuery; +import org.apache.paimon.table.sink.InnerTableWrite; import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.table.source.AbstractDataTableRead; import 
org.apache.paimon.table.source.AppendOnlySplitGenerator; @@ -145,7 +146,12 @@ public RecordReader reader(Split split) throws IOException { @Override public TableWriteImpl newWrite(String commitUser) { - return newWrite(commitUser, null); + return newWrite(commitUser, null, BranchManager.DEFAULT_MAIN_BRANCH); + } + + @Override + public InnerTableWrite newWrite(String commitUser, String branch) { + return newWrite(commitUser, null, branch); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/InnerTable.java b/paimon-core/src/main/java/org/apache/paimon/table/InnerTable.java index f97b5bcdf0f44..477887305e648 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/InnerTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/InnerTable.java @@ -53,8 +53,12 @@ default InnerTableRead newRead(String branch) { InnerTableWrite newWrite(String commitUser); + InnerTableWrite newWrite(String commitUser, String branch); + InnerTableCommit newCommit(String commitUser); + InnerTableCommit newCommit(String commitUser, String branch); + @Override default ReadBuilder newReadBuilder() { return new ReadBuilderImpl(this); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java index f98bb7168aa3e..cbda4d90d14df 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java @@ -202,7 +202,7 @@ public InnerTableRead forceKeepDelete() { @Override public TableWriteImpl newWrite(String commitUser) { - return newWrite(commitUser, null); + return newWrite(commitUser, null, DEFAULT_MAIN_BRANCH); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index f0d52b641015e..0e6a497b6a975 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -77,6 +77,14 @@ default InnerTableWrite newWrite(String commitUser) { this.getClass().getSimpleName())); } + @Override + default InnerTableWrite newWrite(String commitUser, String branch) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support newWrite with branch.", + this.getClass().getSimpleName())); + } + @Override default InnerTableCommit newCommit(String commitUser) { throw new UnsupportedOperationException( @@ -85,6 +93,14 @@ default InnerTableCommit newCommit(String commitUser) { this.getClass().getSimpleName())); } + @Override + default InnerTableCommit newCommit(String commitUser, String branch) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support newCommit with branch.", + this.getClass().getSimpleName())); + } + @Override default InnerStreamTableScan newStreamScan() { throw new UnsupportedOperationException( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilder.java index ee0091a7bff98..5832a0d8feca9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilder.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilder.java @@ -61,6 +61,8 @@ default BatchWriteBuilder withOverwrite() { return this; } + BatchWriteBuilder toBranch(String branch); 
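// ---------------------------------------------------------------------------------------------
// Editorial note, not part of the patch: a minimal sketch of how the toBranch(...) hook added to
// BatchWriteBuilder just above is intended to be used for a batch write. The Table instance and
// the row value are assumed to exist already; newBatchWriteBuilder()/newWrite()/newCommit() and
// write()/prepareCommit()/commit() are the existing batch write API, only toBranch(...) is new.
//
// import org.apache.paimon.data.InternalRow;
// import org.apache.paimon.table.Table;
// import org.apache.paimon.table.sink.BatchTableCommit;
// import org.apache.paimon.table.sink.BatchTableWrite;
// import org.apache.paimon.table.sink.BatchWriteBuilder;
//
// static void writeToBranch(Table table, InternalRow row) throws Exception {
//     BatchWriteBuilder builder = table.newBatchWriteBuilder().toBranch("test_branch");
//     try (BatchTableWrite write = builder.newWrite();
//             BatchTableCommit commit = builder.newCommit()) {
//         write.write(row);
//         // the resulting snapshot lands under the branch's snapshot directory, not main
//         commit.commit(write.prepareCommit());
//     }
// }
// ---------------------------------------------------------------------------------------------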
+ /** Overwrite writing, same as the 'INSERT OVERWRITE T PARTITION (...)' semantics of SQL. */ BatchWriteBuilder withOverwrite(@Nullable Map staticPartition); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java index 87693ba0a7b86..756296cb2e094 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java @@ -20,6 +20,7 @@ import org.apache.paimon.table.InnerTable; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BranchManager; import javax.annotation.Nullable; @@ -33,6 +34,7 @@ public class BatchWriteBuilderImpl implements BatchWriteBuilder { private final InnerTable table; private final String commitUser = UUID.randomUUID().toString(); + private String branch = BranchManager.DEFAULT_MAIN_BRANCH; private Map staticPartition; @@ -56,16 +58,23 @@ public BatchWriteBuilder withOverwrite(@Nullable Map staticParti return this; } + @Override + public BatchWriteBuilder toBranch(String branch) { + this.branch = branch; + return this; + } + @Override public BatchTableWrite newWrite() { - return table.newWrite(commitUser) + return table.newWrite(commitUser, branch) .withIgnorePreviousFiles(staticPartition != null) .withExecutionMode(false); } @Override public BatchTableCommit newCommit() { - InnerTableCommit commit = table.newCommit(commitUser).withOverwrite(staticPartition); + InnerTableCommit commit = + table.newCommit(commitUser, branch).withOverwrite(staticPartition); commit.ignoreEmptyCommit(true); return commit; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/StreamWriteBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/StreamWriteBuilder.java index 7f272e0751265..3236aaa52ff01 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/StreamWriteBuilder.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/StreamWriteBuilder.java @@ -51,6 +51,8 @@ public interface StreamWriteBuilder extends WriteBuilder { */ StreamWriteBuilder withCommitUser(String commitUser); + StreamWriteBuilder toBranch(String branch); + /** Create a {@link TableWrite} to write {@link InternalRow}s. 
*/ @Override StreamTableWrite newWrite(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/StreamWriteBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/StreamWriteBuilderImpl.java index 402b0beb2eab9..d78bcedd49c0f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/StreamWriteBuilderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/StreamWriteBuilderImpl.java @@ -20,6 +20,7 @@ import org.apache.paimon.table.InnerTable; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BranchManager; import java.util.UUID; @@ -31,6 +32,7 @@ public class StreamWriteBuilderImpl implements StreamWriteBuilder { private final InnerTable table; private String commitUser = UUID.randomUUID().toString(); + private String branch = BranchManager.DEFAULT_MAIN_BRANCH; public StreamWriteBuilderImpl(InnerTable table) { this.table = table; @@ -57,13 +59,19 @@ public StreamWriteBuilder withCommitUser(String commitUser) { return this; } + @Override + public StreamWriteBuilder toBranch(String branch) { + this.branch = branch; + return this; + } + @Override public StreamTableWrite newWrite() { - return table.newWrite(commitUser); + return table.newWrite(commitUser, branch); } @Override public StreamTableCommit newCommit() { - return table.newCommit(commitUser).ignoreEmptyCommit(false); + return table.newCommit(commitUser, branch).ignoreEmptyCommit(false); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 5f8e58537e64f..bbd28cb93230d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -79,13 +79,9 @@ public Path snapshotDirectory() { } public Path snapshotDirectory(String branchName) { - return new Path(getBranchPath(tablePath, branchName) + "/snapshot"); - } - - public Path snapshotDirectoryByBranch(String branchName) { return branchName.equals(DEFAULT_MAIN_BRANCH) ? snapshotDirectory() - : snapshotDirectory(branchName); + : new Path(getBranchPath(tablePath, branchName) + "/snapshot"); } public Path snapshotPath(long snapshotId) { @@ -93,14 +89,13 @@ public Path snapshotPath(long snapshotId) { } public Path snapshotPath(String branchName, long snapshotId) { - return new Path( - getBranchPath(tablePath, branchName) + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId); - } - - public Path snapshotPathByBranch(String branchName, long snapshotId) { return branchName.equals(DEFAULT_MAIN_BRANCH) ? 
snapshotPath(snapshotId) - : snapshotPath(branchName, snapshotId); + : new Path( + getBranchPath(tablePath, branchName) + + "/snapshot/" + + SNAPSHOT_PREFIX + + snapshotId); } public Snapshot snapshot(long snapshotId) { @@ -108,7 +103,7 @@ public Snapshot snapshot(long snapshotId) { } public Snapshot snapshot(String branchName, long snapshotId) { - Path snapshotPath = snapshotPathByBranch(branchName, snapshotId); + Path snapshotPath = snapshotPath(branchName, snapshotId); return Snapshot.fromPath(fileIO, snapshotPath); } @@ -117,7 +112,7 @@ public boolean snapshotExists(long snapshotId) { } public boolean snapshotExists(String branchName, long snapshotId) { - Path path = snapshotPathByBranch(branchName, snapshotId); + Path path = snapshotPath(branchName, snapshotId); try { return fileIO.exists(path); } catch (IOException e) { @@ -265,8 +260,7 @@ public long snapshotCount() throws IOException { } public long snapshotCount(String branch) throws IOException { - return listVersionedFiles(fileIO, snapshotDirectoryByBranch(branch), SNAPSHOT_PREFIX) - .count(); + return listVersionedFiles(fileIO, snapshotDirectory(branch), SNAPSHOT_PREFIX).count(); } public Iterator snapshots() throws IOException { @@ -429,7 +423,7 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) { } private @Nullable Long findLatest(String branchName) throws IOException { - Path snapshotDir = snapshotDirectoryByBranch(branchName); + Path snapshotDir = snapshotDirectory(branchName); if (!fileIO.exists(snapshotDir)) { return null; } @@ -447,7 +441,7 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) { } private @Nullable Long findEarliest(String branchName) throws IOException { - Path snapshotDir = snapshotDirectoryByBranch(branchName); + Path snapshotDir = snapshotDirectory(branchName); if (!fileIO.exists(snapshotDir)) { return null; } @@ -466,7 +460,7 @@ public Long readHint(String fileName) { } public Long readHint(String fileName, String branchName) { - Path snapshotDir = snapshotDirectoryByBranch(branchName); + Path snapshotDir = snapshotDirectory(branchName); Path path = new Path(snapshotDir, fileName); int retryNumber = 0; while (retryNumber++ < READ_HINT_RETRY_NUM) { @@ -486,7 +480,7 @@ public Long readHint(String fileName, String branchName) { private Long findByListFiles(BinaryOperator reducer, String branchName) throws IOException { - Path snapshotDir = snapshotDirectoryByBranch(branchName); + Path snapshotDir = snapshotDirectory(branchName); return listVersionedFiles(fileIO, snapshotDir, SNAPSHOT_PREFIX) .reduce(reducer) .orElse(null); @@ -510,7 +504,7 @@ public void commitEarliestHint(long snapshotId, String branchName) throws IOExce private void commitHint(long snapshotId, String fileName, String branchName) throws IOException { - Path snapshotDir = snapshotDirectoryByBranch(branchName); + Path snapshotDir = snapshotDirectory(branchName); Path hintFile = new Path(snapshotDir, fileName); fileIO.overwriteFileUtf8(hintFile, String.valueOf(snapshotId)); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java index 68552984fbe73..1779ca9eb4047 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java @@ -156,7 +156,8 @@ public DataStreamSink 
sinkFrom( protected OneInputStreamOperator createWriteOperator( StoreSinkWrite.WithWriteBufferProvider writeProvider, String commitUser) { return new CdcRecordStoreMultiWriteOperator( - catalogLoader, writeProvider, commitUser, new Options()); + catalogLoader, writeProvider, commitUser, new Options()) + .toBranch(branch); } // Table committers are dynamically created at runtime @@ -166,7 +167,8 @@ protected OneInputStreamOperator crea // commit new files list even if they're empty. // Otherwise we can't tell if the commit is successful after // a restart. - return (user, metricGroup) -> new StoreMultiCommitter(catalogLoader, user, metricGroup); + return (user, metricGroup) -> + new StoreMultiCommitter(catalogLoader, user, metricGroup, branch); } protected CommittableStateManager createCommittableStateManager() { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java index 80fe1a485fc80..8a0df92965249 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java @@ -169,11 +169,11 @@ private void buildCombinedCdcSink() { private void buildForFixedBucket(FileStoreTable table, DataStream parsed) { DataStream partitioned = partition(parsed, new CdcRecordChannelComputer(table.schema()), parallelism); - new CdcFixedBucketSink(table).sinkFrom(partitioned); + new CdcFixedBucketSink(table).toBranch(branch).sinkFrom(partitioned); } private void buildForUnawareBucket(FileStoreTable table, DataStream parsed) { - new CdcUnawareBucketSink(table, parallelism).sinkFrom(parsed); + new CdcUnawareBucketSink(table, parallelism).toBranch(branch).sinkFrom(parsed); } private void buildDividedCdcSink() { @@ -211,7 +211,8 @@ private void buildDividedCdcSink() { buildForFixedBucket(table, parsedForTable); break; case DYNAMIC: - new CdcDynamicBucketSink(table).build(parsedForTable, parallelism); + ((CdcDynamicBucketSink) new CdcDynamicBucketSink(table).toBranch(branch)) + .build(parsedForTable, parallelism); break; case UNAWARE: buildForUnawareBucket(table, parsedForTable); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java index 7a621030217a6..bcd90583bea94 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.action.cdc; +import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.action.ActionBase; import org.apache.paimon.flink.action.ActionITCaseBase; import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncDatabaseActionFactory; @@ -54,6 +55,7 @@ import java.util.Map; import java.util.concurrent.ThreadLocalRandom; +import static org.apache.paimon.options.CatalogOptions.BRANCH; import static org.assertj.core.api.Assertions.assertThat; /** CDC IT case base. 
*/ @@ -130,7 +132,7 @@ protected void waitForResult( break; } } - table = table.copyWithLatestSchema(); + table = table.copyWithLatestSchema(branch); Thread.sleep(1000); } @@ -138,7 +140,7 @@ protected void waitForResult( List sortedExpected = new ArrayList<>(expected); Collections.sort(sortedExpected); while (true) { - ReadBuilder readBuilder = table.newReadBuilder(); + ReadBuilder readBuilder = table.newReadBuilder().fromBranch(branch); TableScan.Plan plan = readBuilder.newScan().plan(); List result = getResult( @@ -154,6 +156,16 @@ protected void waitForResult( } } + protected Map getCatalogOptions(Map catalogOptions) { + catalogOptions.put(BRANCH.key(), branch); + return catalogOptions; + } + + protected Map getTableConfig(Map tableConfig) { + tableConfig.put(FlinkConnectorOptions.BRANCH.key(), branch); + return tableConfig; + } + protected Map getBasicTableConfig() { Map config = new HashMap<>(); ThreadLocalRandom random = ThreadLocalRandom.current(); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncDatabaseToBranchActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncDatabaseToBranchActionITCase.java new file mode 100644 index 0000000000000..53246624744f2 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncDatabaseToBranchActionITCase.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.kafka; + +import org.junit.jupiter.api.BeforeEach; + +import java.io.IOException; + +/** IT cases for {@link KafkaSyncDatabaseAction}. */ +public class KafkaDebeziumSyncDatabaseToBranchActionITCase + extends KafkaDebeziumSyncDatabaseActionITCase { + @BeforeEach + public void before() throws IOException { + branch = "testKafkaDebeziumSyncDatabaseBranch"; + super.before(); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableToBranchActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableToBranchActionITCase.java new file mode 100644 index 0000000000000..77860bbb37e8b --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableToBranchActionITCase.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.kafka; + +import org.junit.jupiter.api.BeforeEach; + +import java.io.IOException; + +/** IT cases for {@link KafkaSyncTableAction}. */ +public class KafkaDebeziumSyncTableToBranchActionITCase extends KafkaDebeziumSyncTableActionITCase { + + @BeforeEach + public void before() throws IOException { + branch = "testKafkaDebeziumSyncTableBranch"; + super.before(); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java index cdad175ea15fa..cf474ea60f318 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java @@ -28,6 +28,7 @@ import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -69,7 +70,8 @@ protected void testSchemaEvolutionMultiTopic(String format) throws Exception { kafkaConfig.put(TOPIC.key(), String.join(";", topics)); KafkaSyncDatabaseAction action = syncDatabaseActionBuilder(kafkaConfig) - .withTableConfig(getBasicTableConfig()) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .build(); runActionWithDefaultEnv(action); @@ -103,7 +105,8 @@ protected void testSchemaEvolutionOneTopic(String format) throws Exception { kafkaConfig.put(TOPIC.key(), String.join(";", topics)); KafkaSyncDatabaseAction action = syncDatabaseActionBuilder(kafkaConfig) - .withTableConfig(getBasicTableConfig()) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .build(); runActionWithDefaultEnv(action); @@ -112,6 +115,7 @@ protected void testSchemaEvolutionOneTopic(String format) throws Exception { private void testSchemaEvolutionImpl( List topics, boolean writeOne, int fileCount, String format) throws Exception { + waitingTables("t1", "t2"); FileStoreTable table1 = getFileStoreTable("t1"); @@ -212,7 +216,11 @@ protected void testTopicIsEmpty(String format) { Map kafkaConfig = getBasicKafkaConfig(); kafkaConfig.put(VALUE_FORMAT.key(), format + "-json"); - KafkaSyncDatabaseAction action = syncDatabaseActionBuilder(kafkaConfig).build(); + KafkaSyncDatabaseAction action = + syncDatabaseActionBuilder(kafkaConfig) + .withTableConfig(getTableConfig(new HashMap<>())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) + .build(); assertThatThrownBy(action::run) .satisfies( @@ -267,7 +275,8 @@ protected void testTableAffixMultiTopic(String format) throws Exception { syncDatabaseActionBuilder(kafkaConfig) .withTablePrefix("test_prefix_") 
.withTableSuffix("_test_suffix") - .withTableConfig(getBasicTableConfig()) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) // test including check with affix .includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*") .build(); @@ -321,7 +330,8 @@ protected void testTableAffixOneTopic(String format) throws Exception { syncDatabaseActionBuilder(kafkaConfig) .withTablePrefix("test_prefix_") .withTableSuffix("_test_suffix") - .withTableConfig(getBasicTableConfig()) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) // test including check with affix .includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*") .build(); @@ -490,7 +500,8 @@ private void includingAndExcludingTablesImpl( syncDatabaseActionBuilder(kafkaConfig) .includingTables(includingTables) .excludingTables(excludingTables) - .withTableConfig(getBasicTableConfig()) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .build(); runActionWithDefaultEnv(action); @@ -516,12 +527,13 @@ protected void testCaseInsensitive(String format) throws Exception { kafkaConfig.put(VALUE_FORMAT.key(), format + "-json"); kafkaConfig.put(TOPIC.key(), topic); + Map catalogConfig = new HashMap<>(); + catalogConfig.put(FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"); + KafkaSyncDatabaseAction action = syncDatabaseActionBuilder(kafkaConfig) - .withTableConfig(getBasicTableConfig()) - .withCatalogConfig( - Collections.singletonMap( - FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false")) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(catalogConfig)) .build(); runActionWithDefaultEnv(action); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java index f2f8fc246682d..e6e6310b5130f 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java @@ -33,6 +33,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -74,7 +75,8 @@ protected void runSingleTableSchemaEvolution(String sourceDir, String format) th KafkaSyncTableAction action = syncTableActionBuilder(kafkaConfig) .withPrimaryKeys("id") - .withTableConfig(getBasicTableConfig()) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .build(); runActionWithDefaultEnv(action); @@ -177,7 +179,8 @@ public void testNotSupportFormat(String format) throws Exception { KafkaSyncTableAction action = syncTableActionBuilder(kafkaConfig) .withPrimaryKeys("id") - .withTableConfig(getBasicTableConfig()) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .build(); assertThatThrownBy(action::run) @@ -216,7 +219,8 @@ protected void testAssertSchemaCompatible(String format) throws Exception { KafkaSyncTableAction action = syncTableActionBuilder(kafkaConfig) .withPrimaryKeys("id") - .withTableConfig(getBasicTableConfig()) + 
.withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .build(); assertThatThrownBy(action::run) @@ -248,7 +252,8 @@ protected void testStarUpOptionSpecific(String format) throws Exception { KafkaSyncTableAction action = syncTableActionBuilder(kafkaConfig) .withPrimaryKeys("id") - .withTableConfig(getBasicTableConfig()) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .build(); runActionWithDefaultEnv(action); @@ -289,7 +294,8 @@ protected void testStarUpOptionLatest(String format) throws Exception { KafkaSyncTableAction action = syncTableActionBuilder(kafkaConfig) .withPrimaryKeys("id") - .withTableConfig(getBasicTableConfig()) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .build(); runActionWithDefaultEnv(action); @@ -344,7 +350,8 @@ public void testStarUpOptionTimestamp(String format) throws Exception { KafkaSyncTableAction action = syncTableActionBuilder(kafkaConfig) .withPrimaryKeys("id") - .withTableConfig(getBasicTableConfig()) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .build(); runActionWithDefaultEnv(action); @@ -396,7 +403,8 @@ public void testStarUpOptionEarliest(String format) throws Exception { KafkaSyncTableAction action = syncTableActionBuilder(kafkaConfig) .withPrimaryKeys("id") - .withTableConfig(getBasicTableConfig()) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .build(); runActionWithDefaultEnv(action); @@ -450,7 +458,8 @@ public void testStarUpOptionGroup(String format) throws Exception { KafkaSyncTableAction action = syncTableActionBuilder(kafkaConfig) .withPrimaryKeys("id") - .withTableConfig(getBasicTableConfig()) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .build(); runActionWithDefaultEnv(action); @@ -506,7 +515,8 @@ public void testComputedColumn(String format) throws Exception { .withPartitionKeys("_year") .withPrimaryKeys("_id", "_year") .withComputedColumnArgs("_year=year(_date)") - .withTableConfig(getBasicTableConfig()) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .build(); runActionWithDefaultEnv(action); @@ -542,7 +552,8 @@ protected void testCDCOperations(String format) throws Exception { KafkaSyncTableAction action = syncTableActionBuilder(kafkaConfig) .withPrimaryKeys("id") - .withTableConfig(getBasicTableConfig()) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .build(); runActionWithDefaultEnv(action); @@ -648,14 +659,17 @@ public void testWaterMarkSyncTable(String format) throws Exception { config.put("scan.watermark.alignment.update-interval", "1 s"); KafkaSyncTableAction action = - syncTableActionBuilder(kafkaConfig).withTableConfig(config).build(); + syncTableActionBuilder(kafkaConfig) + .withTableConfig(getTableConfig(config)) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) + .build(); runActionWithDefaultEnv(action); FileStoreTable table = (FileStoreTable) catalog.getTable(new Identifier(database, tableName)); while (true) { - if (table.snapshotManager().snapshotCount() > 0 - && table.snapshotManager().latestSnapshot().watermark() + if (table.snapshotManager().snapshotCount(branch) > 0 + && 
table.snapshotManager().latestSnapshot(branch).watermark() != -9223372036854775808L) { return; } @@ -680,7 +694,8 @@ public void testSchemaIncludeRecord(String format) throws Exception { KafkaSyncTableAction action = syncTableActionBuilder(kafkaConfig) .withPrimaryKeys("id") - .withTableConfig(getBasicTableConfig()) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .build(); runActionWithDefaultEnv(action); @@ -717,7 +732,8 @@ public void testAllTypesWithSchemaImpl(String format) throws Exception { syncTableActionBuilder(kafkaConfig) .withPartitionKeys("pt") .withPrimaryKeys("pt", "_id") - .withTableConfig(getBasicTableConfig()) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .build(); runActionWithDefaultEnv(action); @@ -969,7 +985,8 @@ protected void testTableFiledValNull(String format) throws Exception { KafkaSyncTableAction action = syncTableActionBuilder(kafkaConfig) .withPrimaryKeys("id") - .withTableConfig(getBasicTableConfig()) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .build(); runActionWithDefaultEnv(action); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java index 010041268c8a5..38bcec75e4b93 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java @@ -68,6 +68,9 @@ protected static void start() { } protected Statement getStatement() throws SQLException { + System.out.println(MYSQL_CONTAINER.getJdbcUrl()); + System.out.println(MYSQL_CONTAINER.getUsername()); + System.out.println(MYSQL_CONTAINER.getPassword()); Connection conn = DriverManager.getConnection( MYSQL_CONTAINER.getJdbcUrl(), diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java index 84169ffe02b76..a00abd48369f1 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java @@ -79,7 +79,8 @@ public void testSchemaEvolution() throws Exception { MySqlSyncDatabaseAction action = syncDatabaseActionBuilder(mySqlConfig) - .withTableConfig(getBasicTableConfig()) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .build(); runActionWithDefaultEnv(action); @@ -207,7 +208,11 @@ public void testSpecifiedMySqlTable() { mySqlConfig.put("database-name", "paimon_sync_database"); mySqlConfig.put("table-name", "my_table"); - MySqlSyncDatabaseAction action = syncDatabaseActionBuilder(mySqlConfig).build(); + MySqlSyncDatabaseAction action = + syncDatabaseActionBuilder(mySqlConfig) + .withTableConfig(getTableConfig(new HashMap<>())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) + .build(); assertThatThrownBy(action::run) .isInstanceOf(IllegalArgumentException.class) @@ -222,7 +227,11 @@ public void 
testInvalidDatabase() { Map mySqlConfig = getBasicMySqlConfig(); mySqlConfig.put("database-name", "invalid"); - MySqlSyncDatabaseAction action = syncDatabaseActionBuilder(mySqlConfig).build(); + MySqlSyncDatabaseAction action = + syncDatabaseActionBuilder(mySqlConfig) + .withTableConfig(getTableConfig(new HashMap<>())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) + .build(); assertThatThrownBy(action::run) .isInstanceOf(IllegalArgumentException.class) @@ -249,7 +258,8 @@ public void testIgnoreIncompatibleTables() throws Exception { MySqlSyncDatabaseAction action = syncDatabaseActionBuilder(mySqlConfig) - .withTableConfig(getBasicTableConfig()) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .ignoreIncompatible(true) .build(); runActionWithDefaultEnv(action); @@ -296,7 +306,8 @@ public void testTableAffix() throws Exception { MySqlSyncDatabaseAction action = syncDatabaseActionBuilder(mySqlConfig) - .withTableConfig(getBasicTableConfig()) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .withTablePrefix("test_prefix_") .withTableSuffix("_test_suffix") // test including check with affix @@ -453,7 +464,8 @@ private void includingAndExcludingTablesImpl( MySqlSyncDatabaseAction action = syncDatabaseActionBuilder(mySqlConfig) - .withTableConfig(getBasicTableConfig()) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .includingTables(includingTables) .excludingTables(excludingTables) .build(); @@ -470,12 +482,13 @@ public void testIgnoreCase() throws Exception { Map mySqlConfig = getBasicMySqlConfig(); mySqlConfig.put("database-name", "paimon_ignore_CASE"); + Map catalogConfig = new HashMap<>(); + catalogConfig.put(FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"); + MySqlSyncDatabaseAction action = syncDatabaseActionBuilder(mySqlConfig) - .withCatalogConfig( - Collections.singletonMap( - FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false")) - .withTableConfig(getBasicTableConfig()) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(catalogConfig)) .build(); runActionWithDefaultEnv(action); @@ -569,6 +582,8 @@ public void testAddIgnoredTable() throws Exception { MySqlSyncDatabaseAction action = syncDatabaseActionBuilder(mySqlConfig) .withTableConfig(getBasicTableConfig()) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .includingTables("t.+") .excludingTables(".*a$") .withMode(COMBINED.configString()) @@ -846,16 +861,15 @@ private JobClient buildSyncDatabaseActionWithNewlyAddedTables( mySqlConfig.put("database-name", databaseName); mySqlConfig.put("scan.incremental.snapshot.chunk.size", "1"); - Map catalogConfig = - testSchemaChange - ? 
Collections.singletonMap( - CatalogOptions.METASTORE.key(), "test-alter-table") - : Collections.emptyMap(); + Map catalogConfig = new HashMap<>(); + if (testSchemaChange) { + catalogConfig.put(CatalogOptions.METASTORE.key(), "test-alter-table"); + } MySqlSyncDatabaseAction action = syncDatabaseActionBuilder(mySqlConfig) - .withCatalogConfig(catalogConfig) - .withTableConfig(getBasicTableConfig()) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(catalogConfig)) .includingTables("t.+") .withMode(COMBINED.configString()) .build(); @@ -895,7 +909,8 @@ public void testSyncManyTableWithLimitedMemory() throws Exception { MySqlSyncDatabaseAction action = syncDatabaseActionBuilder(mySqlConfig) - .withTableConfig(tableConfig) + .withTableConfig(getTableConfig(tableConfig)) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .withMode(COMBINED.configString()) .build(); runActionWithDefaultEnv(action); @@ -942,7 +957,8 @@ public void testSyncMultipleShards() throws Exception { MultiTablesSinkMode mode = ThreadLocalRandom.current().nextBoolean() ? DIVIDED : COMBINED; MySqlSyncDatabaseAction action = syncDatabaseActionBuilder(mySqlConfig) - .withTableConfig(getBasicTableConfig()) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .withMode(mode.configString()) .build(); runActionWithDefaultEnv(action); @@ -1050,7 +1066,8 @@ public void testSyncMultipleShardsWithoutMerging() throws Exception { MultiTablesSinkMode mode = ThreadLocalRandom.current().nextBoolean() ? DIVIDED : COMBINED; MySqlSyncDatabaseAction action = syncDatabaseActionBuilder(mySqlConfig) - .withTableConfig(getBasicTableConfig()) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .mergeShards(false) .withMode(mode.configString()) .build(); @@ -1165,6 +1182,8 @@ public void testMonitoredAndExcludedTablesWithMering() throws Exception { MySqlSyncDatabaseAction action = syncDatabaseActionBuilder(mySqlConfig) + .withTableConfig(getTableConfig(new HashMap<>())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .ignoreIncompatible(true) .withMode(COMBINED.configString()) .build(); @@ -1202,7 +1221,8 @@ public void testNewlyAddedTablesOptionsChange() throws Exception { MySqlSyncDatabaseAction action1 = syncDatabaseActionBuilder(mySqlConfig) - .withTableConfig(tableConfig) + .withTableConfig(getTableConfig(tableConfig)) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .withMode(COMBINED.configString()) .build(); @@ -1228,7 +1248,10 @@ public void testNewlyAddedTablesOptionsChange() throws Exception { } MySqlSyncDatabaseAction action2 = - syncDatabaseActionBuilder(mySqlConfig).withTableConfig(tableConfig).build(); + syncDatabaseActionBuilder(mySqlConfig) + .withTableConfig(getTableConfig(tableConfig)) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) + .build(); runActionWithDefaultEnv(action2); waitingTables("t2"); @@ -1238,15 +1261,20 @@ public void testNewlyAddedTablesOptionsChange() throws Exception { @Test public void testCatalogAndTableConfig() { + Map catalogConfig = new HashMap<>(); + catalogConfig.put("catalog-key", "catalog-value"); + + Map tableConfig = new HashMap<>(); + tableConfig.put("table-key", "table-value"); + MySqlSyncDatabaseAction action = syncDatabaseActionBuilder(getBasicMySqlConfig()) - .withCatalogConfig(Collections.singletonMap("catalog-key", "catalog-value")) - .withTableConfig(Collections.singletonMap("table-key", 
"table-value")) + .withTableConfig(getTableConfig(tableConfig)) + .withCatalogConfig(getCatalogOptions(catalogConfig)) .build(); assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value"); - assertThat(action.tableConfig()) - .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value")); + assertThat(action.tableConfig()).containsExactlyEntriesOf(getTableConfig(tableConfig)); } @Test @@ -1258,7 +1286,8 @@ public void testMetadataColumns() throws Exception { MultiTablesSinkMode mode = ThreadLocalRandom.current().nextBoolean() ? DIVIDED : COMBINED; MySqlSyncDatabaseAction action = syncDatabaseActionBuilder(mySqlConfig) - .withTableConfig(getBasicTableConfig()) + .withTableConfig(getTableConfig(getBasicTableConfig())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .withMode(mode.configString()) .withMetadataColumn(Arrays.asList("table_name", "database_name")) .build(); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionToBranchITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionToBranchITCase.java new file mode 100644 index 0000000000000..e6cf68455bb10 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionToBranchITCase.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.mysql; + +import org.junit.jupiter.api.BeforeEach; + +import java.io.IOException; + +/** IT cases for {@link MySqlSyncDatabaseAction}. 
*/ +public class MySqlSyncDatabaseActionToBranchITCase extends MySqlSyncDatabaseActionITCase { + + @BeforeEach + @Override + public void before() throws IOException { + this.branch = "testMySqlSyncDatabaseActionBranch"; + super.before(); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index aee4c484f27dc..1f09f51f8f4c9 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -72,12 +72,13 @@ public void testSchemaEvolution() throws Exception { mySqlConfig.put("database-name", DATABASE_NAME); mySqlConfig.put("table-name", "schema_evolution_\\d+"); + Map catalogConfig = getBasicMySqlConfig(); + catalogConfig.put(CatalogOptions.METASTORE.key(), "test-alter-table"); + MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig) - .withCatalogConfig( - Collections.singletonMap( - CatalogOptions.METASTORE.key(), "test-alter-table")) - .withTableConfig(getBasicTableConfig()) + .withCatalogConfig(getCatalogOptions(catalogConfig)) + .withTableConfig(getTableConfig(getBasicTableConfig())) .withPartitionKeys("pt") .withPrimaryKeys("pt", "_id") .build(); @@ -251,7 +252,11 @@ public void testMultipleSchemaEvolutions() throws Exception { mySqlConfig.put("database-name", DATABASE_NAME); mySqlConfig.put("table-name", "schema_evolution_multiple"); - MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig).build(); + MySqlSyncTableAction action = + syncTableActionBuilder(mySqlConfig) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) + .withTableConfig(getTableConfig(new HashMap<>())) + .build(); runActionWithDefaultEnv(action); checkTableSchema( @@ -316,7 +321,7 @@ private void testSchemaEvolutionMultipleImpl(Statement statement) throws Excepti } @Test - @Timeout(90) + @Timeout(180) public void testAllTypes() throws Exception { // the first round checks for table creation // the second round checks for running the action on an existing table @@ -325,13 +330,15 @@ public void testAllTypes() throws Exception { } } - private void testAllTypesOnce() throws Exception { + protected void testAllTypesOnce() throws Exception { Map mySqlConfig = getBasicMySqlConfig(); mySqlConfig.put("database-name", DATABASE_NAME); mySqlConfig.put("table-name", "all_types_table"); MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) + .withTableConfig(getTableConfig(new HashMap<>())) .withPartitionKeys("pt") .withPrimaryKeys("pt", "_id") .build(); @@ -611,7 +618,7 @@ private void testAllTypesImpl(Statement statement) throws Exception { } finally { statement.executeUpdate("ALTER TABLE all_types_table DROP COLUMN v"); SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location()); - schemaManager.commitChanges(SchemaChange.dropColumn("v")); + schemaManager.commitChanges(branch, SchemaChange.dropColumn("v")); } } @@ -621,7 +628,11 @@ public void testIncompatibleMySqlTable() { mySqlConfig.put("database-name", DATABASE_NAME); mySqlConfig.put("table-name", "incompatible_field_\\d+"); - MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig).build(); + MySqlSyncTableAction action = + syncTableActionBuilder(mySqlConfig) + 
.withCatalogConfig(getCatalogOptions(new HashMap<>())) + .withTableConfig(getTableConfig(new HashMap<>())) + .build(); assertThatThrownBy(action::run) .satisfies( @@ -647,7 +658,11 @@ public void testIncompatiblePaimonTable() throws Exception { new HashMap<>()); MySqlSyncTableAction action = - syncTableActionBuilder(mySqlConfig).withPrimaryKeys("a").build(); + syncTableActionBuilder(mySqlConfig) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) + .withTableConfig(getTableConfig(new HashMap<>())) + .withPrimaryKeys("a") + .build(); assertThatThrownBy(action::run) .satisfies( @@ -663,7 +678,11 @@ public void testInvalidPrimaryKey() { mySqlConfig.put("table-name", "schema_evolution_\\d+"); MySqlSyncTableAction action = - syncTableActionBuilder(mySqlConfig).withPrimaryKeys("pk").build(); + syncTableActionBuilder(mySqlConfig) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) + .withTableConfig(getTableConfig(new HashMap<>())) + .withPrimaryKeys("pk") + .build(); assertThatThrownBy(action::run) .satisfies( @@ -678,7 +697,11 @@ public void testNoPrimaryKey() { mySqlConfig.put("database-name", DATABASE_NAME); mySqlConfig.put("table-name", "incompatible_pk_\\d+"); - MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig).build(); + MySqlSyncTableAction action = + syncTableActionBuilder(mySqlConfig) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) + .withTableConfig(getTableConfig(new HashMap<>())) + .build(); assertThatThrownBy(action::run) .satisfies( @@ -690,7 +713,7 @@ public void testNoPrimaryKey() { } @Test - @Timeout(60) + @Timeout(240) public void testComputedColumn() throws Exception { // the first round checks for table creation // the second round checks for running the action on an existing table @@ -733,6 +756,8 @@ private void innerTestComputedColumn(boolean executeMysql) throws Exception { MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) + .withTableConfig(getTableConfig(new HashMap<>())) .withPartitionKeys("_year_date") .withPrimaryKeys("pk", "_year_date") .withComputedColumnArgs(computedColumnDefs) @@ -833,6 +858,8 @@ public void testSyncShards() throws Exception { MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) + .withTableConfig(getTableConfig(new HashMap<>())) .withPartitionKeys("pt") .withPrimaryKeys("pk", "pt") .withComputedColumnArgs("pt=substring(_date,5)") @@ -884,7 +911,8 @@ public void testOptionsChange() throws Exception { .withPartitionKeys("pt") .withPrimaryKeys("pk", "pt") .withComputedColumnArgs("pt=substring(_date,5)") - .withTableConfig(tableConfig) + .withTableConfig(getTableConfig(tableConfig)) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .build(); JobClient jobClient = runActionWithDefaultEnv(action1); try (Statement statement = getStatement()) { @@ -912,7 +940,8 @@ public void testOptionsChange() throws Exception { .withPartitionKeys("pt") .withPrimaryKeys("pk", "pt") .withComputedColumnArgs("pt=substring(_date,5)") - .withTableConfig(tableConfig) + .withTableConfig(getTableConfig(tableConfig)) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .build(); runActionWithDefaultEnv(action2); @@ -934,6 +963,8 @@ public void testMetadataColumns() throws Exception { MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig) + .withTableConfig(getTableConfig(new HashMap<>())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .withPrimaryKeys("pk") 
.withMetadataColumns("table_name", "database_name", "op_ts") .build(); @@ -971,15 +1002,20 @@ public void testMetadataColumns() throws Exception { @Test public void testCatalogAndTableConfig() { + Map catalogOptions = new HashMap<>(); + catalogOptions.put("catalog-key", "catalog-value"); + + Map tableConfig = new HashMap<>(); + tableConfig.put("table-key", "table-value"); + MySqlSyncTableAction action = syncTableActionBuilder(getBasicMySqlConfig()) - .withCatalogConfig(Collections.singletonMap("catalog-key", "catalog-value")) - .withTableConfig(Collections.singletonMap("table-key", "table-value")) + .withTableConfig(getTableConfig(tableConfig)) + .withCatalogConfig(getCatalogOptions(catalogOptions)) .build(); assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value"); - assertThat(action.tableConfig()) - .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value")); + assertThat(action.tableConfig()).containsExactlyEntriesOf(getTableConfig(tableConfig)); } private FileStoreTable getFileStoreTable() throws Exception { @@ -996,7 +1032,11 @@ public void testDefaultCheckpointInterval() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.noRestart()); - MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig).build(); + MySqlSyncTableAction action = + syncTableActionBuilder(mySqlConfig) + .withTableConfig(getTableConfig(new HashMap<>())) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) + .build(); action.withStreamExecutionEnvironment(env); Thread thread = @@ -1027,11 +1067,13 @@ public void testComputedColumnWithCaseInsensitive() throws Exception { mySqlConfig.put("database-name", "computed_column_with_case_insensitive"); mySqlConfig.put("table-name", "t"); + Map catalogConfig = new HashMap<>(); + catalogConfig.put(FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"); + MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig) - .withCatalogConfig( - Collections.singletonMap( - FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false")) + .withCatalogConfig(getCatalogOptions(catalogConfig)) + .withTableConfig(getTableConfig(new HashMap<>())) .withComputedColumnArgs("SUBSTRING=substring(UPPERCASE_STRING,2)") .build(); runActionWithDefaultEnv(action); @@ -1063,11 +1105,13 @@ public void testSpecifyKeysWithCaseInsensitive() throws Exception { mySqlConfig.put("database-name", "specify_key_with_case_insensitive"); mySqlConfig.put("table-name", "t"); + Map catalogConfig = new HashMap<>(); + catalogConfig.put(FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"); + MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig) - .withCatalogConfig( - Collections.singletonMap( - FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false")) + .withCatalogConfig(getCatalogOptions(catalogConfig)) + .withTableConfig(getTableConfig(new HashMap<>())) .withPrimaryKeys("ID1", "PART") .withPartitionKeys("PART") .build(); @@ -1091,9 +1135,13 @@ public void testInvalidAlterBucket() throws Exception { mySqlConfig.put("database-name", "invalid_alter_bucket"); mySqlConfig.put("table-name", "t"); + Map tableConfig = new HashMap<>(); + tableConfig.put(BUCKET.key(), "2"); + MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig) - .withTableConfig(Collections.singletonMap(BUCKET.key(), "2")) + .withTableConfig(getTableConfig(tableConfig)) + .withCatalogConfig(getCatalogOptions(new HashMap<>())) .build(); 
assertThatCode(action::build).doesNotThrowAnyException(); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionToBranchITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionToBranchITCase.java new file mode 100644 index 0000000000000..7567ec7ea6402 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionToBranchITCase.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.mysql; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.io.IOException; + +/** IT cases for {@link MySqlSyncTableAction} when synchronizing into a non-default branch. */ +public class MySqlSyncTableActionToBranchITCase extends MySqlSyncTableActionITCase { + @BeforeEach + public void before() throws IOException { + super.branch = "testMySqlSyncTableActionBranch"; + super.before(); + } + + @Test + @Timeout(120) + public void testAllTypes() throws Exception { + // Only run a single round for now; further verification of branch reads is pending + testAllTypesOnce(); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_database_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_database_setup.sql index bd70146be48cc..82b8170f23545 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_database_setup.sql +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_database_setup.sql @@ -369,6 +369,56 @@ CREATE TABLE t3 ( v1 VARCHAR(10) ); + +-- ################################################################################ +-- testSyncMultipleShardsWithBranch +-- ################################################################################ + +CREATE DATABASE database_branch_shard_1; +USE database_branch_shard_1; + +CREATE TABLE t1 ( + k INT, + v1 VARCHAR(10), + PRIMARY KEY (k) +); + +CREATE TABLE t2 ( + k BIGINT, + v1 DOUBLE, + PRIMARY KEY (k) +); + +CREATE TABLE t3 ( + k INT, + v1 VARCHAR(10), + PRIMARY KEY (k) +); + +CREATE DATABASE database_branch_shard_2; +USE database_branch_shard_2; + +-- test schema merging +CREATE TABLE t1 ( + k INT, + v1 VARCHAR(20), + v2 BIGINT, + PRIMARY KEY (k) +); + +-- test schema evolution +CREATE TABLE t2 ( + k BIGINT, + v1 DOUBLE, + PRIMARY KEY (k) +); + +-- test some shard doesn't have primary key +CREATE TABLE t3 ( + k INT, + v1 VARCHAR(10) +); + -- ################################################################################ -- testSyncMultipleShardsWithoutMerging -- ################################################################################ diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java index a2d3f16ba0215..54ae4eee5520b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java @@ -25,6 +25,7 @@ import org.apache.paimon.manifest.WrappedManifestCommittable; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.utils.BranchManager; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.metrics.groups.OperatorMetricGroup; @@ -60,11 +61,21 @@ public class StoreMultiCommitter // compact job needs set "write-only" of a table to false private final boolean isCompactJob; + private String branch; + public StoreMultiCommitter( Catalog.Loader catalogLoader, String commitUser, @Nullable OperatorMetricGroup flinkMetricGroup) { - this(catalogLoader, commitUser, flinkMetricGroup, false); + this(catalogLoader, commitUser, flinkMetricGroup, BranchManager.DEFAULT_MAIN_BRANCH); + } + + public StoreMultiCommitter( + Catalog.Loader catalogLoader, + String commitUser, + @Nullable OperatorMetricGroup flinkMetricGroup, + String branch) { + this(catalogLoader, commitUser, flinkMetricGroup, false, branch); } public StoreMultiCommitter( @@ -72,11 +83,26 @@ public StoreMultiCommitter( String commitUser, @Nullable OperatorMetricGroup flinkMetricGroup, boolean isCompactJob) { + this( + catalogLoader, + commitUser, + flinkMetricGroup, + isCompactJob, + BranchManager.DEFAULT_MAIN_BRANCH); + } + + public StoreMultiCommitter( + Catalog.Loader catalogLoader, + String commitUser, + @Nullable OperatorMetricGroup flinkMetricGroup, + boolean isCompactJob, + String branch) { this.catalog = catalogLoader.load(); this.commitUser = commitUser; this.flinkMetricGroup = flinkMetricGroup; this.tableCommitters = new HashMap<>(); this.isCompactJob = isCompactJob; + this.branch = branch; } @Override @@ -206,7 +232,7 @@ private StoreCommitter getStoreCommitter(Identifier tableId) { } committer = new StoreCommitter( - table.newCommit(commitUser).ignoreEmptyCommit(isCompactJob), + table.newCommit(commitUser, branch).ignoreEmptyCommit(isCompactJob), flinkMetricGroup); tableCommitters.put(tableId, committer); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java index b7c739e96e68d..32b4bd30fe9c7 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java @@ -33,6 +33,7 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.utils.BlockingIterator; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.FailingFileIO; import org.apache.flink.api.common.RuntimeExecutionMode; @@ -60,6 +61,7 @@ import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.apache.flink.util.CloseableIterator; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -122,11 +124,18 @@ public class FileStoreITCase extends 
AbstractTestBase { private final StreamExecutionEnvironment env; + protected static String branch; + public FileStoreITCase(boolean isBatch) { this.isBatch = isBatch; this.env = isBatch ? buildBatchEnv() : buildStreamEnv(); } + @BeforeAll + public static void before() { + branch = BranchManager.DEFAULT_MAIN_BRANCH; + } + @Parameters(name = "isBatch-{0}") public static List getVarSeg() { return Arrays.asList(true, false); @@ -141,12 +150,19 @@ public void testPartitioned() throws Exception { FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[] {1, 2}); // write - new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build(); + new FlinkSinkBuilder(table) + .withInput(buildTestSource(env, isBatch)) + .toBranch(branch) + .build(); env.execute(); // read List results = - executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, table).withEnv(env).build()); + executeAndCollect( + new FlinkSourceBuilder(IDENTIFIER, table) + .withEnv(env) + .fromBranch(branch) + .build()); // assert Row[] expected = @@ -161,12 +177,19 @@ public void testNonPartitioned() throws Exception { FileStoreTable table = buildFileStoreTable(new int[0], new int[] {2}); // write - new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build(); + new FlinkSinkBuilder(table) + .withInput(buildTestSource(env, isBatch)) + .toBranch(branch) + .build(); env.execute(); // read List results = - executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, table).withEnv(env).build()); + executeAndCollect( + new FlinkSourceBuilder(IDENTIFIER, table) + .withEnv(env) + .fromBranch(branch) + .build()); // assert Row[] expected = new Row[] {Row.of(5, "p2", 1), Row.of(0, "p1", 2), Row.of(3, "p2", 5)}; @@ -180,7 +203,10 @@ public void testOverwrite() throws Exception { FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[] {1, 2}); // write - new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build(); + new FlinkSinkBuilder(table) + .withInput(buildTestSource(env, isBatch)) + .toBranch(branch) + .build(); env.execute(); // overwrite p2 @@ -194,12 +220,17 @@ public void testOverwrite() throws Exception { new FlinkSinkBuilder(table) .withInput(partialData) .withOverwritePartition(overwrite) + .toBranch(branch) .build(); env.execute(); // read List results = - executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, table).withEnv(env).build()); + executeAndCollect( + new FlinkSourceBuilder(IDENTIFIER, table) + .withEnv(env) + .fromBranch(branch) + .build()); Row[] expected = new Row[] {Row.of(9, "p2", 5), Row.of(5, "p1", 1), Row.of(0, "p1", 2)}; assertThat(results).containsExactlyInAnyOrder(expected); @@ -213,11 +244,17 @@ public void testOverwrite() throws Exception { new FlinkSinkBuilder(table) .withInput(partialData) .withOverwritePartition(new HashMap<>()) + .toBranch(branch) .build(); env.execute(); // read - results = executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, table).withEnv(env).build()); + results = + executeAndCollect( + new FlinkSourceBuilder(IDENTIFIER, table) + .withEnv(env) + .fromBranch(branch) + .build()); expected = new Row[] {Row.of(19, "p2", 6), Row.of(5, "p1", 1), Row.of(0, "p1", 2)}; assertThat(results).containsExactlyInAnyOrder(expected); @@ -233,11 +270,17 @@ public void testOverwrite() throws Exception { CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false"))) .withInput(partialData) .withOverwritePartition(new HashMap<>()) + .toBranch(branch) .build(); env.execute(); // read - results = executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, 
table).withEnv(env).build()); + results = + executeAndCollect( + new FlinkSourceBuilder(IDENTIFIER, table) + .withEnv(env) + .fromBranch(branch) + .build()); expected = new Row[] {Row.of(20, "p2", 3)}; assertThat(results).containsExactlyInAnyOrder(expected); } @@ -247,12 +290,19 @@ public void testPartitionedNonKey() throws Exception { FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[0]); // write - new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build(); + new FlinkSinkBuilder(table) + .withInput(buildTestSource(env, isBatch)) + .toBranch(branch) + .build(); env.execute(); // read List results = - executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, table).withEnv(env).build()); + executeAndCollect( + new FlinkSourceBuilder(IDENTIFIER, table) + .withEnv(env) + .fromBranch(branch) + .build()); // assert // in streaming mode, expect origin data X 2 (FiniteTestSource) @@ -276,7 +326,10 @@ public void testNonKeyedProjection() throws Exception { private void testProjection(FileStoreTable table) throws Exception { // write - new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build(); + new FlinkSinkBuilder(table) + .withInput(buildTestSource(env, isBatch)) + .toBranch(branch) + .build(); env.execute(); // read @@ -292,6 +345,7 @@ private void testProjection(FileStoreTable table) throws Exception { new FlinkSourceBuilder(IDENTIFIER, table) .withProjection(projection.toNestedIndexes()) .withEnv(env) + .fromBranch(branch) .build(), converter); @@ -332,6 +386,7 @@ public void testContinuousBounded() throws Exception { new FlinkSourceBuilder(IDENTIFIER, table) .withContinuousMode(true) .withEnv(env) + .fromBranch(branch) .build(); Transformation transformation = source.getTransformation(); assertThat(transformation).isInstanceOf(SourceTransformation.class); @@ -347,6 +402,7 @@ private void innerTestContinuous(FileStoreTable table) throws Exception { new FlinkSourceBuilder(IDENTIFIER, table) .withContinuousMode(true) .withEnv(env) + .fromBranch(branch) .build() .executeAndCollect(), CONVERTER::toExternal); @@ -383,7 +439,7 @@ private void sinkAndValidate( } DataStreamSource source = env.addSource(new FiniteTestSource<>(src, true), InternalTypeInfo.of(TABLE_TYPE)); - new FlinkSinkBuilder(table).withInput(source).build(); + new FlinkSinkBuilder(table).withInput(source).toBranch(branch).build(); env.execute(); assertThat(iterator.collect(expected.length)).containsExactlyInAnyOrder(expected); } @@ -429,8 +485,8 @@ public static FileStoreTable buildFileStoreTable( ""); return retryArtificialException( () -> { - new SchemaManager(LocalFileIO.create(), tablePath).createTable(schema); - return FileStoreTableFactory.create(LocalFileIO.create(), options); + new SchemaManager(LocalFileIO.create(), tablePath).createTable(schema, branch); + return FileStoreTableFactory.create(LocalFileIO.create(), options, branch); }); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreWithBranchITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreWithBranchITCase.java new file mode 100644 index 0000000000000..8a2374b3c367b --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreWithBranchITCase.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +import org.apache.paimon.flink.sink.FixedBucketSink; +import org.apache.paimon.flink.source.ContinuousFileStoreSource; +import org.apache.paimon.flink.source.StaticFileStoreSource; + +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * ITCase for {@link StaticFileStoreSource}, {@link ContinuousFileStoreSource} and {@link + * FixedBucketSink}, running against a non-default branch. + */ +@ExtendWith(ParameterizedTestExtension.class) +public class FileStoreWithBranchITCase extends FileStoreITCase { + public FileStoreWithBranchITCase(boolean isBatch) { + super(isBatch); + } + + @BeforeAll + public static void before() { + branch = "testBranch"; + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java index 80cf2cc767119..54827158f96b2 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java @@ -27,6 +27,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.flink.util.AbstractTestBase; import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.FileStoreTable; @@ -35,6 +36,7 @@ import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BranchManager; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.restartstrategy.RestartStrategies; @@ -57,6 +59,9 @@ import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; +import static org.apache.paimon.options.CatalogOptions.BRANCH; +import static org.apache.paimon.options.CatalogOptions.WAREHOUSE; + /** {@link Action} test base.
*/ public abstract class ActionITCaseBase extends AbstractTestBase { @@ -67,7 +72,8 @@ public abstract class ActionITCaseBase extends AbstractTestBase { protected StreamTableWrite write; protected StreamTableCommit commit; protected Catalog catalog; - private long incrementalIdentifier; + protected long incrementalIdentifier; + protected String branch = BranchManager.DEFAULT_MAIN_BRANCH; @BeforeEach public void before() throws IOException { @@ -76,7 +82,12 @@ public void before() throws IOException { tableName = "test_table_" + UUID.randomUUID(); commitUser = UUID.randomUUID().toString(); incrementalIdentifier = 0; - catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse))); + Map<String, String> options = new HashMap<>(); + options.put(WAREHOUSE.key(), new Path(warehouse).toUri().toString()); + if (!branch.equals(BranchManager.DEFAULT_MAIN_BRANCH)) { + options.put(BRANCH.key(), branch); + } + catalog = CatalogFactory.createCatalog(CatalogContext.create(Options.fromMap(options))); } @AfterEach diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderFromBranchTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderFromBranchTest.java new file mode 100644 index 0000000000000..076813587e59e --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderFromBranchTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source; + +import org.junit.jupiter.api.BeforeEach; + +import java.util.UUID; + +/** Unit tests for the {@link FileStoreSourceReader} when reading from a non-default branch.
*/ +public class FileStoreSourceReaderFromBranchTest extends FileStoreSourceReaderTest { + + @BeforeEach + public void beforeEach() throws Exception { + branch = "testBranch-" + UUID.randomUUID(); + super.beforeEach(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java index 882763cf74da2..64d3da75626d4 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java @@ -28,6 +28,7 @@ import org.apache.paimon.types.DataField; import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BranchManager; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.connector.testutils.source.reader.TestingReaderContext; @@ -57,6 +58,8 @@ public class FileStoreSourceReaderTest { @TempDir protected java.nio.file.Path tempDir; + protected String branch = BranchManager.DEFAULT_MAIN_BRANCH; + @BeforeEach public void beforeEach() throws Exception { SchemaManager schemaManager = @@ -72,7 +75,8 @@ public void beforeEach() throws Exception { Collections.singletonList("default"), Arrays.asList("k", "default"), Collections.emptyMap(), - null)); + null), + branch); } @Test @@ -138,7 +142,7 @@ public void testReaderOnSplitFinished() throws Exception { protected FileStoreSourceReader createReader(TestingReaderContext context) { return new FileStoreSourceReader( context, - new TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey(), + new TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey(branch), new FileStoreSourceReaderMetrics(new DummyMetricGroup()), IOManager.create(tempDir.toString()), null); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderWithBranchTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderWithBranchTest.java deleted file mode 100644 index 07e4305e3a652..0000000000000 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderWithBranchTest.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.paimon.flink.source; - -import org.apache.paimon.disk.IOManager; -import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics; -import org.apache.paimon.fs.Path; -import org.apache.paimon.fs.local.LocalFileIO; -import org.apache.paimon.schema.Schema; -import org.apache.paimon.schema.SchemaManager; -import org.apache.paimon.types.BigIntType; -import org.apache.paimon.types.DataField; -import org.apache.paimon.types.IntType; -import org.apache.paimon.types.RowType; - -import org.apache.flink.api.connector.source.SourceEvent; -import org.apache.flink.connector.testutils.source.reader.TestingReaderContext; -import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput; -import org.apache.flink.metrics.CharacterFilter; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.Histogram; -import org.apache.flink.metrics.Meter; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.table.data.RowData; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import static org.apache.paimon.flink.source.FileStoreSourceSplitSerializerTest.newSourceSplit; -import static org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest.row; -import static org.assertj.core.api.Assertions.assertThat; - -/** Unit tests for the {@link FileStoreSourceReader}. */ -public class FileStoreSourceReaderWithBranchTest { - - @TempDir protected java.nio.file.Path tempDir; - - protected String branch = UUID.randomUUID() + "-testBranch"; - - @BeforeEach - public void beforeEach() throws Exception { - SchemaManager schemaManager = - new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri())); - schemaManager.createTable( - new Schema( - new RowType( - Arrays.asList( - new DataField(0, "k", new BigIntType()), - new DataField(1, "v", new BigIntType()), - new DataField(2, "default", new IntType()))) - .getFields(), - Collections.singletonList("default"), - Arrays.asList("k", "default"), - Collections.emptyMap(), - null), - branch); - } - - @Test - public void testRequestSplitWhenNoSplitRestored() throws Exception { - final TestingReaderContext context = new TestingReaderContext(); - final FileStoreSourceReader reader = createReader(context); - - reader.start(); - reader.close(); - - assertThat(context.getNumSplitRequests()).isEqualTo(1); - } - - @Test - public void testNoSplitRequestWhenSplitRestored() throws Exception { - final TestingReaderContext context = new TestingReaderContext(); - final FileStoreSourceReader reader = createReader(context); - - reader.addSplits(Collections.singletonList(createTestFileSplit("id1"))); - reader.start(); - reader.close(); - - assertThat(context.getNumSplitRequests()).isEqualTo(0); - } - - @Test - public void testAddMultipleSplits() throws Exception { - final TestingReaderContext context = new TestingReaderContext(); - final FileStoreSourceReader reader = createReader(context); - - reader.start(); - assertThat(context.getNumSplitRequests()).isEqualTo(1); - - reader.addSplits(Arrays.asList(createTestFileSplit("id1"), createTestFileSplit("id2"))); - TestingReaderOutput output = new TestingReaderOutput<>(); - while (reader.getNumberOfCurrentlyAssignedSplits() > 0) { - reader.pollNext(output); - Thread.sleep(10); - } - 
assertThat(context.getNumSplitRequests()).isEqualTo(2); - } - - @Test - public void testReaderOnSplitFinished() throws Exception { - final TestingReaderContext context = new TestingReaderContext(); - final FileStoreSourceReader reader = createReader(context); - - reader.start(); - reader.addSplits(Collections.singletonList(createTestFileSplit("id1"))); - TestingReaderOutput output = new TestingReaderOutput<>(); - while (reader.getNumberOfCurrentlyAssignedSplits() > 0) { - reader.pollNext(output); - Thread.sleep(10); - } - - List sourceEvents = context.getSentEvents(); - assertThat(sourceEvents.size()).isEqualTo(1); - assertThat(sourceEvents.get(0)).isExactlyInstanceOf(ReaderConsumeProgressEvent.class); - assertThat(((ReaderConsumeProgressEvent) sourceEvents.get(0))) - .matches(event -> event.lastConsumeSnapshotId() == 1L); - } - - protected FileStoreSourceReader createReader(TestingReaderContext context) { - return new FileStoreSourceReader( - context, - new TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey(branch), - new FileStoreSourceReaderMetrics(new DummyMetricGroup()), - IOManager.create(tempDir.toString()), - null); - } - - protected static FileStoreSourceSplit createTestFileSplit(String id) { - return newSourceSplit(id, row(1), 0, Collections.emptyList()); - } - - /** A {@link MetricGroup} for testing. */ - public static class DummyMetricGroup implements MetricGroup { - public DummyMetricGroup() {} - - public Counter counter(String name) { - return null; - } - - public C counter(String name, C counter) { - return null; - } - - public > G gauge(String name, G gauge) { - return null; - } - - public H histogram(String name, H histogram) { - return null; - } - - public M meter(String name, M meter) { - return null; - } - - public MetricGroup addGroup(String name) { - return null; - } - - public MetricGroup addGroup(String key, String value) { - return null; - } - - public String[] getScopeComponents() { - return new String[0]; - } - - public Map getAllVariables() { - return null; - } - - public String getMetricIdentifier(String metricName) { - return null; - } - - public String getMetricIdentifier(String metricName, CharacterFilter filter) { - return null; - } - } -} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderFromBranchTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderFromBranchTest.java new file mode 100644 index 0000000000000..92ac692df101f --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderFromBranchTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.source; + +import org.junit.jupiter.api.BeforeEach; + +import java.util.UUID; + +/** Test for {@link FileStoreSourceSplitReader}. */ +public class FileStoreSourceSplitReaderFromBranchTest extends FileStoreSourceSplitReaderTest { + + @BeforeEach + public void beforeEach() throws Exception { + branch = "testBranch-" + UUID.randomUUID(); + super.beforeEach(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java index 67b3c3e603f84..8c7b9b586fca1 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java @@ -28,13 +28,13 @@ import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.RecordWriter; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; -import org.apache.flink.connector.file.src.reader.BulkFormat; import org.apache.flink.connector.file.src.reader.BulkFormat.RecordIterator; import org.apache.flink.connector.file.src.util.RecordAndPosition; import org.apache.flink.table.data.RowData; @@ -67,6 +67,8 @@ public class FileStoreSourceSplitReaderTest { @TempDir java.nio.file.Path tempDir; + protected String branch = BranchManager.DEFAULT_MAIN_BRANCH; + @BeforeEach public void beforeEach() throws Exception { SchemaManager schemaManager = @@ -84,7 +86,8 @@ public void beforeEach() throws Exception { Collections.singletonList("default"), Arrays.asList("k", "default"), Collections.emptyMap(), - null)); + null), + branch); } @Test @@ -106,10 +109,10 @@ private FileStoreSourceSplitReader createReader(TableRead tableRead, @Nullable L private void innerTestOnce(int skip) throws Exception { TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString()); - FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(), null); + FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(branch), null); List> input = kvs(); - List files = rw.writeFiles(row(1), 0, input); + List files = rw.writeFiles(row(1), 0, input, branch); assignSplit(reader, newSourceSplit("id1", row(1), 0, files, skip)); @@ -132,10 +135,10 @@ private void innerTestOnce(int skip) throws Exception { @Test public void testPrimaryKeyWithDelete() throws Exception { TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString()); - FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(), null); + FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(branch), null); List> input = kvs(); - RecordWriter writer = rw.createMergeTreeWriter(row(1), 0); + RecordWriter writer = rw.createMergeTreeWriter(row(1), 0, branch); for (Tuple2 tuple2 : input) { writer.write( new KeyValue() @@ -154,7 +157,7 @@ public void testPrimaryKeyWithDelete() throws Exception { writer.close(); assignSplit(reader, newSourceSplit("id1", row(1), 0, files, true)); - RecordsWithSplitIds> 
records = reader.fetch(); + RecordsWithSplitIds> records = reader.fetch(); List> expected = input.stream() @@ -174,18 +177,18 @@ public void testPrimaryKeyWithDelete() throws Exception { @Test public void testMultipleBatchInSplit() throws Exception { TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString()); - FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(), null); + FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(branch), null); List> input1 = kvs(); - List files = rw.writeFiles(row(1), 0, input1); + List files = rw.writeFiles(row(1), 0, input1, branch); List> input2 = kvs(6); - List files2 = rw.writeFiles(row(1), 0, input2); + List files2 = rw.writeFiles(row(1), 0, input2, branch); files.addAll(files2); assignSplit(reader, newSourceSplit("id1", row(1), 0, files)); - RecordsWithSplitIds> records = reader.fetch(); + RecordsWithSplitIds> records = reader.fetch(); assertRecords( records, null, @@ -210,14 +213,14 @@ public void testMultipleBatchInSplit() throws Exception { @Test public void testRestore() throws Exception { TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString()); - FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(), null); + FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(branch), null); List> input = kvs(); - List files = rw.writeFiles(row(1), 0, input); + List files = rw.writeFiles(row(1), 0, input, branch); assignSplit(reader, newSourceSplit("id1", row(1), 0, files, 3)); - RecordsWithSplitIds> records = reader.fetch(); + RecordsWithSplitIds> records = reader.fetch(); assertRecords( records, null, @@ -236,18 +239,18 @@ public void testRestore() throws Exception { @Test public void testRestoreMultipleBatchInSplit() throws Exception { TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString()); - FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(), null); + FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(branch), null); List> input1 = kvs(); - List files = rw.writeFiles(row(1), 0, input1); + List files = rw.writeFiles(row(1), 0, input1, branch); List> input2 = kvs(6); - List files2 = rw.writeFiles(row(1), 0, input2); + List files2 = rw.writeFiles(row(1), 0, input2, branch); files.addAll(files2); assignSplit(reader, newSourceSplit("id1", row(1), 0, files, 7)); - RecordsWithSplitIds> records = reader.fetch(); + RecordsWithSplitIds> records = reader.fetch(); assertRecords( records, null, @@ -267,17 +270,17 @@ public void testRestoreMultipleBatchInSplit() throws Exception { @Test public void testMultipleSplits() throws Exception { TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString()); - FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(), null); + FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(branch), null); List> input1 = kvs(); - List files1 = rw.writeFiles(row(1), 0, input1); + List files1 = rw.writeFiles(row(1), 0, input1, branch); assignSplit(reader, newSourceSplit("id1", row(1), 0, files1)); List> input2 = kvs(); - List files2 = rw.writeFiles(row(2), 1, input2); + List files2 = rw.writeFiles(row(2), 1, input2, branch); assignSplit(reader, newSourceSplit("id2", row(2), 1, files2)); - RecordsWithSplitIds> records = reader.fetch(); + RecordsWithSplitIds> records = reader.fetch(); assertRecords( records, null, @@ -305,7 +308,7 @@ public void testMultipleSplits() throws Exception { @Test public void 
testNoSplit() throws Exception { TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString()); - FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(), null); + FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(branch), null); assertThatThrownBy(reader::fetch).hasMessageContaining("no split remaining"); reader.close(); } @@ -313,14 +316,14 @@ public void testNoSplit() throws Exception { @Test public void testLimit() throws Exception { TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString()); - FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(), 2L); + FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(branch), 2L); List> input = kvs(); - List files = rw.writeFiles(row(1), 0, input); + List files = rw.writeFiles(row(1), 0, input, branch); assignSplit(reader, newSourceSplit("id1", row(1), 0, files, 0)); - RecordsWithSplitIds> records = reader.fetch(); + RecordsWithSplitIds> records = reader.fetch(); List> expected = input.stream() @@ -346,19 +349,19 @@ public void testLimit() throws Exception { @Test public void testPauseOrResumeSplits() throws Exception { TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString()); - FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(), null); + FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(branch), null); List> input1 = kvs(); - List files = rw.writeFiles(row(1), 0, input1); + List files = rw.writeFiles(row(1), 0, input1, branch); List> input2 = kvs(6); - List files2 = rw.writeFiles(row(1), 0, input2); + List files2 = rw.writeFiles(row(1), 0, input2, branch); files.addAll(files2); FileStoreSourceSplit split1 = newSourceSplit("id1", row(1), 0, files); assignSplit(reader, split1); - RecordsWithSplitIds> records = reader.fetch(); + RecordsWithSplitIds> records = reader.fetch(); assertRecords( records, null, @@ -373,7 +376,7 @@ public void testPauseOrResumeSplits() throws Exception { // assign next split List> input3 = kvs(12); - List files3 = rw.writeFiles(row(1), 0, input3); + List files3 = rw.writeFiles(row(1), 0, input3, branch); FileStoreSourceSplit split2 = newSourceSplit("id2", row(1), 0, files3); assignSplit(reader, split2); @@ -408,7 +411,7 @@ public void testPauseOrResumeSplits() throws Exception { } private void assertRecords( - RecordsWithSplitIds> records, + RecordsWithSplitIds> records, String finishedSplit, String nextSplit, long startRecordSkipCount, diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderWithBranchTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderWithBranchTest.java deleted file mode 100644 index 4fbd6faa2ab09..0000000000000 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderWithBranchTest.java +++ /dev/null @@ -1,469 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.source; - -import org.apache.paimon.KeyValue; -import org.apache.paimon.data.GenericRow; -import org.apache.paimon.flink.source.FileStoreSourceReaderTest.DummyMetricGroup; -import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics; -import org.apache.paimon.fs.Path; -import org.apache.paimon.fs.local.LocalFileIO; -import org.apache.paimon.io.DataFileMeta; -import org.apache.paimon.schema.Schema; -import org.apache.paimon.schema.SchemaManager; -import org.apache.paimon.table.source.TableRead; -import org.apache.paimon.utils.RecordWriter; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; -import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; -import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; -import org.apache.flink.connector.file.src.reader.BulkFormat.RecordIterator; -import org.apache.flink.connector.file.src.util.RecordAndPosition; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.types.logical.BigIntType; -import org.apache.flink.table.types.logical.IntType; -import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.types.RowKind; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - -import javax.annotation.Nullable; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.UUID; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static org.apache.paimon.flink.LogicalTypeConversion.toDataType; -import static org.apache.paimon.flink.source.FileStoreSourceSplitSerializerTest.newFile; -import static org.apache.paimon.flink.source.FileStoreSourceSplitSerializerTest.newSourceSplit; -import static org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest.row; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -/** Test for {@link FileStoreSourceSplitReader}. 
*/ -public class FileStoreSourceSplitReaderWithBranchTest { - - @TempDir java.nio.file.Path tempDir; - - private String branch = UUID.randomUUID() + "-testBranch"; - - @BeforeEach - public void beforeEach() throws Exception { - SchemaManager schemaManager = - new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri())); - schemaManager.createTable( - new Schema( - toDataType( - new RowType( - Arrays.asList( - new RowType.RowField("k", new BigIntType()), - new RowType.RowField("v", new BigIntType()), - new RowType.RowField( - "default", new IntType())))) - .getFields(), - Collections.singletonList("default"), - Arrays.asList("k", "default"), - Collections.emptyMap(), - null), - branch); - } - - @Test - public void testPrimaryKey() throws Exception { - innerTestOnce(0); - } - - @Test - public void testPrimaryKeySkip() throws Exception { - innerTestOnce(4); - } - - private FileStoreSourceSplitReader createReader(TableRead tableRead, @Nullable Long limit) { - return new FileStoreSourceSplitReader( - tableRead, - limit == null ? null : new RecordLimiter(limit), - new FileStoreSourceReaderMetrics(new DummyMetricGroup())); - } - - private void innerTestOnce(int skip) throws Exception { - TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString()); - FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(branch), null); - - List> input = kvs(); - List files = rw.writeFiles(row(1), 0, input, branch); - - assignSplit(reader, newSourceSplit("id1", row(1), 0, files, skip)); - - RecordsWithSplitIds> records = reader.fetch(); - - List> expected = - input.stream() - .map(t -> new Tuple2<>(RowKind.INSERT, t.f1)) - .collect(Collectors.toList()); - - List> result = readRecords(records, "id1", skip); - assertThat(result).isEqualTo(expected.subList(skip, expected.size())); - - records = reader.fetch(); - assertRecords(records, "id1", "id1", 0, null); - - reader.close(); - } - - @Test - public void testPrimaryKeyWithDelete() throws Exception { - TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString()); - FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(branch), null); - - List> input = kvs(); - RecordWriter writer = rw.createMergeTreeWriter(row(1), 0, branch); - for (Tuple2 tuple2 : input) { - writer.write( - new KeyValue() - .replace( - GenericRow.of(tuple2.f0), - org.apache.paimon.types.RowKind.INSERT, - GenericRow.of(tuple2.f1))); - } - writer.write( - new KeyValue() - .replace( - GenericRow.of(222L), - org.apache.paimon.types.RowKind.DELETE, - GenericRow.of(333L))); - List files = writer.prepareCommit(true).newFilesIncrement().newFiles(); - writer.close(); - - assignSplit(reader, newSourceSplit("id1", row(1), 0, files, true)); - RecordsWithSplitIds> records = reader.fetch(); - - List> expected = - input.stream() - .map(t -> new Tuple2<>(RowKind.INSERT, t.f1)) - .collect(Collectors.toList()); - expected.add(new Tuple2<>(RowKind.DELETE, 333L)); - - List> result = readRecords(records, "id1", 0); - assertThat(result).isEqualTo(expected); - - records = reader.fetch(); - assertRecords(records, "id1", "id1", 0, null); - - reader.close(); - } - - @Test - public void testMultipleBatchInSplit() throws Exception { - TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString()); - FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(branch), null); - - List> input1 = kvs(); - List files = rw.writeFiles(row(1), 0, input1, branch); - - List> input2 = kvs(6); - List files2 = 
rw.writeFiles(row(1), 0, input2, branch); - files.addAll(files2); - - assignSplit(reader, newSourceSplit("id1", row(1), 0, files)); - - RecordsWithSplitIds> records = reader.fetch(); - assertRecords( - records, - null, - "id1", - 0, - input1.stream().map(t -> t.f1).collect(Collectors.toList())); - - records = reader.fetch(); - assertRecords( - records, - null, - "id1", - 6, - input2.stream().map(t -> t.f1).collect(Collectors.toList())); - - records = reader.fetch(); - assertRecords(records, "id1", "id1", 0, null); - - reader.close(); - } - - @Test - public void testRestore() throws Exception { - TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString()); - FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(branch), null); - - List> input = kvs(); - List files = rw.writeFiles(row(1), 0, input, branch); - - assignSplit(reader, newSourceSplit("id1", row(1), 0, files, 3)); - - RecordsWithSplitIds> records = reader.fetch(); - assertRecords( - records, - null, - "id1", - 3, - input.subList(3, input.size()).stream() - .map(t -> t.f1) - .collect(Collectors.toList())); - - records = reader.fetch(); - assertRecords(records, "id1", "id1", 0, null); - - reader.close(); - } - - @Test - public void testRestoreMultipleBatchInSplit() throws Exception { - TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString()); - FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(branch), null); - - List> input1 = kvs(); - List files = rw.writeFiles(row(1), 0, input1, branch); - - List> input2 = kvs(6); - List files2 = rw.writeFiles(row(1), 0, input2, branch); - files.addAll(files2); - - assignSplit(reader, newSourceSplit("id1", row(1), 0, files, 7)); - - RecordsWithSplitIds> records = reader.fetch(); - assertRecords( - records, - null, - "id1", - 7, - Stream.concat(input1.stream(), input2.stream()) - .skip(7) - .map(t -> t.f1) - .collect(Collectors.toList())); - - records = reader.fetch(); - assertRecords(records, "id1", "id1", 0, null); - - reader.close(); - } - - @Test - public void testMultipleSplits() throws Exception { - TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString()); - FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(branch), null); - - List> input1 = kvs(); - List files1 = rw.writeFiles(row(1), 0, input1, branch); - assignSplit(reader, newSourceSplit("id1", row(1), 0, files1)); - - List> input2 = kvs(); - List files2 = rw.writeFiles(row(2), 1, input2, branch); - assignSplit(reader, newSourceSplit("id2", row(2), 1, files2)); - - RecordsWithSplitIds> records = reader.fetch(); - assertRecords( - records, - null, - "id1", - 0, - input1.stream().map(t -> t.f1).collect(Collectors.toList())); - - records = reader.fetch(); - assertRecords(records, "id1", "id1", 0, null); - - records = reader.fetch(); - assertRecords( - records, - null, - "id2", - 0, - input2.stream().map(t -> t.f1).collect(Collectors.toList())); - - records = reader.fetch(); - assertRecords(records, "id2", "id2", 0, null); - - reader.close(); - } - - @Test - public void testNoSplit() throws Exception { - TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString()); - FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(branch), null); - assertThatThrownBy(reader::fetch).hasMessageContaining("no split remaining"); - reader.close(); - } - - @Test - public void testLimit() throws Exception { - TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString()); - 
FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(branch), 2L); - - List> input = kvs(); - List files = rw.writeFiles(row(1), 0, input, branch); - - assignSplit(reader, newSourceSplit("id1", row(1), 0, files, 0)); - - RecordsWithSplitIds> records = reader.fetch(); - - List> expected = - input.stream() - .map(t -> new Tuple2<>(RowKind.INSERT, t.f1)) - .collect(Collectors.toList()); - - List> result = readRecords(records, "id1", 0); - assertThat(result).isEqualTo(expected.subList(0, 2)); - - records = reader.fetch(); - assertRecords(records, "id1", null, 0, Collections.emptyList()); - - // test limit without opening reader - // create a new fake new file, throw exception if open it - assignSplit( - reader, newSourceSplit("id2", row(1), 0, Collections.singletonList(newFile(0)), 0)); - records = reader.fetch(); - assertRecords(records, "id2", null, 0, null); - - reader.close(); - } - - @Test - public void testPauseOrResumeSplits() throws Exception { - TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString()); - FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(branch), null); - - List> input1 = kvs(); - List files = rw.writeFiles(row(1), 0, input1, branch); - - List> input2 = kvs(6); - List files2 = rw.writeFiles(row(1), 0, input2, branch); - files.addAll(files2); - - FileStoreSourceSplit split1 = newSourceSplit("id1", row(1), 0, files); - assignSplit(reader, split1); - - RecordsWithSplitIds> records = reader.fetch(); - assertRecords( - records, - null, - "id1", - 0, - input1.stream().map(t -> t.f1).collect(Collectors.toList())); - - // pause split1 - reader.pauseOrResumeSplits(Collections.singletonList(split1), Collections.emptyList()); - records = reader.fetch(); - assertRecords(records, null, null, 0, Collections.emptyList()); - - // assign next split - List> input3 = kvs(12); - List files3 = rw.writeFiles(row(1), 0, input3, branch); - FileStoreSourceSplit split2 = newSourceSplit("id2", row(1), 0, files3); - assignSplit(reader, split2); - - records = reader.fetch(); - assertRecords(records, null, null, 0, Collections.emptyList()); - - // resume split1 - reader.pauseOrResumeSplits(Collections.emptyList(), Collections.singletonList(split1)); - records = reader.fetch(); - assertRecords( - records, - null, - "id1", - 6, - input2.stream().map(t -> t.f1).collect(Collectors.toList())); - - records = reader.fetch(); - assertRecords(records, "id1", "id1", 0, null); - - // fetch split2 - records = reader.fetch(); - assertRecords( - records, - null, - "id2", - 0, - input3.stream().map(t -> t.f1).collect(Collectors.toList())); - records = reader.fetch(); - assertRecords(records, "id2", "id2", 0, null); - - reader.close(); - } - - private void assertRecords( - RecordsWithSplitIds> records, - String finishedSplit, - String nextSplit, - long startRecordSkipCount, - List expected) { - if (finishedSplit != null) { - assertThat(records.finishedSplits()).isEqualTo(Collections.singleton(finishedSplit)); - return; - } - - List> result = readRecords(records, nextSplit, startRecordSkipCount); - assertThat(result.stream().map(t -> t.f1).collect(Collectors.toList())).isEqualTo(expected); - } - - private List> readRecords( - RecordsWithSplitIds> records, - String nextSplit, - long startRecordSkipCount) { - assertThat(records.finishedSplits()).isEmpty(); - assertThat(records.nextSplit()).isEqualTo(nextSplit); - List> result = new ArrayList<>(); - RecordIterator iterator; - while ((iterator = records.nextRecordFromSplit()) != null) { - RecordAndPosition 
record; - while ((record = iterator.next()) != null) { - result.add( - new Tuple2<>( - record.getRecord().getRowKind(), record.getRecord().getLong(0))); - assertThat(record.getRecordSkipCount()).isEqualTo(++startRecordSkipCount); - } - } - records.recycle(); - return result; - } - - private List> kvs() { - return kvs(0); - } - - private List> kvs(long keyBase) { - List> kvs = new ArrayList<>(); - kvs.add(new Tuple2<>(keyBase + 1L, 1L)); - kvs.add(new Tuple2<>(keyBase + 2L, 2L)); - kvs.add(new Tuple2<>(keyBase + 3L, 2L)); - kvs.add(new Tuple2<>(keyBase + 4L, -1L)); - kvs.add(new Tuple2<>(keyBase + 5L, 1L)); - kvs.add(new Tuple2<>(keyBase + 6L, -2L)); - return kvs; - } - - private void assignSplit(FileStoreSourceSplitReader reader, FileStoreSourceSplit split) { - SplitsChange splitsChange = - new SplitsAddition<>(Collections.singletonList(split)); - reader.handleSplitsChanges(splitsChange); - } -} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderWithBranchTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderWithBranchTest.java index 8ba0c7e7bd2a4..15bc68241c472 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderWithBranchTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderWithBranchTest.java @@ -20,7 +20,7 @@ import org.apache.paimon.disk.IOManager; import org.apache.paimon.flink.source.FileStoreSourceReader; -import org.apache.paimon.flink.source.FileStoreSourceReaderWithBranchTest; +import org.apache.paimon.flink.source.FileStoreSourceReaderTest; import org.apache.paimon.flink.source.TestChangelogDataReadWrite; import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics; @@ -37,7 +37,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** Unit tests for the {@link AlignedSourceReader}. */ -public class AlignedSourceReaderWithBranchTest extends FileStoreSourceReaderWithBranchTest { +public class AlignedSourceReaderWithBranchTest extends FileStoreSourceReaderTest { @Override @Test