Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add branch test
Browse files Browse the repository at this point in the history
sunxiaojian committed Mar 13, 2024
1 parent ace8c7c commit ee1a0ab
Showing 22 changed files with 1,564 additions and 208 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/catalog_configuration.html
Original file line number Diff line number Diff line change
@@ -86,5 +86,11 @@
<td>String</td>
<td>The warehouse root path of catalog.</td>
</tr>
<tr>
<td><h5>branch</h5></td>
<td style="word-wrap: break-word;">main</td>
<td>String</td>
<td>Specify the branch submitted by the schema.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
@@ -105,6 +105,6 @@ public class CatalogOptions {
"\"custom\": You can implement LineageMetaFactory and LineageMeta to store lineage information in customized storage."))
.build());

public static final ConfigOption<String> BRANCH_NAME =
key("branch-name").stringType().defaultValue("main").withDescription("branch name");
public static final ConfigOption<String> BRANCH =
key("branch").stringType().defaultValue("main").withDescription("Specify the branch submitted by the schema.");
}
Original file line number Diff line number Diff line change
@@ -187,7 +187,7 @@ public FileStoreCommitImpl newCommit(String commitUser, String branchName) {
manifestFileFactory(),
manifestListFactory(),
indexManifestFileFactory(),
newScan(),
newScan(branchName),
options.bucket(),
options.manifestTargetSize(),
options.manifestFullCompactionThresholdSize(),
Original file line number Diff line number Diff line change
@@ -50,7 +50,7 @@
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.paimon.options.CatalogOptions.BRANCH_NAME;
import static org.apache.paimon.options.CatalogOptions.BRANCH;
import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -84,7 +84,7 @@ protected AbstractCatalog(FileIO fileIO, Options options) {
this.tableDefaultOptions =
convertToPropertiesPrefixKey(options.toMap(), TABLE_DEFAULT_OPTION_PREFIX);
this.catalogOptions = options;
this.branchName = options.get(BRANCH_NAME);
this.branchName = options.get(BRANCH);
}

@Override
Original file line number Diff line number Diff line change
@@ -157,7 +157,7 @@ public FileStoreScan withPartitionBucket(BinaryRow partition, int bucket) {
@Override
public FileStoreScan withSnapshot(long snapshotId) {
checkState(specifiedManifests == null, "Cannot set both snapshot and manifests.");
this.specifiedSnapshot = snapshotManager.snapshot(snapshotId);
this.specifiedSnapshot = snapshotManager.snapshot(branchName, snapshotId);
return this;
}

@@ -406,8 +406,8 @@ private List<ManifestFileMeta> readManifests(Snapshot snapshot) {
// ------------------------------------------------------------------------

/** Note: Keep this thread-safe. */
protected TableSchema scanTableSchema(long id) {
return tableSchemas.computeIfAbsent(id, key -> schemaManager.schema(id));
protected TableSchema scanTableSchema(long id, String branch) {
return tableSchemas.computeIfAbsent(id, key -> schemaManager.schema(branch, id));
}

/** Note: Keep this thread-safe. */
Original file line number Diff line number Diff line change
@@ -63,7 +63,8 @@ public AppendOnlyFileStoreScan(
scanManifestParallelism,
branchName);
this.fieldStatsConverters =
new FieldStatsConverters(sid -> scanTableSchema(sid).fields(), schemaId);
new FieldStatsConverters(
sid -> scanTableSchema(sid, branchName).fields(), schemaId);
}

public AppendOnlyFileStoreScan withFilter(Predicate predicate) {
Original file line number Diff line number Diff line change
@@ -70,10 +70,14 @@ public KeyValueFileStoreScan(
branchName);
this.fieldKeyStatsConverters =
new FieldStatsConverters(
sid -> keyValueFieldsExtractor.keyFields(scanTableSchema(sid)), schemaId);
sid -> keyValueFieldsExtractor.keyFields(scanTableSchema(sid, branchName)),
schemaId);
this.fieldValueStatsConverters =
new FieldStatsConverters(
sid -> keyValueFieldsExtractor.valueFields(scanTableSchema(sid)), schemaId);
sid ->
keyValueFieldsExtractor.valueFields(
scanTableSchema(sid, branchName)),
schemaId);
}

public KeyValueFileStoreScan withKeyFilter(Predicate predicate) {
Original file line number Diff line number Diff line change
@@ -56,7 +56,6 @@
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.FileStorePathFactory;
@@ -93,47 +92,6 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
private final RowType keyType;
private final RowType valueType;

public KeyValueFileStoreWrite(
FileIO fileIO,
SchemaManager schemaManager,
long schemaId,
String commitUser,
RowType keyType,
RowType valueType,
Supplier<Comparator<InternalRow>> keyComparatorSupplier,
Supplier<FieldsComparator> udsComparatorSupplier,
Supplier<RecordEqualiser> valueEqualiserSupplier,
MergeFunctionFactory<KeyValue> mfFactory,
FileStorePathFactory pathFactory,
Map<String, FileStorePathFactory> format2PathFactory,
SnapshotManager snapshotManager,
FileStoreScan scan,
@Nullable IndexMaintainer.Factory<KeyValue> indexFactory,
CoreOptions options,
KeyValueFieldsExtractor extractor,
String tableName) {
this(
fileIO,
schemaManager,
schemaId,
commitUser,
keyType,
valueType,
keyComparatorSupplier,
udsComparatorSupplier,
valueEqualiserSupplier,
mfFactory,
pathFactory,
format2PathFactory,
snapshotManager,
scan,
indexFactory,
options,
extractor,
tableName,
BranchManager.DEFAULT_MAIN_BRANCH);
}

public KeyValueFileStoreWrite(
FileIO fileIO,
SchemaManager schemaManager,
@@ -169,7 +127,8 @@ public KeyValueFileStoreWrite(
FileFormatDiscover.of(options),
pathFactory,
extractor,
options);
options,
branchName);
this.writerFactoryBuilder =
KeyValueFileWriterFactory.builder(
fileIO,
Original file line number Diff line number Diff line change
@@ -99,7 +99,7 @@ public Optional<TableSchema> latest(String branchName) {
try {
return listVersionedFiles(fileIO, schemaDirectory(branchName), SCHEMA_PREFIX)
.reduce(Math::max)
.map(this::schema);
.map(id -> schema(branchName, id));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
Original file line number Diff line number Diff line change
@@ -127,6 +127,7 @@ public RecordReader<InternalRow> reader(Split split) throws IOException {
};
}

@Override
public InnerTableRead newRead(String branch) {
AppendOnlyFileStoreRead read = store().newRead(branch);
return new AbstractDataTableRead<InternalRow>(read, schema()) {
Original file line number Diff line number Diff line change
@@ -40,7 +40,6 @@ public abstract class AbstractDataTableRead<T> implements InnerTableRead {
private int[][] projection;
private boolean executeFilter = false;
private Predicate predicate;
private String branch;

public AbstractDataTableRead(FileStoreRead<T> fileStoreRead, TableSchema schema) {
this.fileStoreRead = fileStoreRead;
Original file line number Diff line number Diff line change
@@ -122,8 +122,8 @@ default ReadBuilder withProjection(int[] projection) {
/** the row number pushed down. */
ReadBuilder withLimit(int limit);

/** the row number pushed down. */
ReadBuilder withBranch(String branch);
/** which branch to read from. */
ReadBuilder fromBranch(String branch);

/** Create a {@link TableScan} to perform batch planning. */
TableScan newScan();
Original file line number Diff line number Diff line change
@@ -43,6 +43,8 @@ public class ReadBuilderImpl implements ReadBuilder {
private Integer limit = null;

private Map<String, String> partitionSpec;

// default read main branch.
private String branch = BranchManager.DEFAULT_MAIN_BRANCH;

public ReadBuilderImpl(InnerTable table) {
@@ -91,7 +93,7 @@ public ReadBuilder withLimit(int limit) {
}

@Override
public ReadBuilder withBranch(String branch) {
public ReadBuilder fromBranch(String branch) {
this.branch = branch;
return this;
}
Loading

0 comments on commit ee1a0ab

Please sign in to comment.