Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
newKeyComparator(),
options.branch(),
newStatsFileHandler(),
bucketMode(),
options.scanManifestParallelism(),
createCommitCallbacks(commitUser, table),
options.commitMaxRetries(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,19 @@
import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.utils.Pair;

import javax.annotation.Nullable;

import java.util.Optional;

/** Deletion File from compaction. */
public interface CompactDeletionFile {

Optional<IndexFileMeta> getOrCompute();
/**
* Get or compute the deletion file.
*
* @return Pair of deleted deletion file and new deletion file.
*/
Pair<IndexFileMeta, IndexFileMeta> getOrCompute();

CompactDeletionFile mergeOldFile(CompactDeletionFile old);

Expand All @@ -41,8 +45,8 @@ public interface CompactDeletionFile {
* them).
*/
static CompactDeletionFile generateFiles(BucketedDvMaintainer maintainer) {
Optional<IndexFileMeta> file = maintainer.writeDeletionVectorsIndex();
return new GeneratedDeletionFile(file.orElse(null), maintainer.dvIndexFile());
Pair<IndexFileMeta, IndexFileMeta> pair = maintainer.writeDeletionVectorsIndex();
return new GeneratedDeletionFile(pair.getLeft(), pair.getRight(), maintainer.dvIndexFile());
}

/** For sync compaction, only create deletion files when prepareCommit. */
Expand All @@ -53,21 +57,25 @@ static CompactDeletionFile lazyGeneration(BucketedDvMaintainer maintainer) {
/** A generated files implementation of {@link CompactDeletionFile}. */
class GeneratedDeletionFile implements CompactDeletionFile {

@Nullable private final IndexFileMeta deletionFile;
@Nullable private IndexFileMeta deleteDeletionFile;
@Nullable private final IndexFileMeta newDeletionFile;
private final DeletionVectorsIndexFile dvIndexFile;

private boolean getInvoked = false;

public GeneratedDeletionFile(
@Nullable IndexFileMeta deletionFile, DeletionVectorsIndexFile dvIndexFile) {
this.deletionFile = deletionFile;
@Nullable IndexFileMeta deletedDeletionFile,
@Nullable IndexFileMeta newDeletionFile,
DeletionVectorsIndexFile dvIndexFile) {
this.deleteDeletionFile = deletedDeletionFile;
this.newDeletionFile = newDeletionFile;
this.dvIndexFile = dvIndexFile;
}

@Override
public Optional<IndexFileMeta> getOrCompute() {
public Pair<IndexFileMeta, IndexFileMeta> getOrCompute() {
this.getInvoked = true;
return Optional.ofNullable(deletionFile);
return Pair.of(deleteDeletionFile, newDeletionFile);
}

@Override
Expand All @@ -81,18 +89,20 @@ public CompactDeletionFile mergeOldFile(CompactDeletionFile old) {
throw new IllegalStateException("old should not be get, this is a bug.");
}

if (deletionFile == null) {
if (newDeletionFile == null) {
return old;
}

old.clean();
// Keep the old deletion file.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, we should keep the old deletion file

deleteDeletionFile = ((GeneratedDeletionFile) old).deleteDeletionFile;
return this;
}

@Override
public void clean() {
if (deletionFile != null) {
dvIndexFile.delete(deletionFile);
if (newDeletionFile != null) {
dvIndexFile.delete(newDeletionFile);
}
}
}
Expand All @@ -109,7 +119,7 @@ public LazyCompactDeletionFile(BucketedDvMaintainer maintainer) {
}

@Override
public Optional<IndexFileMeta> getOrCompute() {
public Pair<IndexFileMeta, IndexFileMeta> getOrCompute() {
generated = true;
return generateFiles(maintainer).getOrCompute();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.utils.Pair;

import javax.annotation.Nullable;

Expand All @@ -38,13 +39,17 @@ public class BucketedDvMaintainer {
private final Map<String, DeletionVector> deletionVectors;
protected final boolean bitmap64;
private boolean modified;
@Nullable private IndexFileMeta beforeDvFile;

private BucketedDvMaintainer(
DeletionVectorsIndexFile dvIndexFile, Map<String, DeletionVector> deletionVectors) {
DeletionVectorsIndexFile dvIndexFile,
Map<String, DeletionVector> deletionVectors,
@Nullable IndexFileMeta restoredDvFile) {
this.dvIndexFile = dvIndexFile;
this.deletionVectors = deletionVectors;
this.bitmap64 = dvIndexFile.bitmap64();
this.modified = false;
this.beforeDvFile = restoredDvFile;
}

private DeletionVector createNewDeletionVector() {
Expand Down Expand Up @@ -107,17 +112,18 @@ public void removeDeletionVectorOf(String fileName) {
}

/**
* Write new deletion vectors index file if any modifications have been made.
*
* @return None if no modifications have been made, otherwise the new deletion vectors index
* file.
* Write before and new deletion vectors index file pair if any modifications have been made.
*/
public Optional<IndexFileMeta> writeDeletionVectorsIndex() {
public Pair<IndexFileMeta, IndexFileMeta> writeDeletionVectorsIndex() {
if (modified) {
modified = false;
return Optional.of(dvIndexFile.writeSingleFile(deletionVectors));
IndexFileMeta newIndexFile = dvIndexFile.writeSingleFile(deletionVectors);
IndexFileMeta toRemove = beforeDvFile;
beforeDvFile = newIndexFile;
return Pair.of(toRemove, newIndexFile);
} else {
return Pair.of(null, null);
}
return Optional.empty();
}

/**
Expand Down Expand Up @@ -168,12 +174,13 @@ public BucketedDvMaintainer create(
}
Map<String, DeletionVector> deletionVectors =
new HashMap<>(handler.readAllDeletionVectors(partition, bucket, restoredFiles));
return create(partition, bucket, deletionVectors);
}

public BucketedDvMaintainer create(
BinaryRow partition, int bucket, Map<String, DeletionVector> deletionVectors) {
return new BucketedDvMaintainer(handler.dvIndex(partition, bucket), deletionVectors);
if (restoredFiles.size() > 1) {
throw new UnsupportedOperationException(
"For bucket table, one bucket should only have one dv file at most.");
}
IndexFileMeta fileMeta = restoredFiles.isEmpty() ? null : restoredFiles.get(0);
return new BucketedDvMaintainer(
handler.dvIndex(partition, bucket), deletionVectors, fileMeta);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.utils.Pair;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -59,10 +61,13 @@ public void notifyNewDeletionVector(String dataFile, DeletionVector deletionVect
@Override
public List<IndexManifestEntry> persist() {
List<IndexManifestEntry> result = new ArrayList<>();
maintainer
.writeDeletionVectorsIndex()
.map(fileMeta -> new IndexManifestEntry(FileKind.ADD, partition, bucket, fileMeta))
.ifPresent(result::add);
Pair<IndexFileMeta, IndexFileMeta> pair = maintainer.writeDeletionVectorsIndex();
if (pair.getLeft() != null) {
result.add(new IndexManifestEntry(FileKind.DELETE, partition, bucket, pair.getLeft()));
}
if (pair.getRight() != null) {
result.add(new IndexManifestEntry(FileKind.ADD, partition, bucket, pair.getRight()));
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.utils.IntHashSet;
import org.apache.paimon.utils.IntIterator;
import org.apache.paimon.utils.Pair;

import javax.annotation.Nullable;

import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.List;

/** An Index Maintainer for dynamic bucket to maintain key hashcode in a bucket. */
public class DynamicBucketIndexMaintainer {

@Nullable private IndexFileMeta beforeFile;
private final HashIndexFile indexFile;
private final IntHashSet hashcode;

Expand All @@ -45,6 +45,7 @@ private DynamicBucketIndexMaintainer(
HashIndexFile indexFile, @Nullable IndexFileMeta restoredFile) {
this.indexFile = indexFile;
IntHashSet hashcode = new IntHashSet();
this.beforeFile = restoredFile;
if (restoredFile != null) {
hashcode = new IntHashSet((int) restoredFile.rowCount());
restore(indexFile, hashcode, restoredFile);
Expand Down Expand Up @@ -78,7 +79,8 @@ public void notifyNewRecord(KeyValue record) {
}
}

public List<IndexFileMeta> prepareCommit() {
/** Write before and new index file pair if any modifications have been made. */
public Pair<IndexFileMeta, IndexFileMeta> prepareCommit() {
if (modified) {
IndexFileMeta entry;
try {
Expand All @@ -87,9 +89,11 @@ public List<IndexFileMeta> prepareCommit() {
throw new RuntimeException(e);
}
modified = false;
return Collections.singletonList(entry);
IndexFileMeta toRemove = beforeFile;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the streaming scenario, the index maintainer will always hold it and not create it repeatedly, so this logic needs to be added

beforeFile = entry;
return Pair.of(toRemove, entry);
}
return Collections.emptyList();
return Pair.of(null, null);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.ObjectsFile;
Expand All @@ -34,7 +33,11 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Index manifest file. */
public class IndexManifestFile extends ObjectsFile<IndexManifestEntry> {
Expand All @@ -61,14 +64,32 @@ private IndexManifestFile(
/** Write new index files to index manifest. */
@Nullable
public String writeIndexFiles(
@Nullable String previousIndexManifest,
List<IndexManifestEntry> newIndexFiles,
BucketMode bucketMode) {
@Nullable String previousIndexManifest, List<IndexManifestEntry> newIndexFiles) {
if (newIndexFiles.isEmpty()) {
return previousIndexManifest;
}
IndexManifestFileHandler handler = new IndexManifestFileHandler(this, bucketMode);
return handler.write(previousIndexManifest, newIndexFiles);

List<IndexManifestEntry> baseEntries =
previousIndexManifest == null ? new ArrayList<>() : read(previousIndexManifest);
return writeWithoutRolling(combine(baseEntries, newIndexFiles));
}

private List<IndexManifestEntry> combine(
List<IndexManifestEntry> prevIndexFiles, List<IndexManifestEntry> newIndexFiles) {
Map<String, IndexManifestEntry> indexEntries = new HashMap<>();
for (IndexManifestEntry entry : prevIndexFiles) {
checkArgument(entry.kind() == FileKind.ADD);
indexEntries.put(entry.indexFile().fileName(), entry);
}

for (IndexManifestEntry entry : newIndexFiles) {
if (entry.kind() == FileKind.ADD) {
indexEntries.put(entry.indexFile().fileName(), entry);
} else {
indexEntries.remove(entry.indexFile().fileName());
}
}
return new ArrayList<>(indexEntries.values());
}

/** Creator of {@link IndexManifestFile}. */
Expand Down
Loading
Loading