Skip to content

Commit

Permalink
Flink: support writing to and reading from branches
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Mar 12, 2024
1 parent 436d61b commit 7fb6cba
Show file tree
Hide file tree
Showing 85 changed files with 763 additions and 220 deletions.
17 changes: 0 additions & 17 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,6 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withDescription("The file path of this table in the filesystem.");

public static final ConfigOption<String> BRANCH_NAME =
key("branch-name")
.stringType()
.noDefaultValue()
.withDescription("Specify branch name.");

public static final ConfigOption<FileFormatType> FILE_FORMAT =
key("file.format")
.enumType(FileFormatType.class)
Expand Down Expand Up @@ -1111,17 +1105,6 @@ public Path path() {
return path(options.toMap());
}

public String branch() {
return branch(options.toMap());
}

public static String branch(Map<String, String> options) {
if (options.containsKey(BRANCH_NAME.key())) {
return options.get(BRANCH_NAME.key());
}
return "main";
}

public static Path path(Map<String, String> options) {
return new Path(options.get(PATH.key()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.operation.AppendOnlyFileStoreRead;
import org.apache.paimon.operation.AppendOnlyFileStoreScan;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
import org.apache.paimon.operation.FileStoreWrite;
import org.apache.paimon.operation.ScanBucketFilter;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
Expand Down Expand Up @@ -88,14 +89,37 @@ public AppendOnlyFileStoreRead newRead() {
pathFactory());
}

/**
 * Creates a read operation that reads this append-only table's data files from the given
 * branch.
 *
 * @param branchName name of the branch whose schema/files the reader should resolve against
 * @return a new {@link AppendOnlyFileStoreRead} bound to {@code branchName}
 */
@Override
public AppendOnlyFileStoreRead newRead(String branchName) {
return new AppendOnlyFileStoreRead(
fileIO,
schemaManager,
schemaId,
rowType,
FileFormatDiscover.of(options),
pathFactory(),
branchName);
}

/**
 * Creates a write operation for the default main branch with no manifest cache filter.
 *
 * <p>Delegates to the 3-arg overload with a {@code null} filter and
 * {@code DEFAULT_MAIN_BRANCH}.
 *
 * @param commitUser user identifier recorded in commits produced by this writer
 * @return a new {@link AppendOnlyFileStoreWrite} targeting the main branch
 */
@Override
public AppendOnlyFileStoreWrite newWrite(String commitUser) {
return newWrite(commitUser, null, DEFAULT_MAIN_BRANCH);
}

/**
 * Creates a write operation for the given branch with no manifest cache filter.
 *
 * @param commitUser user identifier recorded in commits produced by this writer
 * @param branch name of the branch to write to
 * @return a new writer bound to {@code branch}
 */
@Override
public FileStoreWrite<InternalRow> newWrite(String commitUser, String branch) {
return newWrite(commitUser, null, branch);
}

/**
 * Creates a write operation for the default main branch with the given manifest cache
 * filter.
 *
 * @param commitUser user identifier recorded in commits produced by this writer
 * @param manifestFilter filter applied to the manifest cache during the scan; may be null
 * @return a new {@link AppendOnlyFileStoreWrite} targeting the main branch
 */
@Override
public AppendOnlyFileStoreWrite newWrite(
String commitUser, ManifestCacheFilter manifestFilter) {
return newWrite(commitUser, manifestFilter, DEFAULT_MAIN_BRANCH);
}

@Override
public AppendOnlyFileStoreWrite newWrite(
String commitUser, ManifestCacheFilter manifestFilter, String branchName) {
return new AppendOnlyFileStoreWrite(
fileIO,
newRead(),
Expand All @@ -104,9 +128,10 @@ public AppendOnlyFileStoreWrite newWrite(
rowType,
pathFactory(),
snapshotManager(),
newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter),
newScan(true, branchName).withManifestCacheFilter(manifestFilter),
options,
tableName);
tableName,
branchName);
}

private AppendOnlyFileStoreScan newScan(boolean forWrite, String branchName) {
Expand Down
7 changes: 7 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/FileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,17 @@ public interface FileStore<T> extends Serializable {

FileStoreRead<T> newRead();

FileStoreRead<T> newRead(String branchName);

FileStoreWrite<T> newWrite(String commitUser);

FileStoreWrite<T> newWrite(String commitUser, String branch);

FileStoreWrite<T> newWrite(String commitUser, ManifestCacheFilter manifestFilter);

FileStoreWrite<T> newWrite(
String commitUser, ManifestCacheFilter manifestFilter, String branchName);

FileStoreCommit newCommit(String commitUser);

FileStoreCommit newCommit(String commitUser, String branchName);
Expand Down
46 changes: 43 additions & 3 deletions paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,20 @@ public KeyValueFileStoreRead newRead() {
newReaderFactoryBuilder());
}

/**
 * Creates a read operation that reads this key-value table's data files from the given
 * branch.
 *
 * <p>The reader factory builder is also created for {@code branchName} so schema lookups
 * resolve against that branch.
 *
 * @param branchName name of the branch to read from
 * @return a new {@link KeyValueFileStoreRead} bound to {@code branchName}
 */
@Override
public KeyValueFileStoreRead newRead(String branchName) {
return new KeyValueFileStoreRead(
schemaManager,
schemaId,
keyType,
valueType,
newKeyComparator(),
userDefinedSeqComparator(),
mfFactory,
newReaderFactoryBuilder(branchName),
branchName);
}

public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {
return KeyValueFileReaderFactory.builder(
fileIO,
Expand All @@ -145,18 +159,43 @@ public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {
options);
}

/**
 * Creates a {@link KeyValueFileReaderFactory.Builder} whose readers resolve schemas against
 * the given branch.
 *
 * @param branch name of the branch the produced readers should use for schema lookups
 * @return a builder pre-configured with this store's IO, schema, types, and {@code branch}
 */
public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder(String branch) {
return KeyValueFileReaderFactory.builder(
fileIO,
schemaManager,
schemaId,
keyType,
valueType,
FileFormatDiscover.of(options),
pathFactory(),
keyValueFieldsExtractor,
options,
branch);
}

/**
 * Builds the user-defined sequence-field comparator from the table options, or returns null
 * when no sequence fields are configured (behavior of {@code UserDefinedSeqComparator.create}
 * — confirm against that factory).
 */
@Nullable
private FieldsComparator userDefinedSeqComparator() {
return UserDefinedSeqComparator.create(valueType, options);
}

/**
 * Creates a write operation for the default main branch with no manifest cache filter.
 *
 * <p>Delegates to the 3-arg overload with a {@code null} filter and
 * {@code DEFAULT_MAIN_BRANCH}.
 *
 * @param commitUser user identifier recorded in commits produced by this writer
 * @return a new {@link KeyValueFileStoreWrite} targeting the main branch
 */
@Override
public KeyValueFileStoreWrite newWrite(String commitUser) {
return newWrite(commitUser, null, DEFAULT_MAIN_BRANCH);
}

/**
 * Creates a write operation for the given branch with no manifest cache filter.
 *
 * @param commitUser user identifier recorded in commits produced by this writer
 * @param branch name of the branch to write to
 * @return a new {@link KeyValueFileStoreWrite} bound to {@code branch}
 */
@Override
public KeyValueFileStoreWrite newWrite(String commitUser, String branch) {
return newWrite(commitUser, null, branch);
}

/**
 * Creates a write operation for the default main branch with the given manifest cache
 * filter.
 *
 * @param commitUser user identifier recorded in commits produced by this writer
 * @param manifestFilter filter applied to the manifest cache during the scan; may be null
 * @return a new {@link KeyValueFileStoreWrite} targeting the main branch
 */
@Override
public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter manifestFilter) {
return newWrite(commitUser, manifestFilter, DEFAULT_MAIN_BRANCH);
}

@Override
public KeyValueFileStoreWrite newWrite(
String commitUser, ManifestCacheFilter manifestFilter, String branchName) {
IndexMaintainer.Factory<KeyValue> indexFactory = null;
if (bucketMode() == BucketMode.DYNAMIC) {
indexFactory = new HashIndexMaintainer.Factory(newIndexFileHandler());
Expand All @@ -175,11 +214,12 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma
pathFactory(),
format2PathFactory(),
snapshotManager(),
newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter),
newScan(true, branchName).withManifestCacheFilter(manifestFilter),
indexFactory,
options,
keyValueFieldsExtractor,
tableName);
tableName,
branchName);
}

private Map<String, FileStorePathFactory> format2PathFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.AsyncRecordReader;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.BulkFormatMapping;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Projection;
Expand Down Expand Up @@ -59,6 +60,7 @@ public class KeyValueFileReaderFactory {

private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
private final BinaryRow partition;
private final String branch;

private KeyValueFileReaderFactory(
FileIO fileIO,
Expand All @@ -69,7 +71,8 @@ private KeyValueFileReaderFactory(
BulkFormatMapping.BulkFormatMappingBuilder bulkFormatMappingBuilder,
DataFilePathFactory pathFactory,
long asyncThreshold,
BinaryRow partition) {
BinaryRow partition,
String branch) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schemaId = schemaId;
Expand All @@ -80,6 +83,7 @@ private KeyValueFileReaderFactory(
this.asyncThreshold = asyncThreshold;
this.partition = partition;
this.bulkFormatMappings = new HashMap<>();
this.branch = branch;
}

public RecordReader<KeyValue> createRecordReader(
Expand All @@ -104,8 +108,8 @@ private RecordReader<KeyValue> createRecordReader(
() ->
bulkFormatMappingBuilder.build(
formatIdentifier,
schemaManager.schema(this.schemaId),
schemaManager.schema(schemaId));
schemaManager.schema(branch, this.schemaId),
schemaManager.schema(branch, schemaId));

BulkFormatMapping bulkFormatMapping =
reuseFormat
Expand Down Expand Up @@ -136,6 +140,30 @@ public static Builder builder(
FileStorePathFactory pathFactory,
KeyValueFieldsExtractor extractor,
CoreOptions options) {
return builder(
fileIO,
schemaManager,
schemaId,
keyType,
valueType,
formatDiscover,
pathFactory,
extractor,
options,
BranchManager.DEFAULT_MAIN_BRANCH);
}

public static Builder builder(
FileIO fileIO,
SchemaManager schemaManager,
long schemaId,
RowType keyType,
RowType valueType,
FileFormatDiscover formatDiscover,
FileStorePathFactory pathFactory,
KeyValueFieldsExtractor extractor,
CoreOptions options,
String branch) {
return new Builder(
fileIO,
schemaManager,
Expand All @@ -145,7 +173,8 @@ public static Builder builder(
formatDiscover,
pathFactory,
extractor,
options);
options,
branch);
}

/** Builder for {@link KeyValueFileReaderFactory}. */
Expand All @@ -166,6 +195,7 @@ public static class Builder {
private int[][] valueProjection;
private RowType projectedKeyType;
private RowType projectedValueType;
private String branch;

private Builder(
FileIO fileIO,
Expand All @@ -176,7 +206,8 @@ private Builder(
FileFormatDiscover formatDiscover,
FileStorePathFactory pathFactory,
KeyValueFieldsExtractor extractor,
CoreOptions options) {
CoreOptions options,
String branch) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schemaId = schemaId;
Expand All @@ -190,6 +221,7 @@ private Builder(
this.options = options;
this.keyProjection = fullKeyProjection;
this.valueProjection = Projection.range(0, valueType.getFieldCount()).toNestedIndexes();
this.branch = branch;
applyProjection();
}

Expand All @@ -203,7 +235,8 @@ public Builder copyWithoutProjection() {
formatDiscover,
pathFactory,
extractor,
options);
options,
branch);
}

public Builder withKeyProjection(int[][] projection) {
Expand Down Expand Up @@ -248,7 +281,8 @@ public KeyValueFileReaderFactory build(
formatDiscover, extractor, keyProjection, valueProjection, filters),
pathFactory.createDataFilePathFactory(partition, bucket),
options.fileReaderAsyncThreshold().getBytes(),
partition);
partition,
branch);
}

private void applyProjection() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,16 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {

protected CompactionMetrics compactionMetrics = null;
protected final String tableName;
private final String branchName;

protected AbstractFileStoreWrite(
String commitUser,
SnapshotManager snapshotManager,
FileStoreScan scan,
@Nullable IndexMaintainer.Factory<T> indexFactory,
String tableName,
int writerNumberMax) {
int writerNumberMax,
String branchName) {
this.commitUser = commitUser;
this.snapshotManager = snapshotManager;
this.scan = scan;
Expand All @@ -93,6 +95,7 @@ protected AbstractFileStoreWrite(
this.writers = new HashMap<>();
this.tableName = tableName;
this.writerNumberMax = writerNumberMax;
this.branchName = branchName;
}

@Override
Expand Down Expand Up @@ -171,7 +174,7 @@ public List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIden
} else {
latestCommittedIdentifier =
snapshotManager
.latestSnapshotOfUser(commitUser)
.latestSnapshotOfUser(branchName, commitUser)
.map(Snapshot::commitIdentifier)
.orElse(Long.MIN_VALUE);
}
Expand Down Expand Up @@ -350,7 +353,7 @@ public WriterContainer<T> createWriterContainer(
}
}

Long latestSnapshotId = snapshotManager.latestSnapshotId();
Long latestSnapshotId = snapshotManager.latestSnapshotId(branchName);
List<DataFileMeta> restoreFiles = new ArrayList<>();
if (!ignorePreviousFiles && latestSnapshotId != null) {
restoreFiles = scanExistingFileMetas(latestSnapshotId, partition, bucket);
Expand Down
Loading

0 comments on commit 7fb6cba

Please sign in to comment.