Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Mar 15, 2024
1 parent 3578743 commit 3924e8b
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.DynamicBucketRowKeyExtractor;
import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
import org.apache.paimon.table.sink.InnerTableWrite;
import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.sink.UnawareBucketRowKeyExtractor;
import org.apache.paimon.table.source.InnerStreamTableScan;
import org.apache.paimon.table.source.InnerStreamTableScanImpl;
Expand Down Expand Up @@ -334,6 +336,16 @@ public TableCommitImpl newCommit(String commitUser) {
return newCommit(commitUser, DEFAULT_MAIN_BRANCH);
}

@Override
public TableWriteImpl<?> newWrite(String commitUser) {
return newWrite(commitUser, null, BranchManager.DEFAULT_MAIN_BRANCH);
}

@Override
public InnerTableWrite newWrite(String commitUser, String branch) {
return newWrite(commitUser, null, branch);
}

public TableCommitImpl newCommit(String commitUser, String branchName) {
CoreOptions options = coreOptions();
Runnable snapshotExpire = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.sink.InnerTableWrite;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.AbstractDataTableRead;
import org.apache.paimon.table.source.AppendOnlySplitGenerator;
Expand Down Expand Up @@ -145,7 +146,12 @@ public RecordReader<InternalRow> reader(Split split) throws IOException {

@Override
public TableWriteImpl<InternalRow> newWrite(String commitUser) {
return newWrite(commitUser, null);
return newWrite(commitUser, null, BranchManager.DEFAULT_MAIN_BRANCH);
}

@Override
public InnerTableWrite newWrite(String commitUser, String branch) {
return newWrite(commitUser, null, branch);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,12 @@ default InnerTableRead newRead(String branch) {

InnerTableWrite newWrite(String commitUser);

InnerTableWrite newWrite(String commitUser, String branch);

InnerTableCommit newCommit(String commitUser);

InnerTableCommit newCommit(String commitUser, String branch);

@Override
default ReadBuilder newReadBuilder() {
return new ReadBuilderImpl(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public InnerTableRead forceKeepDelete() {

@Override
public TableWriteImpl<KeyValue> newWrite(String commitUser) {
return newWrite(commitUser, null);
return newWrite(commitUser, null, DEFAULT_MAIN_BRANCH);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ default InnerTableWrite newWrite(String commitUser) {
this.getClass().getSimpleName()));
}

@Override
default InnerTableWrite newWrite(String commitUser, String branch) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support newWrite with branch.",
this.getClass().getSimpleName()));
}

@Override
default InnerTableCommit newCommit(String commitUser) {
throw new UnsupportedOperationException(
Expand All @@ -85,6 +93,14 @@ default InnerTableCommit newCommit(String commitUser) {
this.getClass().getSimpleName()));
}

@Override
default InnerTableCommit newCommit(String commitUser, String branch) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support newCommit with branch.",
this.getClass().getSimpleName()));
}

@Override
default InnerStreamTableScan newStreamScan() {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ default BatchWriteBuilder withOverwrite() {
return this;
}

BatchWriteBuilder toBranch(String branch);

/** Overwrite writing, same as the 'INSERT OVERWRITE T PARTITION (...)' semantics of SQL. */
BatchWriteBuilder withOverwrite(@Nullable Map<String, String> staticPartition);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.table.InnerTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;

import javax.annotation.Nullable;

Expand All @@ -33,6 +34,7 @@ public class BatchWriteBuilderImpl implements BatchWriteBuilder {

private final InnerTable table;
private final String commitUser = UUID.randomUUID().toString();
private String branch = BranchManager.DEFAULT_MAIN_BRANCH;

private Map<String, String> staticPartition;

Expand All @@ -56,16 +58,23 @@ public BatchWriteBuilder withOverwrite(@Nullable Map<String, String> staticParti
return this;
}

@Override
public BatchWriteBuilder toBranch(String branch) {
this.branch = branch;
return this;
}

@Override
public BatchTableWrite newWrite() {
return table.newWrite(commitUser)
return table.newWrite(commitUser, branch)
.withIgnorePreviousFiles(staticPartition != null)
.withExecutionMode(false);
}

@Override
public BatchTableCommit newCommit() {
InnerTableCommit commit = table.newCommit(commitUser).withOverwrite(staticPartition);
InnerTableCommit commit =
table.newCommit(commitUser, branch).withOverwrite(staticPartition);
commit.ignoreEmptyCommit(true);
return commit;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public interface StreamWriteBuilder extends WriteBuilder {
*/
StreamWriteBuilder withCommitUser(String commitUser);

StreamWriteBuilder toBranch(String branch);

/** Create a {@link TableWrite} to write {@link InternalRow}s. */
@Override
StreamTableWrite newWrite();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.table.InnerTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;

import java.util.UUID;

Expand All @@ -31,6 +32,7 @@ public class StreamWriteBuilderImpl implements StreamWriteBuilder {
private final InnerTable table;

private String commitUser = UUID.randomUUID().toString();
private String branch = BranchManager.DEFAULT_MAIN_BRANCH;

public StreamWriteBuilderImpl(InnerTable table) {
this.table = table;
Expand All @@ -57,13 +59,19 @@ public StreamWriteBuilder withCommitUser(String commitUser) {
return this;
}

@Override
public StreamWriteBuilder toBranch(String branch) {
this.branch = branch;
return this;
}

@Override
public StreamTableWrite newWrite() {
return table.newWrite(commitUser);
return table.newWrite(commitUser, branch);
}

@Override
public StreamTableCommit newCommit() {
return table.newCommit(commitUser).ignoreEmptyCommit(false);
return table.newCommit(commitUser, branch).ignoreEmptyCommit(false);
}
}

0 comments on commit 3924e8b

Please sign in to comment.