Skip to content

Commit

Permalink
merge branch
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed May 28, 2024
1 parent f1fa6d6 commit 8a69c6a
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,17 @@ public void deleteBranch(String branchName) {
wrapped.deleteBranch(branchName);
}

/**
* Merge a branch to main branch.
*
* @param branchName
*/
@Override
public void mergeBranch(String branchName) {
privilegeChecker.assertCanInsert(identifier);
wrapped.mergeBranch(branchName);
}

@Override
public void replaceBranch(String fromBranch) {
privilegeChecker.assertCanInsert(identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.stream.Stream;

import static org.apache.paimon.utils.FileUtils.listVersionedDirectories;
import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Manager for {@code Branch}. */
Expand Down Expand Up @@ -357,12 +358,13 @@ public boolean fileExists(Path path) {

public void mergeBranch(String branchName) {
checkArgument(
!branchName.equals(DEFAULT_MAIN_BRANCH),
!isMainBranch(branchName),
"Branch name '%s' do not use in merge branch.",
branchName);
checkArgument(branchExists(branchName), "Branch name '%s' doesn't exist.", branchName);
Long earliestSnapshotId = snapshotManager.earliestSnapshotId(branchName);
Snapshot earliestSnapshot = snapshotManager.snapshot(branchName, earliestSnapshotId);
Long earliestSnapshotId = snapshotManager.copyWithBranch(branchName).earliestSnapshotId();
Snapshot earliestSnapshot =
snapshotManager.copyWithBranch(branchName).snapshot(earliestSnapshotId);
long earliestSchemaId = earliestSnapshot.schemaId();

try {
Expand Down Expand Up @@ -407,18 +409,19 @@ public void mergeBranch(String branchName) {

fileIO.deleteFilesQuietly(deletePaths);
fileIO.copyFilesUtf8(
snapshotManager.branchSnapshotDirectory(branchName),
snapshotManager.copyWithBranch(branchName).snapshotDirectory(),
snapshotManager.snapshotDirectory());
fileIO.copyFilesUtf8(
schemaManager.branchSchemaDirectory(branchName),
schemaManager.copyWithBranch(branchName).schemaDirectory(),
schemaManager.schemaDirectory());
fileIO.copyFilesUtf8(
tagManager.branchTagDirectory(branchName), tagManager.tagDirectory());
tagManager.copyWithBranch(branchName).tagDirectory(),
tagManager.tagDirectory());
} catch (IOException e) {
throw new RuntimeException(
String.format(
"Exception occurs when merge branch '%s' (directory in %s).",
branchName, getBranchPath(tablePath, branchName)),
branchName, getBranchPath(fileIO, tablePath, branchName)),
e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,11 +608,7 @@ private Long findByListFiles(BinaryOperator<Long> reducer, Path dir, String pref
}

public void deleteLatestHint() throws IOException {
deleteLatestHint(DEFAULT_MAIN_BRANCH);
}

public void deleteLatestHint(String branchName) throws IOException {
Path snapshotDir = snapshotDirByBranch(branchName);
Path snapshotDir = snapshotDirectory();
Path hintFile = new Path(snapshotDir, LATEST);
fileIO.delete(hintFile, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public TagManager copyWithBranch(String branchName) {
public Path tagDirectory() {
return new Path(getBranchPath(fileIO, tablePath, branch) + "/tag");
}

/** Return the path of a tag. */
public Path tagPath(String tagName) {
return new Path(getBranchPath(fileIO, tablePath, branch) + "/tag/" + TAG_PREFIX + tagName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1113,11 +1113,12 @@ public void testMergeBranch() throws Exception {

generateBranch(table);

FileStoreTable tableBranch = createFileStoreTable("branch1");
// Verify branch1 and the main branch have the same data
assertThat(
getResult(
table.newRead(),
toSplits(table.newSnapshotReader("branch1").read().dataSplits()),
toSplits(tableBranch.newSnapshotReader().read().dataSplits()),
BATCH_ROW_TO_STRING))
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");

Expand All @@ -1135,17 +1136,17 @@ public void testMergeBranch() throws Exception {
"Branch name 'main' do not use in merge branch."));

// Write data to branch1
try (StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser, "branch1")) {
try (StreamTableWrite write = tableBranch.newWrite(commitUser);
StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
write.write(rowData(2, 20, 200L));
commit.commit(1, write.prepareCommit(false, 2));
}

// Validate data in branch1
assertThat(
getResult(
table.newRead(),
toSplits(table.newSnapshotReader("branch1").read().dataSplits()),
tableBranch.newRead(),
toSplits(tableBranch.newSnapshotReader().read().dataSplits()),
BATCH_ROW_TO_STRING))
.containsExactlyInAnyOrder(
"0|0|0|binary|varbinary|mapKey:mapVal|multiset",
Expand Down Expand Up @@ -1176,7 +1177,8 @@ public void testMergeBranch() throws Exception {
SnapshotManager snapshotManager = new SnapshotManager(new TraceableFileIO(), tablePath);
Snapshot branchSnapshot =
Snapshot.fromPath(
new TraceableFileIO(), snapshotManager.branchSnapshotPath("branch1", 2));
new TraceableFileIO(),
snapshotManager.copyWithBranch("branch1").snapshotPath(2));
Snapshot snapshot =
Snapshot.fromPath(new TraceableFileIO(), snapshotManager.snapshotPath(2));
assertThat(branchSnapshot.equals(snapshot)).isTrue();
Expand All @@ -1185,13 +1187,14 @@ public void testMergeBranch() throws Exception {
SchemaManager schemaManager = new SchemaManager(new TraceableFileIO(), tablePath);
TableSchema branchSchema =
SchemaManager.fromPath(
new TraceableFileIO(), schemaManager.branchSchemaPath("branch1", 0));
new TraceableFileIO(),
schemaManager.copyWithBranch("branch1").toSchemaPath(0));
TableSchema schema0 = schemaManager.schema(0);
assertThat(branchSchema.equals(schema0)).isTrue();

// Write two rows data to branch1 again
try (StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser, "branch1")) {
try (StreamTableWrite write = tableBranch.newWrite(commitUser);
StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
write.write(rowData(3, 30, 300L));
write.write(rowData(4, 40, 400L));
commit.commit(2, write.prepareCommit(false, 3));
Expand All @@ -1201,7 +1204,7 @@ public void testMergeBranch() throws Exception {
assertThat(
getResult(
table.newRead(),
toSplits(table.newSnapshotReader("branch1").read().dataSplits()),
toSplits(tableBranch.newSnapshotReader().read().dataSplits()),
BATCH_ROW_TO_STRING))
.containsExactlyInAnyOrder(
"0|0|0|binary|varbinary|mapKey:mapVal|multiset",
Expand Down

0 comments on commit 8a69c6a

Please sign in to comment.