diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 53bd1291f1f0b..effa3ff3dfea2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -18,6 +18,7 @@ package org.apache.paimon; +import org.apache.paimon.branch.BranchAutoManager; import org.apache.paimon.data.InternalRow; import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile; import org.apache.paimon.fs.FileIO; @@ -49,6 +50,7 @@ import org.apache.paimon.table.sink.TagCallback; import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.SegmentsCache; import org.apache.paimon.utils.SnapshotManager; @@ -311,6 +313,13 @@ public TagAutoManager newTagCreationManager() { createTagCallbacks()); } + @Override + public BranchAutoManager newBranchCreationManager() { + return BranchAutoManager.create( + new BranchManager( + fileIO, options.path(), snapshotManager(), newTagManager(), schemaManager)); + } + @Override public List createTagCallbacks() { List callbacks = new ArrayList<>(CallbackUtils.loadTagCallbacks(options)); diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java index f9bf4c8440bd7..435551ad3b508 100644 --- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java @@ -18,6 +18,7 @@ package org.apache.paimon; +import org.apache.paimon.branch.BranchAutoManager; import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.manifest.IndexManifestFile; @@ -100,6 +101,8 @@ public interface FileStore { TagAutoManager newTagCreationManager(); + BranchAutoManager newBranchCreationManager(); + ServiceManager newServiceManager(); boolean mergeSchema(RowType rowType, boolean allowExplicitCast); diff --git a/paimon-core/src/main/java/org/apache/paimon/branch/Branch.java b/paimon-core/src/main/java/org/apache/paimon/branch/Branch.java new file mode 100644 index 0000000000000..60c6fa888a775 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/branch/Branch.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.branch; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.utils.JsonSerdeUtil; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.Objects; + +public class Branch { + + private static final String FIELD_BRANCH_NAME = "branchName"; + private static final String FIELD_FROM_SNAPSHOT_ID = "fromSnapshotId"; + private static final String FIELD_FROM_TAG_NAME = "fromTagName"; + private static final String FIELD_BRANCH_CREATE_TIME = "branchCreateTime"; + private static final String FIELD_BRANCH_TIME_RETAINED = "branchTimeRetained"; + + @JsonProperty(FIELD_BRANCH_NAME) + @Nullable + private final String branchName; + + @JsonProperty(FIELD_FROM_SNAPSHOT_ID) + @Nullable + private final Long fromSnapshotId; + + @JsonProperty(FIELD_FROM_TAG_NAME) + @Nullable + private final String fromTagName; + + @JsonProperty(FIELD_BRANCH_CREATE_TIME) + @Nullable + private final LocalDateTime branchCreateTime; + + @JsonProperty(FIELD_BRANCH_TIME_RETAINED) + @Nullable + private final Duration branchTimeRetained; + + public Branch( + @JsonProperty(FIELD_BRANCH_NAME) String branchName, + @JsonProperty(FIELD_FROM_SNAPSHOT_ID) Long fromSnapshotId, + @JsonProperty(FIELD_FROM_TAG_NAME) String fromTagName, + @JsonProperty(FIELD_BRANCH_CREATE_TIME) LocalDateTime branchCreateTime, + @JsonProperty(FIELD_BRANCH_TIME_RETAINED) Duration branchTimeRetained) { + this.branchName = branchName; + this.fromSnapshotId = fromSnapshotId; + this.fromTagName = fromTagName; + this.branchCreateTime = branchCreateTime; + this.branchTimeRetained = branchTimeRetained; + } + + @JsonGetter(FIELD_BRANCH_NAME) + @Nullable + public String getBranchName() { + return branchName; + } + + @JsonGetter(FIELD_FROM_SNAPSHOT_ID) + @Nullable + public Long getFromSnapshotId() { + return fromSnapshotId; + } + + @JsonGetter(FIELD_FROM_TAG_NAME) + @Nullable + public String getFromTagName() { + return fromTagName; + } + + @JsonGetter(FIELD_BRANCH_CREATE_TIME) + @Nullable + public LocalDateTime getBranchCreateTime() { + return branchCreateTime; + } + + @JsonGetter(FIELD_BRANCH_TIME_RETAINED) + @Nullable + public Duration getBranchTimeRetained() { + return branchTimeRetained; + } + + public static Branch fromTagAndBranchTtl( + String branchName, + String tagName, + Snapshot snapshot, + LocalDateTime branchCreateTime, + Duration branchTimeRetained) { + return new Branch( + branchName, + snapshot == null ? null : snapshot.id(), + tagName, + branchCreateTime, + branchTimeRetained); + } + + public String toJson() { + return JsonSerdeUtil.toJson(this); + } + + public static Branch fromJson(String json) { + return JsonSerdeUtil.fromJson(json, Branch.class); + } + + @Nullable + public static Branch safelyFromPath(FileIO fileIO, Path path) throws IOException { + try { + if (!fileIO.exists(path)) { + // Compatible with older versions + return null; + } + String json = fileIO.readFileUtf8(path); + return Branch.fromJson(json); + } catch (FileNotFoundException e) { + return null; + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Branch branch = (Branch) o; + return Objects.equals(branchName, branch.branchName) + && Objects.equals(fromSnapshotId, branch.fromSnapshotId) + && Objects.equals(fromTagName, branch.fromTagName) + && Objects.equals(branchCreateTime, branch.branchCreateTime) + && Objects.equals(branchTimeRetained, branch.branchTimeRetained); + } + + @Override + public int hashCode() { + return Objects.hash( + branchName, fromSnapshotId, fromTagName, branchCreateTime, branchTimeRetained); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/branch/BranchAutoManager.java b/paimon-core/src/main/java/org/apache/paimon/branch/BranchAutoManager.java new file mode 100644 index 0000000000000..f7495d5fb8c6a --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/branch/BranchAutoManager.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.branch; + +import org.apache.paimon.utils.BranchManager; + +/** A manager to create and expire branches. */ +public class BranchAutoManager { + + private final BranchManager branchManager; + + private BranchAutoManager(BranchManager branchManager) { + this.branchManager = branchManager; + } + + public void run() { + if (branchManager != null) { + branchManager.expireBranches(); + } + } + + public static BranchAutoManager create(BranchManager branchManager) { + return new BranchAutoManager(branchManager); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java index ec068b25acb32..e344fa2e21ac2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.FileStore; +import org.apache.paimon.branch.BranchAutoManager; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexFileHandler; @@ -189,6 +190,12 @@ public TagAutoManager newTagCreationManager() { return wrapped.newTagCreationManager(); } + @Override + public BranchAutoManager newBranchCreationManager() { + privilegeChecker.assertCanInsert(identifier); + return wrapped.newBranchCreationManager(); + } + @Override public ServiceManager newServiceManager() { privilegeChecker.assertCanSelect(identifier); diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java index 4eb1a1a90c8c1..b9e1d69da89d1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java @@ -19,6 +19,7 @@ package org.apache.paimon.privilege; import org.apache.paimon.FileStore; +import org.apache.paimon.branch.Branch; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.manifest.ManifestCacheFilter; import org.apache.paimon.schema.TableSchema; @@ -38,6 +39,7 @@ import org.apache.paimon.utils.TagManager; import java.time.Duration; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -151,18 +153,36 @@ public void createBranch(String branchName) { wrapped.createBranch(branchName); } + @Override + public void createBranch(String branchName, Duration timeRetained) { + privilegeChecker.assertCanInsert(identifier); + wrapped.createBranch(branchName, timeRetained); + } + @Override public void createBranch(String branchName, String tagName) { privilegeChecker.assertCanInsert(identifier); wrapped.createBranch(branchName, tagName); } + @Override + public void createBranch(String branchName, String tagName, Duration timeRetained) { + privilegeChecker.assertCanInsert(identifier); + wrapped.createBranch(branchName, tagName, timeRetained); + } + @Override public void deleteBranch(String branchName) { privilegeChecker.assertCanInsert(identifier); wrapped.deleteBranch(branchName); } + @Override + public List expireBranches() { + privilegeChecker.assertCanInsert(identifier); + return wrapped.expireBranches(); + } + @Override public void fastForward(String branchName) { privilegeChecker.assertCanInsert(identifier); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 7682cdf7724a4..22244e94157c6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; +import org.apache.paimon.branch.Branch; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.consumer.ConsumerManager; import org.apache.paimon.fs.FileIO; @@ -405,6 +406,7 @@ public TableCommitImpl newCommit(String commitUser) { snapshotExpire, options.writeOnly() ? null : store().newPartitionExpire(commitUser), options.writeOnly() ? null : store().newTagCreationManager(), + options.writeOnly() ? null : store().newBranchCreationManager(), catalogEnvironment.lockFactory().create(), CoreOptions.fromMap(options()).consumerExpireTime(), new ConsumerManager(fileIO, path, snapshotManager().branch()), @@ -598,12 +600,22 @@ public void deleteTag(String tagName) { @Override public void createBranch(String branchName) { - branchManager().createBranch(branchName); + createBranch(branchName, (Duration) null); + } + + @Override + public void createBranch(String branchName, Duration timeRetained) { + branchManager().createBranch(branchName, timeRetained); } @Override public void createBranch(String branchName, String tagName) { - branchManager().createBranch(branchName, tagName); + createBranch(branchName, tagName, (Duration) null); + } + + @Override + public void createBranch(String branchName, String tagName, Duration timeRetained) { + branchManager().createBranch(branchName, tagName, timeRetained); } @Override @@ -611,6 +623,11 @@ public void deleteBranch(String branchName) { branchManager().deleteBranch(branchName); } + @Override + public List expireBranches() { + return branchManager().expireBranches(); + } + @Override public void fastForward(String branchName) { branchManager().fastForward(branchName); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java index 5d6331aa414e4..42ca9556052e8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.FileStore; import org.apache.paimon.Snapshot; +import org.apache.paimon.branch.Branch; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.IndexManifestEntry; @@ -45,6 +46,7 @@ import org.apache.paimon.utils.TagManager; import java.time.Duration; +import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.OptionalLong; @@ -202,16 +204,31 @@ public void createBranch(String branchName) { wrapped.createBranch(branchName); } + @Override + public void createBranch(String branchName, Duration timeRetained) { + wrapped.createBranch(branchName, timeRetained); + } + @Override public void createBranch(String branchName, String tagName) { wrapped.createBranch(branchName, tagName); } + @Override + public void createBranch(String branchName, String tagName, Duration timeRetained) { + wrapped.createBranch(branchName, tagName, timeRetained); + } + @Override public void deleteBranch(String branchName) { wrapped.deleteBranch(branchName); } + @Override + public List expireBranches() { + return wrapped.expireBranches(); + } + @Override public void fastForward(String branchName) { wrapped.fastForward(branchName); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java index 3224131d4afd4..8b84e4174622f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java @@ -20,6 +20,7 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.annotation.Public; +import org.apache.paimon.branch.Branch; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.manifest.ManifestEntry; @@ -286,16 +287,31 @@ default void createBranch(String branchName) { throw new UnsupportedOperationException(); } + @Override + default void createBranch(String branchName, Duration timeRetained) { + throw new UnsupportedOperationException(); + } + @Override default void createBranch(String branchName, String tagName) { throw new UnsupportedOperationException(); } + @Override + default void createBranch(String branchName, String tagName, Duration timeRetained) { + throw new UnsupportedOperationException(); + } + @Override default void deleteBranch(String branchName) { throw new UnsupportedOperationException(); } + @Override + default List expireBranches() { + throw new UnsupportedOperationException(); + } + @Override default void fastForward(String branchName) { throw new UnsupportedOperationException(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index 4ae593b5577fa..eb40efc8b22b3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -19,6 +19,7 @@ package org.apache.paimon.table; import org.apache.paimon.Snapshot; +import org.apache.paimon.branch.Branch; import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFileMeta; @@ -221,6 +222,14 @@ default void createBranch(String branchName) { this.getClass().getSimpleName())); } + @Override + default void createBranch(String branchName, Duration timeRetained) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support createBranch.", + this.getClass().getSimpleName())); + } + @Override default void createBranch(String branchName, String tagName) { throw new UnsupportedOperationException( @@ -229,6 +238,14 @@ default void createBranch(String branchName, String tagName) { this.getClass().getSimpleName())); } + @Override + default void createBranch(String branchName, String tagName, Duration timeRetained) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support createBranch.", + this.getClass().getSimpleName())); + } + @Override default void deleteBranch(String branchName) { throw new UnsupportedOperationException( @@ -237,6 +254,14 @@ default void deleteBranch(String branchName) { this.getClass().getSimpleName())); } + @Override + default List expireBranches() { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support expireBranches.", + this.getClass().getSimpleName())); + } + @Override default void fastForward(String branchName) { throw new UnsupportedOperationException( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 613dfca3158a0..d82d570a73290 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -21,6 +21,7 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.annotation.Experimental; import org.apache.paimon.annotation.Public; +import org.apache.paimon.branch.Branch; import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFileMeta; @@ -136,14 +137,20 @@ default void deleteTags(String tagNames) { @Experimental void rollbackTo(String tagName); - /** Create an empty branch. */ @Experimental void createBranch(String branchName); - /** Create a branch from given tag. */ + /** Create an empty branch. */ + @Experimental + void createBranch(String branchName, Duration timeRetained); + @Experimental void createBranch(String branchName, String tagName); + /** Create a branch from given tag. */ + @Experimental + void createBranch(String branchName, String tagName, Duration timeRetained); + /** Delete a branch by branchName. */ @Experimental void deleteBranch(String branchName); @@ -156,6 +163,10 @@ default void deleteBranches(String branchNames) { } } + /** Manually expire branch */ + @Experimental + List expireBranches(); + /** Merge a branch to main branch. */ @Experimental void fastForward(String branchName); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java index b4f8fa47dbb1e..de59820eabfb2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java @@ -19,6 +19,7 @@ package org.apache.paimon.table.sink; import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.branch.BranchAutoManager; import org.apache.paimon.consumer.ConsumerManager; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.fs.Path; @@ -78,6 +79,7 @@ public class TableCommitImpl implements InnerTableCommit { @Nullable private final Runnable expireSnapshots; @Nullable private final PartitionExpire partitionExpire; @Nullable private final TagAutoManager tagAutoManager; + @Nullable private final BranchAutoManager branchAutoManager; private final Lock lock; @Nullable private final Duration consumerExpireTime; @@ -97,6 +99,7 @@ public TableCommitImpl( @Nullable Runnable expireSnapshots, @Nullable PartitionExpire partitionExpire, @Nullable TagAutoManager tagAutoManager, + @Nullable BranchAutoManager branchAutoManager, Lock lock, @Nullable Duration consumerExpireTime, ConsumerManager consumerManager, @@ -113,6 +116,7 @@ public TableCommitImpl( this.expireSnapshots = expireSnapshots; this.partitionExpire = partitionExpire; this.tagAutoManager = tagAutoManager; + this.branchAutoManager = branchAutoManager; this.lock = lock; this.consumerExpireTime = consumerExpireTime; @@ -344,6 +348,10 @@ private void expire(long partitionExpireIdentifier) { if (tagAutoManager != null) { tagAutoManager.run(); } + + if (branchAutoManager != null) { + branchAutoManager.run(); + } } public void expireSnapshots() { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java index a055db6d58cd8..c5c85549c5733 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java @@ -19,6 +19,7 @@ package org.apache.paimon.table.system; import org.apache.paimon.Snapshot; +import org.apache.paimon.branch.Branch; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; @@ -87,7 +88,9 @@ public class BranchesTable implements ReadonlyTable { new DataField( 1, "created_from_tag", SerializationUtils.newStringType(true)), new DataField(2, "created_from_snapshot", new BigIntType(true)), - new DataField(3, "create_time", new TimestampType(false, 3)))); + new DataField(3, "create_time", new TimestampType(false, 3)), + new DataField( + 4, "time_retained", SerializationUtils.newStringType(true)))); private final FileIO fileIO; private final Path location; @@ -229,6 +232,8 @@ private List branches(FileStoreTable table) throws IOException { BranchManager branchManager = table.branchManager(); SchemaManager schemaManager = new SchemaManager(fileIO, table.location()); + Map branchesAsMap = branchManager.branchObjectsAsMap(); + List> paths = listVersionedDirectories(fileIO, branchManager.branchDirectory(), BRANCH_PREFIX) .map(status -> Pair.of(status.getPath(), status.getModificationTime())) @@ -267,14 +272,29 @@ private List branches(FileStoreTable table) throws IOException { } } } + Branch currentBranch = null; + if (branchesAsMap.containsKey(branchName)) { + currentBranch = branchesAsMap.get(branchName); + } result.add( GenericRow.of( BinaryString.fromString(branchName), BinaryString.fromString(basedTag), basedSnapshotId, - Timestamp.fromLocalDateTime( - DateTimeUtils.toLocalDateTime(creationTime)))); + (currentBranch == null + || currentBranch.getBranchCreateTime() == null) + ? Timestamp.fromLocalDateTime( + DateTimeUtils.toLocalDateTime(creationTime)) + : Timestamp.fromLocalDateTime( + currentBranch.getBranchCreateTime()), + (currentBranch == null + || currentBranch.getBranchTimeRetained() == null) + ? null + : Optional.ofNullable(currentBranch.getBranchTimeRetained()) + .map(Object::toString) + .map(BinaryString::fromString) + .orElse(null))); } return result; diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index c2793de377991..699f974c7d55a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -19,17 +19,24 @@ package org.apache.paimon.utils; import org.apache.paimon.Snapshot; +import org.apache.paimon.branch.Branch; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.tag.Tag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -88,8 +95,13 @@ public Path branchPath(String branchName) { return new Path(branchPath(tablePath, branchName)); } + /** Return the path of a branch metadata file. */ + public Path branchMetadataPath(String branchName) { + return new Path(branchPath(tablePath, branchName) + "/METADATA"); + } + /** Create empty branch. */ - public void createBranch(String branchName) { + public void createBranch(String branchName, Duration timeRetained) { validateBranch(branchName); try { @@ -98,6 +110,9 @@ public void createBranch(String branchName) { schemaManager.toSchemaPath(latestSchema.id()), schemaManager.copyWithBranch(branchName).toSchemaPath(latestSchema.id()), true); + + // Create branch metadata file with timeRetained + overrideBranchMetaData(branchName, null, null, timeRetained); } catch (IOException e) { throw new RuntimeException( String.format( @@ -107,7 +122,7 @@ branchName, branchPath(tablePath, branchName)), } } - public void createBranch(String branchName, String tagName) { + public void createBranch(String branchName, String tagName, Duration timeRetained) { validateBranch(branchName); checkArgument(tagManager.tagExists(tagName), "Tag name '%s' not exists.", tagName); @@ -127,6 +142,8 @@ public void createBranch(String branchName, String tagName) { schemaManager.toSchemaPath(snapshot.schemaId()), schemaManager.copyWithBranch(branchName).toSchemaPath(snapshot.schemaId()), true); + // Create branch metadata file with timeRetained + overrideBranchMetaData(branchName, tagName, snapshot, timeRetained); } catch (IOException e) { throw new RuntimeException( String.format( @@ -136,6 +153,16 @@ branchName, branchPath(tablePath, branchName)), } } + private void overrideBranchMetaData( + String branchName, String tagName, Snapshot snapshot, Duration timeRetained) + throws IOException { + String content = + Branch.fromTagAndBranchTtl( + branchName, tagName, snapshot, LocalDateTime.now(), timeRetained) + .toJson(); + fileIO.overwriteFileUtf8(branchMetadataPath(branchName), content); + } + public void deleteBranch(String branchName) { checkArgument(branchExists(branchName), "Branch name '%s' doesn't exist.", branchName); try { @@ -233,6 +260,36 @@ public List branches() { } } + public Map branchObjectsAsMap() { + List branches = branchObjects(); + if (branches.isEmpty()) { + return Collections.emptyMap(); + } + return branchObjects().stream() + .collect(Collectors.toMap(branch -> branch.getBranchName(), branch -> branch)); + } + + /** Get all {@link Tag}s. */ + public List branchObjects() { + try { + List paths = + listVersionedDirectories(fileIO, branchDirectory(), BRANCH_PREFIX) + .map(status -> status.getPath()) + .collect(Collectors.toList()); + List branches = new ArrayList<>(); + for (Path path : paths) { + String branchName = path.getName().substring(BRANCH_PREFIX.length()); + Branch branch = Branch.safelyFromPath(fileIO, branchMetadataPath(branchName)); + if (branch != null) { + branches.add(branch); + } + } + return branches; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + private void validateBranch(String branchName) { checkArgument( !isMainBranch(branchName), @@ -249,4 +306,25 @@ private void validateBranch(String branchName) { "Branch name cannot be pure numeric string but is '%s'.", branchName); } + + public List expireBranches() { + List expiredBranch = new ArrayList<>(); + List branches = branchObjects(); + for (Branch branch : branches) { + LocalDateTime createTime = branch.getBranchCreateTime(); + Duration timeRetained = branch.getBranchTimeRetained(); + if (createTime == null || timeRetained == null) { + continue; + } + if (LocalDateTime.now().isAfter(createTime.plus(timeRetained))) { + LOG.info( + "Delete branch {}, because its existence time has reached its timeRetained of {}.", + branch.getBranchName(), + timeRetained); + deleteBranch(branch.getBranchName()); + expiredBranch.add(branch); + } + } + return expiredBranch; + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/BranchesTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/BranchesTableTest.java index f1fbbe0177c1a..096b1fb536734 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/BranchesTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/BranchesTableTest.java @@ -27,6 +27,7 @@ import org.apache.paimon.table.TableTestBase; import org.apache.paimon.table.sink.TableCommitImpl; import org.apache.paimon.types.DataTypes; +import org.apache.paimon.utils.TimeUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -90,4 +91,19 @@ void testBranches() throws Exception { .collect(Collectors.toList())) .containsExactlyInAnyOrder("my_branch1", "my_branch2", "my_branch3"); } + + @Test + void testBranchesWithMaxAge() throws Exception { + String retainTime = "10000ms"; + table.createBranch("my_branch1", "2023-07-17", TimeUtils.parseDuration(retainTime)); + table.createBranch("my_branch2", "2023-07-18", TimeUtils.parseDuration(retainTime)); + table.createBranch("my_branch3", "2023-07-18", TimeUtils.parseDuration(retainTime)); + List branches = read(branchesTable); + assertThat(branches.size()).isEqualTo(3); + assertThat( + branches.stream() + .map(v -> v.getString(0).toString()) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder("my_branch1", "my_branch2", "my_branch3"); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchAction.java index aa8cc697ae365..51a7c6f1315ac 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchAction.java @@ -20,12 +20,14 @@ import org.apache.commons.lang3.StringUtils; +import java.time.Duration; import java.util.Map; /** Create branch action for Flink. */ public class CreateBranchAction extends TableActionBase { private final String branchName; private final String tagName; + private final Duration timeRetained; public CreateBranchAction( String warehouse, @@ -33,18 +35,20 @@ public CreateBranchAction( String tableName, Map catalogConfig, String branchName, - String tagName) { + String tagName, + Duration timeRetained) { super(warehouse, databaseName, tableName, catalogConfig); this.branchName = branchName; this.tagName = tagName; + this.timeRetained = timeRetained; } @Override public void run() throws Exception { if (!StringUtils.isBlank(tagName)) { - table.createBranch(branchName, tagName); + table.createBranch(branchName, tagName, timeRetained); } else { - table.createBranch(branchName); + table.createBranch(branchName, timeRetained); } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchActionFactory.java index d1071d0870ad3..d3e92e78fe9a7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchActionFactory.java @@ -18,8 +18,11 @@ package org.apache.paimon.flink.action; +import org.apache.paimon.utils.TimeUtils; + import org.apache.flink.api.java.tuple.Tuple3; +import java.time.Duration; import java.util.Map; import java.util.Optional; @@ -30,6 +33,7 @@ public class CreateBranchActionFactory implements ActionFactory { private static final String TAG_NAME = "tag_name"; private static final String BRANCH_NAME = "branch_name"; + private static final String TIME_RETAINED = "time_retained"; @Override public String identifier() { @@ -50,6 +54,11 @@ public Optional create(MultipleParameterToolAdapter params) { String branchName = params.get(BRANCH_NAME); + Duration timeRetained = null; + if (params.has(TIME_RETAINED)) { + timeRetained = TimeUtils.parseDuration(params.get(TIME_RETAINED)); + } + CreateBranchAction action = new CreateBranchAction( tablePath.f0, @@ -57,7 +66,8 @@ public Optional create(MultipleParameterToolAdapter params) { tablePath.f2, catalogConfig, branchName, - tagName); + tagName, + timeRetained); return Optional.of(action); } @@ -69,7 +79,7 @@ public void printHelp() { System.out.println("Syntax:"); System.out.println( " create_branch --warehouse --database " - + "--table --branch_name [--tag_name ]"); + + "--table --branch_name [--tag_name ] [--time_retained ]"); System.out.println(); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireBranchAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireBranchAction.java new file mode 100644 index 0000000000000..fc95184052330 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireBranchAction.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import java.util.Map; + +/** Expire branch action for Flink. */ +public class ExpireBranchAction extends TableActionBase { + + public ExpireBranchAction( + String warehouse, + String databaseName, + String tableName, + Map catalogConfig) { + super(warehouse, databaseName, tableName, catalogConfig); + } + + @Override + public void run() throws Exception { + table.expireBranches(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireBranchActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireBranchActionFactory.java new file mode 100644 index 0000000000000..f747c9152ddb2 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireBranchActionFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.flink.api.java.tuple.Tuple3; + +import java.util.Map; +import java.util.Optional; + +/** Factory to create {@link ExpireBranchAction}. */ +public class ExpireBranchActionFactory implements ActionFactory { + + public static final String IDENTIFIER = "expire_branch"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public Optional create(MultipleParameterToolAdapter params) { + Tuple3 tablePath = getTablePath(params); + Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); + ExpireBranchAction action = + new ExpireBranchAction(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig); + return Optional.of(action); + } + + @Override + public void printHelp() { + System.out.println("Action \"expire_branch\" clean expired branches."); + System.out.println(); + + System.out.println("Syntax:"); + System.out.println( + " expire_branch --warehouse --database --table "); + System.out.println(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java index 74cf85a4c5c27..7cf3f0473fac4 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java @@ -21,6 +21,7 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.table.Table; +import org.apache.paimon.utils.TimeUtils; import org.apache.commons.lang3.StringUtils; import org.apache.flink.table.annotation.ArgumentHint; @@ -28,6 +29,10 @@ import org.apache.flink.table.annotation.ProcedureHint; import org.apache.flink.table.procedure.ProcedureContext; +import javax.annotation.Nullable; + +import java.time.Duration; + /** * Create branch procedure for given tag. Usage: * @@ -48,17 +53,34 @@ public String identifier() { argument = { @ArgumentHint(name = "table", type = @DataTypeHint("STRING")), @ArgumentHint(name = "branch", type = @DataTypeHint("STRING")), - @ArgumentHint(name = "tag", type = @DataTypeHint("STRING"), isOptional = true) + @ArgumentHint(name = "tag", type = @DataTypeHint("STRING"), isOptional = true), + @ArgumentHint( + name = "time_retained", + type = @DataTypeHint("STRING"), + isOptional = true) }) public String[] call( - ProcedureContext procedureContext, String tableId, String branchName, String tagName) + ProcedureContext procedureContext, + String tableId, + String branchName, + String tagName, + String timeRetained) throws Catalog.TableNotExistException { Table table = catalog.getTable(Identifier.fromString(tableId)); if (!StringUtils.isBlank(tagName)) { - table.createBranch(branchName, tagName); + table.createBranch(branchName, tagName, toDuration(timeRetained)); } else { - table.createBranch(branchName); + table.createBranch(branchName, toDuration(timeRetained)); } return new String[] {"Success"}; } + + @Nullable + private static Duration toDuration(@Nullable String s) { + if (s == null) { + return null; + } + + return TimeUtils.parseDuration(s); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireBranchProcedure.java new file mode 100644 index 0000000000000..3fa081ad76abd --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireBranchProcedure.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.branch.Branch; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; + +import java.util.List; + +/** + * Delete branch procedure. Usage: + * + *

+ *  CALL sys.expire_branch('tableId')
+ * 
+ */ +public class ExpireBranchProcedure extends ProcedureBase { + + public static final String IDENTIFIER = "expire_branch"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @ProcedureHint( + argument = { + @ArgumentHint(name = "table", type = @DataTypeHint("STRING")), + }) + public String[] call(ProcedureContext procedureContext, String tableId) + throws Catalog.TableNotExistException { + List expireBranches = + catalog.getTable(Identifier.fromString(tableId)).expireBranches(); + return expireBranches == null || expireBranches.isEmpty() + ? new String[] {"No expired branches."} + : expireBranches.stream().map(Branch::getBranchName).toArray(String[]::new); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 3ae35ade54bba..ed2ef64149767 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -35,6 +35,7 @@ org.apache.paimon.flink.action.ExpirePartitionsActionFactory org.apache.paimon.flink.action.MarkPartitionDoneActionFactory org.apache.paimon.flink.action.CreateBranchActionFactory org.apache.paimon.flink.action.DeleteBranchActionFactory +org.apache.paimon.flink.action.ExpireBranchActionFactory org.apache.paimon.flink.action.FastForwardActionFactory org.apache.paimon.flink.action.RenameTagActionFactory org.apache.paimon.flink.action.RepairActionFactory @@ -51,6 +52,7 @@ org.apache.paimon.flink.procedure.CreateTagFromWatermarkProcedure org.apache.paimon.flink.procedure.DeleteTagProcedure org.apache.paimon.flink.procedure.CreateBranchProcedure org.apache.paimon.flink.procedure.DeleteBranchProcedure +org.apache.paimon.flink.procedure.ExpireBranchProcedure org.apache.paimon.flink.procedure.DropPartitionProcedure org.apache.paimon.flink.procedure.MergeIntoProcedure org.apache.paimon.flink.procedure.ResetConsumerProcedure @@ -71,4 +73,4 @@ org.apache.paimon.flink.procedure.RepairProcedure org.apache.paimon.flink.procedure.RenameTagProcedure org.apache.paimon.flink.procedure.FastForwardProcedure org.apache.paimon.flink.procedure.MarkPartitionDoneProcedure -org.apache.paimon.flink.procedure.CloneProcedure +org.apache.paimon.flink.procedure.CloneProcedure \ No newline at end of file diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java index 0c9f4ec6c5e90..67c66243aa824 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.utils.BlockingIterator; +import org.apache.paimon.utils.TimeUtils; import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.types.Row; @@ -289,6 +290,28 @@ public void testDynamicOptions() throws Exception { assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(2)); } + @Test + public void testReadWriteBranchWithMaxAge() throws Exception { + // create table + sql("CREATE TABLE T (id INT)"); + // insert data + batchSql("INSERT INTO T VALUES (1)"); + // create tag + paimonTable("T").createTag("tag1", 1); + // create branch + paimonTable("T").createBranch("branch1", "tag1", TimeUtils.parseDuration("5000ms")); + // insert data to branch + batchSql("INSERT INTO T/*+ OPTIONS('branch' = 'branch1') */ VALUES (2)"); + List rows = batchSql("select * from T /*+ OPTIONS('branch' = 'branch1') */"); + assertThat(rows).containsExactlyInAnyOrder(Row.of(2), Row.of(1)); + // Fast Forward branch1 + paimonTable("T").branchManager().fastForward("branch1"); + Thread.sleep(5000); + // insert data to main branch and expired branch1 + batchSql("INSERT INTO T VALUES (2)"); + assertThat(paimonTable("T").branchManager().branchExists("branch1")).isFalse(); + } + @Test public void testReadWriteBranch() throws Exception { // create table diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index 6970eb043b250..52b91d70f0c8e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -169,6 +169,42 @@ public void testCreateEmptyBranch() throws Exception { .containsExactlyInAnyOrder("+I[3, 30, banana]"); } + @Test + public void testCreateEmptyBranchWithMaxAge() throws Exception { + sql( + "CREATE TABLE T (" + + " pt INT" + + ", k INT" + + ", v STRING" + + ", PRIMARY KEY (pt, k) NOT ENFORCED" + + " ) PARTITIONED BY (pt) WITH (" + + " 'bucket' = '2'" + + " )"); + + // snapshot 1. + sql("INSERT INTO T VALUES(1, 10, 'apple')"); + // snapshot 2. + sql("INSERT INTO T VALUES(1, 20, 'dog')"); + + assertThat(collectResult("SELECT * FROM T")) + .containsExactlyInAnyOrder("+I[1, 10, apple]", "+I[1, 20, dog]"); + // create empty branch. + sql("CALL sys.create_branch('default.T', 'empty_branch1')"); + sql("CALL sys.create_branch('default.T', 'empty_branch2')"); + sql( + "CALL sys.create_branch(`table`=>'default.T', `branch`=>'empty_branch3', `time_retained`=>'1000ms')"); + // Get all branches + assertThat(collectResult("SELECT * FROM T$branches").size()).isEqualTo(3); + + Thread.sleep(1000); + sql("INSERT INTO `T$branch_empty_branch1` VALUES (3, 30, 'banana')"); + assertThat(collectResult("SELECT * FROM T$branch_empty_branch1")) + .containsExactlyInAnyOrder("+I[3, 30, banana]"); + + // Get all branches, expired empty_branch3 + assertThat(collectResult("SELECT * FROM T$branches").size()).isEqualTo(2); + } + @Test public void testDeleteBranchTable() throws Exception { sql( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java index a8286964832a0..de48f8e7136ee 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java @@ -47,19 +47,7 @@ class BranchActionITCase extends ActionITCaseBase { @Test void testCreateAndDeleteBranch() throws Exception { - init(warehouse); - - RowType rowType = - RowType.of( - new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()}, - new String[] {"k", "v"}); - FileStoreTable table = - createFileStoreTable( - rowType, - Collections.emptyList(), - Collections.singletonList("k"), - Collections.emptyList(), - Collections.emptyMap()); + FileStoreTable table = fileStoreTable(); StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = writeBuilder.newWrite(); @@ -132,21 +120,61 @@ void testCreateAndDeleteBranch() throws Exception { } @Test - void testCreateAndDeleteEmptyBranch() throws Exception { + void testCreateAndDeleteEmptyBranchWithRetainedTime() throws Exception { + FileStoreTable table = fileStoreTable(); - init(warehouse); + StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); + write = writeBuilder.newWrite(); + commit = writeBuilder.newCommit(); - RowType rowType = - RowType.of( - new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()}, - new String[] {"k", "v"}); - FileStoreTable table = - createFileStoreTable( - rowType, - Collections.emptyList(), - Collections.singletonList("k"), - Collections.emptyList(), - Collections.emptyMap()); + // 3 snapshots + writeData(rowData(1L, BinaryString.fromString("Hi"))); + writeData(rowData(2L, BinaryString.fromString("Hello"))); + writeData(rowData(3L, BinaryString.fromString("Paimon"))); + + BranchManager branchManager = table.branchManager(); + executeSQL( + String.format( + "CALL sys.create_branch(`table` =>'%s.%s', branch =>'empty_branch_name', time_retained =>'1000ms')", + database, tableName)); + assertThat(branchManager.branchExists("empty_branch_name")).isTrue(); + Thread.sleep(1000); + executeSQL(String.format("CALL sys.expire_branch('%s.%s')", database, tableName)); + assertThat(branchManager.branchExists("empty_branch_name")).isFalse(); + + createAction( + CreateBranchAction.class, + "create_branch", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName, + "--branch_name", + "empty_branch_name") + .run(); + assertThat(branchManager.branchExists("empty_branch_name")).isTrue(); + + createAction( + DeleteBranchAction.class, + "delete_branch", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName, + "--branch_name", + "empty_branch_name") + .run(); + assertThat(branchManager.branchExists("empty_branch_name")).isFalse(); + } + + @Test + void testCreateAndDeleteEmptyBranch() throws Exception { + + FileStoreTable table = fileStoreTable(); StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = writeBuilder.newWrite(); @@ -213,18 +241,7 @@ void testCreateAndDeleteEmptyBranch() throws Exception { @Test void testFastForward() throws Exception { - init(warehouse); - RowType rowType = - RowType.of( - new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()}, - new String[] {"k", "v"}); - FileStoreTable table = - createFileStoreTable( - rowType, - Collections.emptyList(), - Collections.singletonList("k"), - Collections.emptyList(), - Collections.emptyMap()); + FileStoreTable table = fileStoreTable(); StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); write = writeBuilder.newWrite(); @@ -355,6 +372,22 @@ void testFastForward() throws Exception { Assert.assertEquals(expected, sortedActual); } + private FileStoreTable fileStoreTable() throws Exception { + init(warehouse); + RowType rowType = + RowType.of( + new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()}, + new String[] {"k", "v"}); + FileStoreTable table = + createFileStoreTable( + rowType, + Collections.emptyList(), + Collections.singletonList("k"), + Collections.emptyList(), + Collections.emptyMap()); + return table; + } + List readTableData(FileStoreTable table) throws Exception { RowType rowType = RowType.of(