Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Feb 26, 2024
1 parent 17b4b1e commit 41cd068
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -199,30 +199,37 @@ private void copyBranchToMain(String fromBranch) throws IOException {
* Delete all the snapshots, tags and schemas in the main branch that are created after the
* created tag for the branch.
*/
private boolean cleanMainBranch(TableBranch referenceBranch) throws IOException {
long createBranchTime = referenceBranch.getCreateTime();
// delete tags
private boolean cleanMainBranch(TableBranch fromBranch) throws IOException {
// Copy tags
List<TableTag> tags = tagManager.tableTags();
TableTag fromTag =
tags.stream()
.filter(
tableTag ->
tableTag.getTagName()
.equals(fromBranch.getCreatedFromTag()))
.findFirst()
.get();
for (TableTag tag : tags) {
if (tag.getCreateTime() >= createBranchTime) {
if (tag.getCreateTime() >= fromTag.getCreateTime()) {
fileIO.delete(tagManager.tagPath(tag.getTagName()), true);
}
}

// delete snapshots
Iterator<Snapshot> snapshotIterator = snapshotManager.snapshots();
while (snapshotIterator.hasNext()) {
Snapshot snapshot = snapshotIterator.next();
if (snapshot.timeMillis() >= createBranchTime) {
// Copy snapshots
Iterator<Snapshot> snapshots = snapshotManager.snapshots();
Snapshot fromSnapshot = snapshotManager.snapshot(fromBranch.getCreatedFromSnapshot());
while (snapshots.hasNext()) {
Snapshot snapshot = snapshots.next();
if (snapshot.timeMillis() >= fromSnapshot.timeMillis()) {
fileIO.delete(snapshotManager.snapshotPath(snapshot.id()), true);
}
}

// Delete schemas
// Copy schemas
List<Long> schemaIds = schemaManager.listAllIds();
for (Long schemaId : schemaIds) {
TableSchema tableSchema = schemaManager.schema(schemaId);
if (tableSchema.timeMillis() >= createBranchTime) {
if (tableSchema.id() >= fromSnapshot.schemaId()) {
fileIO.delete(schemaManager.toSchemaPath(schemaId), true);
}
}
Expand Down Expand Up @@ -254,24 +261,30 @@ private void calculateCopyMainBranchToTargetBranch(String branchName) throws IOE
if (fromBranch == null) {
throw new RuntimeException(String.format("No branches found %s", branchName));
}
long createBranchTime = fromBranch.getCreateTime();
Snapshot fromSnapshot = snapshotManager.snapshot(fromBranch.getCreatedFromSnapshot());
// Copy tags
List<TableTag> tags = tagManager.tableTags();
TableTag fromTag =
tags.stream()
.filter(
tableTag ->
tableTag.getTagName()
.equals(fromBranch.getCreatedFromTag()))
.findFirst()
.get();
for (TableTag tag : tags) {
if (tagManager.branchTagExists(branchName, tag.getTagName())) {
// If it already exists, skip it directly
continue;
}
if (tag.getCreateTime() < createBranchTime) {
if (tag.getCreateTime() < fromTag.getCreateTime()) {
fileIO.copyFileUtf8(
tagManager.tagPath(tag.getTagName()),
tagManager.branchTagPath(branchName, tag.getTagName()));
}
}

// Copy snapshots
Iterator<Snapshot> snapshots = snapshotManager.snapshots();
Snapshot fromSnapshot = snapshotManager.snapshot(fromBranch.getCreatedFromSnapshot());
while (snapshots.hasNext()) {
Snapshot snapshot = snapshots.next();
if (snapshotManager.branchSnapshotExists(branchName, snapshot.id())) {
Expand All @@ -294,7 +307,7 @@ private void calculateCopyMainBranchToTargetBranch(String branchName) throws IOE
// If it already exists, skip it directly
continue;
}
if (tableSchema.timeMillis() < createBranchTime) {
if (tableSchema.timeMillis() < fromSnapshot.timeMillis()) {
fileIO.copyFileUtf8(
schemaManager.toSchemaPath(schemaId),
schemaManager.branchSchemaPath(branchName, schemaId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@
/** IT cases for branch management actions. */
class BranchActionITCase extends ActionITCaseBase {

@Test
void testMergeBranchToTargetBranch() {}

@Test
void testReplaceBranchToTargetBranch() {}

@Test
void testCreateAndDeleteBranch() throws Exception {

Expand Down Expand Up @@ -85,4 +79,57 @@ void testCreateAndDeleteBranch() throws Exception {
"CALL sys.delete_branch('%s.%s', 'branch_name')", database, tableName));
assertThat(branchManager.branchExists("branch_name")).isFalse();
}

@Test
void testMergeBranchToTargetBranch() throws Exception {
init(warehouse);

RowType rowType =
RowType.of(
new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
new String[] {"k", "v"});
FileStoreTable table =
createFileStoreTable(
rowType,
Collections.emptyList(),
Collections.singletonList("k"),
Collections.emptyMap());

StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
write = writeBuilder.newWrite();
commit = writeBuilder.newCommit();

// 3 snapshots
writeData(rowData(1L, BinaryString.fromString("Hi")));
writeData(rowData(2L, BinaryString.fromString("Hello")));
writeData(rowData(3L, BinaryString.fromString("Paimon")));

TagManager tagManager = new TagManager(table.fileIO(), table.location());
callProcedure(
String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName));
assertThat(tagManager.tagExists("tag2")).isTrue();

BranchManager branchManager = table.branchManager();
callProcedure(
String.format(
"CALL sys.create_branch('%s.%s', 'branch_name', 'tag2')",
database, tableName));
assertThat(branchManager.branchExists("branch_name")).isTrue();

// 4-5 snapshots
writeData(rowData(4L, BinaryString.fromString("new.data_1")));
writeData(rowData(5L, BinaryString.fromString("new.data_2")));

callProcedure(
String.format("CALL sys.create_tag('%s.%s', 'tag3', 3)", database, tableName));
assertThat(tagManager.tagExists("tag2")).isTrue();

callProcedure(
String.format(
"CALL sys.merge_branch('%s.%s', 'branch_name')", database, tableName));
assertThat(branchManager.branchExists("branch_name")).isFalse();
}

@Test
void testReplaceBranchToTargetBranch() {}
}

0 comments on commit 41cd068

Please sign in to comment.