Skip to content

Commit

Permalink
format code
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Mar 15, 2024
1 parent ee1a0ab commit 5e454ef
Show file tree
Hide file tree
Showing 37 changed files with 756 additions and 850 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,8 @@ public class CatalogOptions {
.build());

public static final ConfigOption<String> BRANCH =
key("branch").stringType().defaultValue("main").withDescription("Specify the branch submitted by the schema.");
key("branch")
.stringType()
.defaultValue("main")
.withDescription("Specify the branch submitted by the schema.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@
import java.util.concurrent.Callable;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;

/**
* Default implementation of {@link FileStoreCommit}.
*
Expand Down Expand Up @@ -185,7 +183,8 @@ public Set<Long> filterCommitted(Set<Long> commitIdentifiers) {
return commitIdentifiers;
}

Optional<Snapshot> latestSnapshot = snapshotManager.latestSnapshotOfUser(commitUser);
Optional<Snapshot> latestSnapshot =
snapshotManager.latestSnapshotOfUser(branchName, commitUser);
if (latestSnapshot.isPresent()) {
Set<Long> result = new HashSet<>();
for (Long identifier : commitIdentifiers) {
Expand Down Expand Up @@ -643,7 +642,7 @@ private int tryOverwrite(
Map<Integer, Long> logOffsets) {
int cnt = 0;
while (true) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
Snapshot latestSnapshot = snapshotManager.latestSnapshot(branchName);

cnt++;
List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
Expand Down Expand Up @@ -711,10 +710,7 @@ public boolean tryCommitOnce(
@Nullable String newStatsFileName) {
long newSnapshotId =
latestSnapshot == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshot.id() + 1;
Path newSnapshotPath =
branchName.equals(DEFAULT_MAIN_BRANCH)
? snapshotManager.snapshotPath(newSnapshotId)
: snapshotManager.snapshotPath(branchName, newSnapshotId);
Path newSnapshotPath = snapshotManager.snapshotPath(branchName, newSnapshotId);

if (LOG.isDebugEnabled()) {
LOG.debug("Ready to commit table files to snapshot #" + newSnapshotId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ public TableSchema commitChanges(String branchName, List<SchemaChange> changes)
Catalog.ColumnNotExistException {
while (true) {
TableSchema schema =
latest().orElseThrow(
latest(branchName)
.orElseThrow(
() ->
new Catalog.TableNotExistException(
fromPath(tableRoot.toString(), true)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.DynamicBucketRowKeyExtractor;
import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
import org.apache.paimon.table.sink.InnerTableWrite;
import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.sink.UnawareBucketRowKeyExtractor;
import org.apache.paimon.table.source.InnerStreamTableScan;
import org.apache.paimon.table.source.InnerStreamTableScanImpl;
Expand Down Expand Up @@ -184,7 +186,8 @@ public InnerStreamTableScan newStreamScan(String branch) {
newSnapshotReader(branch),
snapshotManager(),
supportStreamingReadOverwrite(),
DefaultValueAssigner.create(tableSchema));
DefaultValueAssigner.create(tableSchema),
branch);
}

protected abstract SplitGenerator splitGenerator();
Expand Down Expand Up @@ -333,6 +336,16 @@ public TableCommitImpl newCommit(String commitUser) {
return newCommit(commitUser, DEFAULT_MAIN_BRANCH);
}

@Override
public TableWriteImpl<?> newWrite(String commitUser) {
return newWrite(commitUser, null, BranchManager.DEFAULT_MAIN_BRANCH);
}

@Override
public InnerTableWrite newWrite(String commitUser, String branch) {
return newWrite(commitUser, null, branch);
}

public TableCommitImpl newCommit(String commitUser, String branchName) {
CoreOptions options = coreOptions();
Runnable snapshotExpire = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.sink.InnerTableWrite;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.AbstractDataTableRead;
import org.apache.paimon.table.source.AppendOnlySplitGenerator;
Expand Down Expand Up @@ -145,7 +146,12 @@ public RecordReader<InternalRow> reader(Split split) throws IOException {

@Override
public TableWriteImpl<InternalRow> newWrite(String commitUser) {
return newWrite(commitUser, null);
return newWrite(commitUser, null, BranchManager.DEFAULT_MAIN_BRANCH);
}

@Override
public InnerTableWrite newWrite(String commitUser, String branch) {
return newWrite(commitUser, null, branch);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,12 @@ default InnerTableRead newRead(String branch) {

InnerTableWrite newWrite(String commitUser);

InnerTableWrite newWrite(String commitUser, String branch);

InnerTableCommit newCommit(String commitUser);

InnerTableCommit newCommit(String commitUser, String branch);

@Override
default ReadBuilder newReadBuilder() {
return new ReadBuilderImpl(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public InnerTableRead forceKeepDelete() {

@Override
public TableWriteImpl<KeyValue> newWrite(String commitUser) {
return newWrite(commitUser, null);
return newWrite(commitUser, null, DEFAULT_MAIN_BRANCH);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ default InnerTableWrite newWrite(String commitUser) {
this.getClass().getSimpleName()));
}

@Override
default InnerTableWrite newWrite(String commitUser, String branch) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support newWrite with branch.",
this.getClass().getSimpleName()));
}

@Override
default InnerTableCommit newCommit(String commitUser) {
throw new UnsupportedOperationException(
Expand All @@ -85,6 +93,14 @@ default InnerTableCommit newCommit(String commitUser) {
this.getClass().getSimpleName()));
}

@Override
default InnerTableCommit newCommit(String commitUser, String branch) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support newCommit with branch.",
this.getClass().getSimpleName()));
}

@Override
default InnerStreamTableScan newStreamScan() {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ default BatchWriteBuilder withOverwrite() {
return this;
}

BatchWriteBuilder toBranch(String branch);

/** Overwrite writing, same as the 'INSERT OVERWRITE T PARTITION (...)' semantics of SQL. */
BatchWriteBuilder withOverwrite(@Nullable Map<String, String> staticPartition);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.table.InnerTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;

import javax.annotation.Nullable;

Expand All @@ -33,6 +34,7 @@ public class BatchWriteBuilderImpl implements BatchWriteBuilder {

private final InnerTable table;
private final String commitUser = UUID.randomUUID().toString();
private String branch = BranchManager.DEFAULT_MAIN_BRANCH;

private Map<String, String> staticPartition;

Expand All @@ -56,16 +58,23 @@ public BatchWriteBuilder withOverwrite(@Nullable Map<String, String> staticParti
return this;
}

@Override
public BatchWriteBuilder toBranch(String branch) {
this.branch = branch;
return this;
}

@Override
public BatchTableWrite newWrite() {
return table.newWrite(commitUser)
return table.newWrite(commitUser, branch)
.withIgnorePreviousFiles(staticPartition != null)
.withExecutionMode(false);
}

@Override
public BatchTableCommit newCommit() {
InnerTableCommit commit = table.newCommit(commitUser).withOverwrite(staticPartition);
InnerTableCommit commit =
table.newCommit(commitUser, branch).withOverwrite(staticPartition);
commit.ignoreEmptyCommit(true);
return commit;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public interface StreamWriteBuilder extends WriteBuilder {
*/
StreamWriteBuilder withCommitUser(String commitUser);

StreamWriteBuilder toBranch(String branch);

/** Create a {@link TableWrite} to write {@link InternalRow}s. */
@Override
StreamTableWrite newWrite();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.table.InnerTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;

import java.util.UUID;

Expand All @@ -31,6 +32,7 @@ public class StreamWriteBuilderImpl implements StreamWriteBuilder {
private final InnerTable table;

private String commitUser = UUID.randomUUID().toString();
private String branch = BranchManager.DEFAULT_MAIN_BRANCH;

public StreamWriteBuilderImpl(InnerTable table) {
this.table = table;
Expand All @@ -57,13 +59,19 @@ public StreamWriteBuilder withCommitUser(String commitUser) {
return this;
}

@Override
public StreamWriteBuilder toBranch(String branch) {
this.branch = branch;
return this;
}

@Override
public StreamTableWrite newWrite() {
return table.newWrite(commitUser);
return table.newWrite(commitUser, branch);
}

@Override
public StreamTableCommit newCommit() {
return table.newCommit(commitUser).ignoreEmptyCommit(false);
return table.newCommit(commitUser, branch).ignoreEmptyCommit(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,36 +79,31 @@ public Path snapshotDirectory() {
}

public Path snapshotDirectory(String branchName) {
return new Path(getBranchPath(tablePath, branchName) + "/snapshot");
}

public Path snapshotDirectoryByBranch(String branchName) {
return branchName.equals(DEFAULT_MAIN_BRANCH)
? snapshotDirectory()
: snapshotDirectory(branchName);
: new Path(getBranchPath(tablePath, branchName) + "/snapshot");
}

public Path snapshotPath(long snapshotId) {
return new Path(tablePath + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId);
}

public Path snapshotPath(String branchName, long snapshotId) {
return new Path(
getBranchPath(tablePath, branchName) + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId);
}

public Path snapshotPathByBranch(String branchName, long snapshotId) {
return branchName.equals(DEFAULT_MAIN_BRANCH)
? snapshotPath(snapshotId)
: snapshotPath(branchName, snapshotId);
: new Path(
getBranchPath(tablePath, branchName)
+ "/snapshot/"
+ SNAPSHOT_PREFIX
+ snapshotId);
}

public Snapshot snapshot(long snapshotId) {
return snapshot(DEFAULT_MAIN_BRANCH, snapshotId);
}

public Snapshot snapshot(String branchName, long snapshotId) {
Path snapshotPath = snapshotPathByBranch(branchName, snapshotId);
Path snapshotPath = snapshotPath(branchName, snapshotId);
return Snapshot.fromPath(fileIO, snapshotPath);
}

Expand All @@ -117,7 +112,7 @@ public boolean snapshotExists(long snapshotId) {
}

public boolean snapshotExists(String branchName, long snapshotId) {
Path path = snapshotPathByBranch(branchName, snapshotId);
Path path = snapshotPath(branchName, snapshotId);
try {
return fileIO.exists(path);
} catch (IOException e) {
Expand Down Expand Up @@ -265,8 +260,7 @@ public long snapshotCount() throws IOException {
}

public long snapshotCount(String branch) throws IOException {
return listVersionedFiles(fileIO, snapshotDirectoryByBranch(branch), SNAPSHOT_PREFIX)
.count();
return listVersionedFiles(fileIO, snapshotDirectory(branch), SNAPSHOT_PREFIX).count();
}

public Iterator<Snapshot> snapshots() throws IOException {
Expand Down Expand Up @@ -429,7 +423,7 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter<Snapshot> checker) {
}

private @Nullable Long findLatest(String branchName) throws IOException {
Path snapshotDir = snapshotDirectoryByBranch(branchName);
Path snapshotDir = snapshotDirectory(branchName);
if (!fileIO.exists(snapshotDir)) {
return null;
}
Expand All @@ -447,7 +441,7 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter<Snapshot> checker) {
}

private @Nullable Long findEarliest(String branchName) throws IOException {
Path snapshotDir = snapshotDirectoryByBranch(branchName);
Path snapshotDir = snapshotDirectory(branchName);
if (!fileIO.exists(snapshotDir)) {
return null;
}
Expand All @@ -466,7 +460,7 @@ public Long readHint(String fileName) {
}

public Long readHint(String fileName, String branchName) {
Path snapshotDir = snapshotDirectoryByBranch(branchName);
Path snapshotDir = snapshotDirectory(branchName);
Path path = new Path(snapshotDir, fileName);
int retryNumber = 0;
while (retryNumber++ < READ_HINT_RETRY_NUM) {
Expand All @@ -486,7 +480,7 @@ public Long readHint(String fileName, String branchName) {

private Long findByListFiles(BinaryOperator<Long> reducer, String branchName)
throws IOException {
Path snapshotDir = snapshotDirectoryByBranch(branchName);
Path snapshotDir = snapshotDirectory(branchName);
return listVersionedFiles(fileIO, snapshotDir, SNAPSHOT_PREFIX)
.reduce(reducer)
.orElse(null);
Expand All @@ -510,7 +504,7 @@ public void commitEarliestHint(long snapshotId, String branchName) throws IOExce

private void commitHint(long snapshotId, String fileName, String branchName)
throws IOException {
Path snapshotDir = snapshotDirectoryByBranch(branchName);
Path snapshotDir = snapshotDirectory(branchName);
Path hintFile = new Path(snapshotDir, fileName);
fileIO.overwriteFileUtf8(hintFile, String.valueOf(snapshotId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ public DataStreamSink<?> sinkFrom(
protected OneInputStreamOperator<CdcMultiplexRecord, MultiTableCommittable> createWriteOperator(
StoreSinkWrite.WithWriteBufferProvider writeProvider, String commitUser) {
return new CdcRecordStoreMultiWriteOperator(
catalogLoader, writeProvider, commitUser, new Options());
catalogLoader, writeProvider, commitUser, new Options())
.toBranch(branch);
}

// Table committers are dynamically created at runtime
Expand All @@ -166,7 +167,8 @@ protected OneInputStreamOperator<CdcMultiplexRecord, MultiTableCommittable> crea
// commit new files list even if they're empty.
// Otherwise we can't tell if the commit is successful after
// a restart.
return (user, metricGroup) -> new StoreMultiCommitter(catalogLoader, user, metricGroup);
return (user, metricGroup) ->
new StoreMultiCommitter(catalogLoader, user, metricGroup, branch);
}

protected CommittableStateManager<WrappedManifestCommittable> createCommittableStateManager() {
Expand Down
Loading

0 comments on commit 5e454ef

Please sign in to comment.