Skip to content

Commit

Permalink
Branch allows for setting expiration times. apache#4274
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Oct 10, 2024
1 parent 88365fc commit 112b4dc
Show file tree
Hide file tree
Showing 22 changed files with 703 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon;

import org.apache.paimon.branch.BranchAutoManager;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
import org.apache.paimon.fs.FileIO;
Expand Down Expand Up @@ -49,6 +50,7 @@
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
Expand Down Expand Up @@ -311,6 +313,13 @@ public TagAutoManager newTagCreationManager() {
createTagCallbacks());
}

@Override
public BranchAutoManager newBranchCreationManager() {
return BranchAutoManager.create(
new BranchManager(
fileIO, options.path(), snapshotManager(), newTagManager(), schemaManager));
}

@Override
public List<TagCallback> createTagCallbacks() {
List<TagCallback> callbacks = new ArrayList<>(CallbackUtils.loadTagCallbacks(options));
Expand Down
3 changes: 3 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/FileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon;

import org.apache.paimon.branch.BranchAutoManager;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.manifest.IndexManifestFile;
Expand Down Expand Up @@ -100,6 +101,8 @@ public interface FileStore<T> {

TagAutoManager newTagCreationManager();

BranchAutoManager newBranchCreationManager();

ServiceManager newServiceManager();

boolean mergeSchema(RowType rowType, boolean allowExplicitCast);
Expand Down
161 changes: 161 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/branch/Branch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.branch;

import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.JsonSerdeUtil;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import javax.annotation.Nullable;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Objects;

public class Branch {

private static final String FIELD_BRANCH_NAME = "branchName";
private static final String FIELD_FROM_SNAPSHOT_ID = "fromSnapshotId";
private static final String FIELD_FROM_TAG_NAME = "fromTagName";
private static final String FIELD_BRANCH_CREATE_TIME = "branchCreateTime";
private static final String FIELD_BRANCH_TIME_RETAINED = "branchTimeRetained";

@JsonProperty(FIELD_BRANCH_NAME)
@Nullable
private final String branchName;

@JsonProperty(FIELD_FROM_SNAPSHOT_ID)
@Nullable
private final Long fromSnapshotId;

@JsonProperty(FIELD_FROM_TAG_NAME)
@Nullable
private final String fromTagName;

@JsonProperty(FIELD_BRANCH_CREATE_TIME)
@Nullable
private final LocalDateTime branchCreateTime;

@JsonProperty(FIELD_BRANCH_TIME_RETAINED)
@Nullable
private final Duration tagTimeRetained;

public Branch(
@JsonProperty(FIELD_BRANCH_NAME) String branchName,
@JsonProperty(FIELD_FROM_SNAPSHOT_ID) Long fromSnapshotId,
@JsonProperty(FIELD_FROM_TAG_NAME) String fromTagName,
@JsonProperty(FIELD_BRANCH_CREATE_TIME) LocalDateTime branchCreateTime,
@JsonProperty(FIELD_BRANCH_TIME_RETAINED) Duration tagTimeRetained) {
this.branchName = branchName;
this.fromSnapshotId = fromSnapshotId;
this.fromTagName = fromTagName;
this.branchCreateTime = branchCreateTime;
this.tagTimeRetained = tagTimeRetained;
}

@JsonGetter(FIELD_BRANCH_NAME)
@Nullable
public String getBranchName() {
return branchName;
}

@JsonGetter(FIELD_FROM_SNAPSHOT_ID)
@Nullable
public Long getFromSnapshotId() {
return fromSnapshotId;
}

@JsonGetter(FIELD_FROM_TAG_NAME)
@Nullable
public String getFromTagName() {
return fromTagName;
}

@JsonGetter(FIELD_BRANCH_CREATE_TIME)
@Nullable
public LocalDateTime getBranchCreateTime() {
return branchCreateTime;
}

@JsonGetter(FIELD_BRANCH_TIME_RETAINED)
@Nullable
public Duration getTagTimeRetained() {
return tagTimeRetained;
}

public static Branch fromTagAndBranchTtl(
String branchName,
String tagName,
Snapshot snapshot,
LocalDateTime branchCreateTime,
Duration branchTimeRetained) {
return new Branch(
branchName,
snapshot == null ? null : snapshot.id(),
tagName,
branchCreateTime,
branchTimeRetained);
}

public String toJson() {
return JsonSerdeUtil.toJson(this);
}

public static Branch fromJson(String json) {
return JsonSerdeUtil.fromJson(json, Branch.class);
}

@Nullable
public static Branch safelyFromPath(FileIO fileIO, Path path) throws IOException {
try {
if (!fileIO.exists(path)) {
// Compatible with older versions
return null;
}
String json = fileIO.readFileUtf8(path);
return Branch.fromJson(json);
} catch (FileNotFoundException e) {
return null;
}
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Branch branch = (Branch) o;
return Objects.equals(branchName, branch.branchName)
&& Objects.equals(fromSnapshotId, branch.fromSnapshotId)
&& Objects.equals(fromTagName, branch.fromTagName)
&& Objects.equals(branchCreateTime, branch.branchCreateTime)
&& Objects.equals(tagTimeRetained, branch.tagTimeRetained);
}

@Override
public int hashCode() {
return Objects.hash(
branchName, fromSnapshotId, fromTagName, branchCreateTime, tagTimeRetained);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.branch;

import org.apache.paimon.utils.BranchManager;

/** A manager to create and expire branches. */
public class BranchAutoManager {

private final BranchManager branchManager;

private BranchAutoManager(BranchManager branchManager) {
this.branchManager = branchManager;
}

public void run() {
if (branchManager != null) {
branchManager.expireBranches();
}
}

public static BranchAutoManager create(BranchManager branchManager) {
return new BranchAutoManager(branchManager);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.FileStore;
import org.apache.paimon.branch.BranchAutoManager;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
Expand Down Expand Up @@ -189,6 +190,12 @@ public TagAutoManager newTagCreationManager() {
return wrapped.newTagCreationManager();
}

@Override
public BranchAutoManager newBranchCreationManager() {
privilegeChecker.assertCanInsert(identifier);
return wrapped.newBranchCreationManager();
}

@Override
public ServiceManager newServiceManager() {
privilegeChecker.assertCanSelect(identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.privilege;

import org.apache.paimon.FileStore;
import org.apache.paimon.branch.Branch;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.schema.TableSchema;
Expand All @@ -38,6 +39,7 @@
import org.apache.paimon.utils.TagManager;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -151,18 +153,36 @@ public void createBranch(String branchName) {
wrapped.createBranch(branchName);
}

@Override
public void createBranch(String branchName, Duration timeRetained) {
privilegeChecker.assertCanInsert(identifier);
wrapped.createBranch(branchName, timeRetained);
}

@Override
public void createBranch(String branchName, String tagName) {
privilegeChecker.assertCanInsert(identifier);
wrapped.createBranch(branchName, tagName);
}

@Override
public void createBranch(String branchName, String tagName, Duration timeRetained) {
privilegeChecker.assertCanInsert(identifier);
wrapped.createBranch(branchName, tagName, timeRetained);
}

@Override
public void deleteBranch(String branchName) {
privilegeChecker.assertCanInsert(identifier);
wrapped.deleteBranch(branchName);
}

@Override
public List<Branch> expireBranches() {
privilegeChecker.assertCanInsert(identifier);
return wrapped.expireBranches();
}

@Override
public void fastForward(String branchName) {
privilegeChecker.assertCanInsert(identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.branch.Branch;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.fs.FileIO;
Expand Down Expand Up @@ -405,6 +406,7 @@ public TableCommitImpl newCommit(String commitUser) {
snapshotExpire,
options.writeOnly() ? null : store().newPartitionExpire(commitUser),
options.writeOnly() ? null : store().newTagCreationManager(),
options.writeOnly() ? null : store().newBranchCreationManager(),
catalogEnvironment.lockFactory().create(),
CoreOptions.fromMap(options()).consumerExpireTime(),
new ConsumerManager(fileIO, path, snapshotManager().branch()),
Expand Down Expand Up @@ -598,19 +600,34 @@ public void deleteTag(String tagName) {

@Override
public void createBranch(String branchName) {
branchManager().createBranch(branchName);
createBranch(branchName, (Duration) null);
}

@Override
public void createBranch(String branchName, Duration timeRetained) {
branchManager().createBranch(branchName, timeRetained);
}

@Override
public void createBranch(String branchName, String tagName) {
branchManager().createBranch(branchName, tagName);
createBranch(branchName, tagName, (Duration) null);
}

@Override
public void createBranch(String branchName, String tagName, Duration timeRetained) {
branchManager().createBranch(branchName, tagName, timeRetained);
}

@Override
public void deleteBranch(String branchName) {
branchManager().deleteBranch(branchName);
}

@Override
public List<Branch> expireBranches() {
return branchManager().expireBranches();
}

@Override
public void fastForward(String branchName) {
branchManager().fastForward(branchName);
Expand Down
Loading

0 comments on commit 112b4dc

Please sign in to comment.