diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 2d71f61580db..0edea96d7207 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -290,7 +290,6 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
                 newKeyComparator(),
                 options.branch(),
                 newStatsFileHandler(),
-                bucketMode(),
                 options.scanManifestParallelism(),
                 createCommitCallbacks(commitUser, table),
                 options.commitMaxRetries(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactDeletionFile.java b/paimon-core/src/main/java/org/apache/paimon/compact/CompactDeletionFile.java
index 8e9634e86e26..190f030486c7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactDeletionFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactDeletionFile.java
@@ -21,15 +21,19 @@
 import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
 import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.utils.Pair;

 import javax.annotation.Nullable;

-import java.util.Optional;
-
 /** Deletion File from compaction. */
 public interface CompactDeletionFile {

-    Optional<IndexFileMeta> getOrCompute();
+    /**
+     * Get or compute the deletion file.
+     *
+     * @return Pair of deleted deletion file and new deletion file.
+     */
+    Pair<IndexFileMeta, IndexFileMeta> getOrCompute();

     CompactDeletionFile mergeOldFile(CompactDeletionFile old);

@@ -41,8 +45,8 @@ public interface CompactDeletionFile {
      * them).
      */
     static CompactDeletionFile generateFiles(BucketedDvMaintainer maintainer) {
-        Optional<IndexFileMeta> file = maintainer.writeDeletionVectorsIndex();
-        return new GeneratedDeletionFile(file.orElse(null), maintainer.dvIndexFile());
+        Pair<IndexFileMeta, IndexFileMeta> pair = maintainer.writeDeletionVectorsIndex();
+        return new GeneratedDeletionFile(pair.getLeft(), pair.getRight(), maintainer.dvIndexFile());
     }

     /** For sync compaction, only create deletion files when prepareCommit. */
@@ -53,21 +57,25 @@ static CompactDeletionFile lazyGeneration(BucketedDvMaintainer maintainer) {
     /** A generated files implementation of {@link CompactDeletionFile}. */
     class GeneratedDeletionFile implements CompactDeletionFile {

-        @Nullable private final IndexFileMeta deletionFile;
+        @Nullable private IndexFileMeta deleteDeletionFile;
+        @Nullable private final IndexFileMeta newDeletionFile;
         private final DeletionVectorsIndexFile dvIndexFile;

         private boolean getInvoked = false;

         public GeneratedDeletionFile(
-                @Nullable IndexFileMeta deletionFile, DeletionVectorsIndexFile dvIndexFile) {
-            this.deletionFile = deletionFile;
+                @Nullable IndexFileMeta deletedDeletionFile,
+                @Nullable IndexFileMeta newDeletionFile,
+                DeletionVectorsIndexFile dvIndexFile) {
+            this.deleteDeletionFile = deletedDeletionFile;
+            this.newDeletionFile = newDeletionFile;
             this.dvIndexFile = dvIndexFile;
         }

         @Override
-        public Optional<IndexFileMeta> getOrCompute() {
+        public Pair<IndexFileMeta, IndexFileMeta> getOrCompute() {
             this.getInvoked = true;
-            return Optional.ofNullable(deletionFile);
+            return Pair.of(deleteDeletionFile, newDeletionFile);
         }

         @Override
@@ -81,18 +89,20 @@ public CompactDeletionFile mergeOldFile(CompactDeletionFile old) {
                 throw new IllegalStateException("old should not be get, this is a bug.");
             }

-            if (deletionFile == null) {
+            if (newDeletionFile == null) {
                 return old;
             }

             old.clean();
+            // Keep the old deletion file.
+            deleteDeletionFile = ((GeneratedDeletionFile) old).deleteDeletionFile;
             return this;
         }

         @Override
         public void clean() {
-            if (deletionFile != null) {
-                dvIndexFile.delete(deletionFile);
+            if (newDeletionFile != null) {
+                dvIndexFile.delete(newDeletionFile);
             }
         }
     }
@@ -109,7 +119,7 @@ public LazyCompactDeletionFile(BucketedDvMaintainer maintainer) {
         }

         @Override
-        public Optional<IndexFileMeta> getOrCompute() {
+        public Pair<IndexFileMeta, IndexFileMeta> getOrCompute() {
             generated = true;
             return generateFiles(maintainer).getOrCompute();
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java
index 788ea1cce85d..846e72f9e192 100644
--- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java
@@ -22,6 +22,7 @@
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.utils.Pair;

 import javax.annotation.Nullable;

@@ -38,13 +39,17 @@ public class BucketedDvMaintainer {
     private final Map<String, DeletionVector> deletionVectors;
     protected final boolean bitmap64;
     private boolean modified;
+    @Nullable private IndexFileMeta beforeDvFile;

     private BucketedDvMaintainer(
-            DeletionVectorsIndexFile dvIndexFile, Map<String, DeletionVector> deletionVectors) {
+            DeletionVectorsIndexFile dvIndexFile,
+            Map<String, DeletionVector> deletionVectors,
+            @Nullable IndexFileMeta restoredDvFile) {
         this.dvIndexFile = dvIndexFile;
         this.deletionVectors = deletionVectors;
         this.bitmap64 = dvIndexFile.bitmap64();
         this.modified = false;
+        this.beforeDvFile = restoredDvFile;
     }

     private DeletionVector createNewDeletionVector() {
@@ -107,17 +112,18 @@ public void removeDeletionVectorOf(String fileName) {
     }

     /**
-     * Write new deletion vectors index file if any modifications have been made.
-     *
-     * @return None if no modifications have been made, otherwise the new deletion vectors index
-     *     file.
+     * Write before and new deletion vectors index file pair if any modifications have been made.
      */
-    public Optional<IndexFileMeta> writeDeletionVectorsIndex() {
+    public Pair<IndexFileMeta, IndexFileMeta> writeDeletionVectorsIndex() {
         if (modified) {
             modified = false;
-            return Optional.of(dvIndexFile.writeSingleFile(deletionVectors));
+            IndexFileMeta newIndexFile = dvIndexFile.writeSingleFile(deletionVectors);
+            IndexFileMeta toRemove = beforeDvFile;
+            beforeDvFile = newIndexFile;
+            return Pair.of(toRemove, newIndexFile);
+        } else {
+            return Pair.of(null, null);
         }
-        return Optional.empty();
     }

     /**
@@ -168,12 +174,13 @@ public BucketedDvMaintainer create(
             }
             Map<String, DeletionVector> deletionVectors =
                     new HashMap<>(handler.readAllDeletionVectors(partition, bucket, restoredFiles));
-            return create(partition, bucket, deletionVectors);
-        }
-
-        public BucketedDvMaintainer create(
-                BinaryRow partition, int bucket, Map<String, DeletionVector> deletionVectors) {
-            return new BucketedDvMaintainer(handler.dvIndex(partition, bucket), deletionVectors);
+            if (restoredFiles.size() > 1) {
+                throw new UnsupportedOperationException(
+                        "For bucket table, one bucket should only have one dv file at most.");
+            }
+            IndexFileMeta fileMeta = restoredFiles.isEmpty() ?
null : restoredFiles.get(0); + return new BucketedDvMaintainer( + handler.dvIndex(partition, bucket), deletionVectors, fileMeta); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeleteFileMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeleteFileMaintainer.java index 7245ebfe121e..d148b5df6e2f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeleteFileMaintainer.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeleteFileMaintainer.java @@ -21,8 +21,10 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.deletionvectors.BucketedDvMaintainer; import org.apache.paimon.deletionvectors.DeletionVector; +import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.utils.Pair; import java.util.ArrayList; import java.util.List; @@ -59,10 +61,13 @@ public void notifyNewDeletionVector(String dataFile, DeletionVector deletionVect @Override public List persist() { List result = new ArrayList<>(); - maintainer - .writeDeletionVectorsIndex() - .map(fileMeta -> new IndexManifestEntry(FileKind.ADD, partition, bucket, fileMeta)) - .ifPresent(result::add); + Pair pair = maintainer.writeDeletionVectorsIndex(); + if (pair.getLeft() != null) { + result.add(new IndexManifestEntry(FileKind.DELETE, partition, bucket, pair.getLeft())); + } + if (pair.getRight() != null) { + result.add(new IndexManifestEntry(FileKind.ADD, partition, bucket, pair.getRight())); + } return result; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/index/DynamicBucketIndexMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/index/DynamicBucketIndexMaintainer.java index e7b3630f1e8b..747517d639c4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/DynamicBucketIndexMaintainer.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/DynamicBucketIndexMaintainer.java @@ -24,18 +24,18 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.utils.IntHashSet; import org.apache.paimon.utils.IntIterator; +import org.apache.paimon.utils.Pair; import javax.annotation.Nullable; import java.io.EOFException; import java.io.IOException; import java.io.UncheckedIOException; -import java.util.Collections; -import java.util.List; /** An Index Maintainer for dynamic bucket to maintain key hashcode in a bucket. */ public class DynamicBucketIndexMaintainer { + @Nullable private IndexFileMeta beforeFile; private final HashIndexFile indexFile; private final IntHashSet hashcode; @@ -45,6 +45,7 @@ private DynamicBucketIndexMaintainer( HashIndexFile indexFile, @Nullable IndexFileMeta restoredFile) { this.indexFile = indexFile; IntHashSet hashcode = new IntHashSet(); + this.beforeFile = restoredFile; if (restoredFile != null) { hashcode = new IntHashSet((int) restoredFile.rowCount()); restore(indexFile, hashcode, restoredFile); @@ -78,7 +79,8 @@ public void notifyNewRecord(KeyValue record) { } } - public List prepareCommit() { + /** Write before and new index file pair if any modifications have been made. 
*/ + public Pair prepareCommit() { if (modified) { IndexFileMeta entry; try { @@ -87,9 +89,11 @@ public List prepareCommit() { throw new RuntimeException(e); } modified = false; - return Collections.singletonList(entry); + IndexFileMeta toRemove = beforeFile; + beforeFile = entry; + return Pair.of(toRemove, entry); } - return Collections.emptyList(); + return Pair.of(null, null); } @VisibleForTesting diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java index 077d1a45aa29..761a6d1320df 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java @@ -23,7 +23,6 @@ import org.apache.paimon.format.FormatWriterFactory; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; -import org.apache.paimon.table.BucketMode; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.ObjectsFile; @@ -34,7 +33,11 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; + +import static org.apache.paimon.utils.Preconditions.checkArgument; /** Index manifest file. */ public class IndexManifestFile extends ObjectsFile { @@ -61,14 +64,32 @@ private IndexManifestFile( /** Write new index files to index manifest. */ @Nullable public String writeIndexFiles( - @Nullable String previousIndexManifest, - List newIndexFiles, - BucketMode bucketMode) { + @Nullable String previousIndexManifest, List newIndexFiles) { if (newIndexFiles.isEmpty()) { return previousIndexManifest; } - IndexManifestFileHandler handler = new IndexManifestFileHandler(this, bucketMode); - return handler.write(previousIndexManifest, newIndexFiles); + + List baseEntries = + previousIndexManifest == null ? new ArrayList<>() : read(previousIndexManifest); + return writeWithoutRolling(combine(baseEntries, newIndexFiles)); + } + + private List combine( + List prevIndexFiles, List newIndexFiles) { + Map indexEntries = new HashMap<>(); + for (IndexManifestEntry entry : prevIndexFiles) { + checkArgument(entry.kind() == FileKind.ADD); + indexEntries.put(entry.indexFile().fileName(), entry); + } + + for (IndexManifestEntry entry : newIndexFiles) { + if (entry.kind() == FileKind.ADD) { + indexEntries.put(entry.indexFile().fileName(), entry); + } else { + indexEntries.remove(entry.indexFile().fileName()); + } + } + return new ArrayList<>(indexEntries.values()); } /** Creator of {@link IndexManifestFile}. */ diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java deleted file mode 100644 index bf334e4ad301..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.manifest; - -import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.index.IndexFileMeta; -import org.apache.paimon.table.BucketMode; -import org.apache.paimon.utils.Pair; - -import javax.annotation.Nullable; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.stream.Collectors; - -import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; -import static org.apache.paimon.index.HashIndexFile.HASH_INDEX; -import static org.apache.paimon.utils.Preconditions.checkArgument; - -/** IndexManifestFile Handler. */ -public class IndexManifestFileHandler { - - private final IndexManifestFile indexManifestFile; - - private final BucketMode bucketMode; - - IndexManifestFileHandler(IndexManifestFile indexManifestFile, BucketMode bucketMode) { - this.indexManifestFile = indexManifestFile; - this.bucketMode = bucketMode; - } - - String write(@Nullable String previousIndexManifest, List newIndexFiles) { - List entries = - previousIndexManifest == null - ? new ArrayList<>() - : indexManifestFile.read(previousIndexManifest); - for (IndexManifestEntry entry : entries) { - checkArgument(entry.kind() == FileKind.ADD); - } - - Pair, List> previous = - separateIndexEntries(entries); - Pair, List> current = - separateIndexEntries(newIndexFiles); - - // Step1: get the hash index files; - List indexEntries = - getIndexManifestFileCombine(HASH_INDEX) - .combine(previous.getLeft(), current.getLeft()); - - // Step2: get the dv index files; - indexEntries.addAll( - getIndexManifestFileCombine(DELETION_VECTORS_INDEX) - .combine(previous.getRight(), current.getRight())); - - return indexManifestFile.writeWithoutRolling(indexEntries); - } - - private Pair, List> separateIndexEntries( - List indexFiles) { - List hashEntries = new ArrayList<>(); - List dvEntries = new ArrayList<>(); - for (IndexManifestEntry entry : indexFiles) { - String indexType = entry.indexFile().indexType(); - if (indexType.equals(DELETION_VECTORS_INDEX)) { - dvEntries.add(entry); - } else if (indexType.equals(HASH_INDEX)) { - hashEntries.add(entry); - } else { - throw new IllegalArgumentException("Can't recognize this index type: " + indexType); - } - } - return Pair.of(hashEntries, dvEntries); - } - - private IndexManifestFileCombiner getIndexManifestFileCombine(String indexType) { - if (DELETION_VECTORS_INDEX.equals(indexType) && BucketMode.BUCKET_UNAWARE == bucketMode) { - return new GlobalCombiner(); - } else { - return new BucketedCombiner(); - } - } - - interface IndexManifestFileCombiner { - List combine( - List prevIndexFiles, List newIndexFiles); - } - - /** - * We combine the previous and new index files by the file name. This is only used for tables - * without bucket. 
- */ - static class GlobalCombiner implements IndexManifestFileCombiner { - - @Override - public List combine( - List prevIndexFiles, List newIndexFiles) { - Map indexEntries = new HashMap<>(); - for (IndexManifestEntry entry : prevIndexFiles) { - indexEntries.put(entry.indexFile().fileName(), entry); - } - - for (IndexManifestEntry entry : newIndexFiles) { - if (entry.kind() == FileKind.ADD) { - indexEntries.put(entry.indexFile().fileName(), entry); - } else { - indexEntries.remove(entry.indexFile().fileName()); - } - } - return new ArrayList<>(indexEntries.values()); - } - } - - /** We combine the previous and new index files by {@link BucketIdentifier}. */ - static class BucketedCombiner implements IndexManifestFileCombiner { - - @Override - public List combine( - List prevIndexFiles, List newIndexFiles) { - Map indexEntries = new HashMap<>(); - for (IndexManifestEntry entry : prevIndexFiles) { - indexEntries.put(identifier(entry), entry); - } - - // The deleted entry is processed first to avoid overwriting a new entry. - List removed = - newIndexFiles.stream() - .filter(f -> f.kind() == FileKind.DELETE) - .collect(Collectors.toList()); - List added = - newIndexFiles.stream() - .filter(f -> f.kind() == FileKind.ADD) - .collect(Collectors.toList()); - for (IndexManifestEntry entry : removed) { - indexEntries.remove(identifier(entry)); - } - for (IndexManifestEntry entry : added) { - indexEntries.put(identifier(entry), entry); - } - return new ArrayList<>(indexEntries.values()); - } - } - - private static BucketIdentifier identifier(IndexManifestEntry indexManifestEntry) { - return new BucketIdentifier( - indexManifestEntry.partition(), - indexManifestEntry.bucket(), - indexManifestEntry.indexFile().indexType()); - } - - /** The {@link BucketIdentifier} of a {@link IndexFileMeta}. 
*/ - private static class BucketIdentifier { - - public final BinaryRow partition; - public final int bucket; - public final String indexType; - - private Integer hash; - - private BucketIdentifier(BinaryRow partition, int bucket, String indexType) { - this.partition = partition; - this.bucket = bucket; - this.indexType = indexType; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - BucketIdentifier that = (BucketIdentifier) o; - return bucket == that.bucket - && Objects.equals(partition, that.partition) - && Objects.equals(indexType, that.indexType); - } - - @Override - public int hashCode() { - if (hash == null) { - hash = Objects.hash(partition, bucket, indexType); - } - return hash; - } - - @Override - public String toString() { - return "BucketIdentifier{" - + "partition=" - + partition - + ", bucket=" - + bucket - + ", indexType='" - + indexType - + '\'' - + '}'; - } - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index ddf2addf5313..5b14e664cd1a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -28,6 +28,7 @@ import org.apache.paimon.disk.IOManager; import org.apache.paimon.index.DynamicBucketIndexMaintainer; import org.apache.paimon.index.IndexFileHandler; +import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataIncrement; @@ -39,6 +40,7 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CommitIncrement; import org.apache.paimon.utils.ExecutorThreadFactory; +import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.RecordWriter; import org.apache.paimon.utils.SnapshotManager; @@ -216,15 +218,24 @@ public List prepareCommit(boolean waitCompaction, long commitIden DataIncrement newFilesIncrement = increment.newFilesIncrement(); CompactIncrement compactIncrement = increment.compactIncrement(); if (writerContainer.dynamicBucketMaintainer != null) { - newFilesIncrement - .newIndexFiles() - .addAll(writerContainer.dynamicBucketMaintainer.prepareCommit()); + Pair pair = + writerContainer.dynamicBucketMaintainer.prepareCommit(); + if (pair.getLeft() != null) { + newFilesIncrement.deletedIndexFiles().add(pair.getLeft()); + } + if (pair.getRight() != null) { + newFilesIncrement.newIndexFiles().add(pair.getRight()); + } } CompactDeletionFile compactDeletionFile = increment.compactDeletionFile(); if (compactDeletionFile != null) { - compactDeletionFile - .getOrCompute() - .ifPresent(compactIncrement.newIndexFiles()::add); + Pair pair = compactDeletionFile.getOrCompute(); + if (pair.getLeft() != null) { + compactIncrement.deletedIndexFiles().add(pair.getLeft()); + } + if (pair.getRight() != null) { + compactIncrement.newIndexFiles().add(pair.getRight()); + } } CommitMessageImpl committable = new CommitMessageImpl( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 466df201a363..9499e6e8d2bd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -143,7 +143,6 @@ public class FileStoreCommitImpl implements FileStoreCommit { @Nullable private final Integer manifestReadParallelism; private final List commitCallbacks; private final StatsFileHandler statsFileHandler; - private final BucketMode bucketMode; private final long commitTimeout; private final long commitMinRetryWait; private final long commitMaxRetryWait; @@ -179,7 +178,6 @@ public FileStoreCommitImpl( @Nullable Comparator keyComparator, String branchName, StatsFileHandler statsFileHandler, - BucketMode bucketMode, @Nullable Integer manifestReadParallelism, List commitCallbacks, int commitMaxRetries, @@ -229,7 +227,6 @@ public FileStoreCommitImpl( this.ignoreEmptyCommit = true; this.commitMetrics = null; this.statsFileHandler = statsFileHandler; - this.bucketMode = bucketMode; this.rowTrackingEnabled = rowTrackingEnabled; } @@ -1096,8 +1093,7 @@ CommitResult tryCommitOnce( changelogManifestList = manifestList.write(manifestFile.write(changelogFiles)); } - indexManifest = - indexManifestFile.writeIndexFiles(oldIndexManifest, indexFiles, bucketMode); + indexManifest = indexManifestFile.writeIndexFiles(oldIndexManifest, indexFiles); long latestSchemaId = schemaManager diff --git a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java index 3ca33a8222d0..47429c9ed506 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java @@ -19,9 +19,12 @@ package org.apache.paimon; import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.deletionvectors.BucketedDvMaintainer; +import org.apache.paimon.deletionvectors.Bitmap64DeletionVector; +import org.apache.paimon.deletionvectors.BitmapDeletionVector; +import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.deletionvectors.append.AppendDeleteFileMaintainer; import org.apache.paimon.deletionvectors.append.AppendDeletionFileMaintainerHelper; +import org.apache.paimon.deletionvectors.append.BaseAppendDeleteFileMaintainer; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileIOFinder; import org.apache.paimon.fs.Path; @@ -30,6 +33,8 @@ import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.operation.FileStoreCommitImpl; import org.apache.paimon.schema.Schema; @@ -108,21 +113,6 @@ public void commit(CommitMessage... 
commitMessages) { newCommit().commit(committable, false); } - public CommitMessage removeIndexFiles( - BinaryRow partition, int bucket, List indexFileMetas) { - return new CommitMessageImpl( - partition, - bucket, - options().bucket(), - new DataIncrement( - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - indexFileMetas), - CompactIncrement.emptyIncrement()); - } - public List scanDVIndexFiles(BinaryRow partition, int bucket) { Snapshot latestSnapshot = snapshotManager().latestSnapshot(); return fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX, partition, bucket); @@ -134,34 +124,44 @@ public AppendDeleteFileMaintainer createDVIFMaintainer( fileHandler, partition, dataFileToDeletionFiles); } - public BucketedDvMaintainer createOrRestoreDVMaintainer(BinaryRow partition, int bucket) { - Snapshot latestSnapshot = snapshotManager().latestSnapshot(); - BucketedDvMaintainer.Factory factory = BucketedDvMaintainer.factory(fileHandler); - List indexFiles = - fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX, partition, bucket); - return factory.create(partition, bucket, indexFiles); + public Map deletionVectors(BinaryRow partition, int bucket) { + return fileHandler.readAllDeletionVectors( + partition, bucket, scanDVIndexFiles(partition, bucket)); } public CommitMessageImpl writeDVIndexFiles( - BinaryRow partition, int bucket, Map> dataFileToPositions) { - BucketedDvMaintainer dvMaintainer = createOrRestoreDVMaintainer(partition, bucket); + BinaryRow partition, Map> dataFileToPositions, boolean bitmap64) { + AppendDeleteFileMaintainer dvMaintainer = + BaseAppendDeleteFileMaintainer.forUnawareAppend( + fileHandler, snapshotManager().latestSnapshot(), partition); for (Map.Entry> entry : dataFileToPositions.entrySet()) { - for (Integer pos : entry.getValue()) { - dvMaintainer.notifyNewDeletion(entry.getKey(), pos); + DeletionVector dv = + bitmap64 ? 
new Bitmap64DeletionVector() : new BitmapDeletionVector(); + for (Integer i : entry.getValue()) { + dv.delete(i); + } + dvMaintainer.notifyNewDeletionVector(entry.getKey(), dv); + } + List persist = dvMaintainer.persist(); + List newIndexFiles = new ArrayList<>(); + List deletedIndexFiles = new ArrayList<>(); + for (IndexManifestEntry indexManifestEntry : persist) { + if (indexManifestEntry.kind() == FileKind.ADD) { + newIndexFiles.add(indexManifestEntry.indexFile()); + } else { + deletedIndexFiles.add(indexManifestEntry.indexFile()); } } - List indexFiles = new ArrayList<>(); - dvMaintainer.writeDeletionVectorsIndex().ifPresent(indexFiles::add); return new CommitMessageImpl( partition, - bucket, + 0, options().bucket(), new DataIncrement( Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), - indexFiles, - Collections.emptyList()), + newIndexFiles, + deletedIndexFiles), CompactIncrement.emptyIncrement()); } diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index aa405060f422..2ee651cfd90a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -224,6 +224,7 @@ public List commitDataWatermark( null, watermark, Collections.emptyList(), + Collections.emptyList(), (commit, committable) -> commit.commit(committable, false)); } @@ -241,6 +242,7 @@ public List commitData( null, null, Collections.emptyList(), + Collections.emptyList(), (commit, committable) -> { logOffsets.forEach( (bucket, offset) -> committable.addLogOffset(bucket, offset, false)); @@ -262,6 +264,7 @@ public List overwriteData( null, null, Collections.emptyList(), + Collections.emptyList(), (commit, committable) -> commit.overwritePartition(partition, committable, Collections.emptyMap())); } @@ -288,6 +291,7 @@ public List commitDataIndex( KeyValue kv, Function partitionCalculator, int bucket, + List deletedIndexFiles, IndexFileMeta... 
indexFiles) throws Exception { return commitDataImpl( @@ -297,6 +301,7 @@ public List commitDataIndex( false, null, null, + deletedIndexFiles, Arrays.asList(indexFiles), (commit, committable) -> commit.commit(committable, false)); } @@ -308,7 +313,8 @@ public List commitDataImpl( boolean ignorePreviousFiles, Long identifier, Long watermark, - List indexFiles, + List deletedIndexFiles, + List newIndexFiles, BiConsumer commitFunction) throws Exception { AbstractFileStoreWrite write = newWrite(); @@ -347,7 +353,8 @@ public List commitDataImpl( CommitIncrement increment = entryWithBucket.getValue().prepareCommit(ignorePreviousFiles); DataIncrement dataIncrement = increment.newFilesIncrement(); - dataIncrement.newIndexFiles().addAll(indexFiles); + dataIncrement.deletedIndexFiles().addAll(deletedIndexFiles); + dataIncrement.newIndexFiles().addAll(newIndexFiles); committable.addFileCommittable( new CommitMessageImpl( entryWithPartition.getKey(), diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactTaskTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactTaskTest.java index 0e95135873b0..93c389f57318 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactTaskTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactTaskTest.java @@ -85,7 +85,7 @@ public void testAppendCompactionWithDeletionVectors(boolean compactBeforeAllFile dvs.put("data-1.orc", Arrays.asList(2, 4, 6)); // Write deletion vectors for all files to simulate existing deletion vectors - CommitMessageImpl commitMessage = store.writeDVIndexFiles(BinaryRow.EMPTY_ROW, 0, dvs); + CommitMessageImpl commitMessage = store.writeDVIndexFiles(BinaryRow.EMPTY_ROW, dvs, true); store.commit(commitMessage); List allFiles = diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java index 10c6f4520007..d77313af24ca 100644 --- a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java @@ -22,29 +22,25 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.PrimaryKeyTableTestBase; import org.apache.paimon.compact.CompactDeletionFile; -import org.apache.paimon.data.BinaryRow; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataIncrement; -import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.table.sink.BatchTableCommit; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.utils.FileIOUtils; +import org.apache.paimon.utils.Pair; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import javax.annotation.Nullable; - import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import static java.util.Collections.emptyList; import static org.apache.paimon.data.BinaryRow.EMPTY_ROW; @@ -71,7 +67,7 @@ public void test0(boolean bitmap64) { assertThat(dvMaintainer.deletionVectorOf("f1")).isPresent(); assertThat(dvMaintainer.deletionVectorOf("f3")).isEmpty(); - IndexFileMeta file = 
dvMaintainer.writeDeletionVectorsIndex().get(); + IndexFileMeta file = dvMaintainer.writeDeletionVectorsIndex().getRight(); Map deletionVectors = fileHandler.readAllDeletionVectors(EMPTY_ROW, 0, Collections.singletonList(file)); @@ -89,7 +85,7 @@ public void test1(boolean bitmap64) { BucketedDvMaintainer.Factory factory = BucketedDvMaintainer.factory(fileHandler); - BucketedDvMaintainer dvMaintainer = factory.create(EMPTY_ROW, 0, new HashMap<>()); + BucketedDvMaintainer dvMaintainer = factory.create(EMPTY_ROW, 0, Collections.emptyList()); DeletionVector deletionVector1 = createDeletionVector(bitmap64); deletionVector1.delete(1); deletionVector1.delete(3); @@ -97,19 +93,7 @@ public void test1(boolean bitmap64) { dvMaintainer.notifyNewDeletion("f1", deletionVector1); assertThat(dvMaintainer.bitmap64()).isEqualTo(bitmap64); - IndexFileMeta file = dvMaintainer.writeDeletionVectorsIndex().get(); - CommitMessage commitMessage = - new CommitMessageImpl( - EMPTY_ROW, - 0, - 1, - new DataIncrement( - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - Collections.singletonList(file), - Collections.emptyList()), - CompactIncrement.emptyIncrement()); + CommitMessage commitMessage = buildCommitMessage(dvMaintainer.writeDeletionVectorsIndex()); BatchTableCommit commit = table.newBatchWriteBuilder().newCommit(); commit.commit(Collections.singletonList(commitMessage)); @@ -124,19 +108,7 @@ public void test1(boolean bitmap64) { deletionVector2.delete(2); dvMaintainer.notifyNewDeletion("f1", deletionVector2); - file = dvMaintainer.writeDeletionVectorsIndex().get(); - commitMessage = - new CommitMessageImpl( - EMPTY_ROW, - 0, - 1, - new DataIncrement( - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - Collections.singletonList(file), - Collections.emptyList()), - CompactIncrement.emptyIncrement()); + commitMessage = buildCommitMessage(dvMaintainer.writeDeletionVectorsIndex()); commit = table.newBatchWriteBuilder().newCommit(); commit.commit(Collections.singletonList(commitMessage)); @@ -172,6 +144,9 @@ public void testCompactDeletion(boolean bitmap64) throws IOException { assertThat(indexDir.listFiles()).hasSize(1); FileIOUtils.deleteDirectory(indexDir); + Pair pair2 = deletionFile2.getOrCompute(); + assertThat(pair2.getLeft()).isEqualTo(null); + // test lazyGeneration dvMaintainer.notifyNewDeletion("f1", 3); @@ -187,6 +162,9 @@ public void testCompactDeletion(boolean bitmap64) throws IOException { deletionFile4.getOrCompute(); assertThat(indexDir.listFiles()).hasSize(1); + + Pair pair4 = deletionFile4.getOrCompute(); + assertThat(pair4.getLeft()).isEqualTo(null); } @ParameterizedTest @@ -195,26 +173,15 @@ public void testReadAndWriteMixedDv(boolean bitmap64) { // write first kind dv initIndexHandler(bitmap64); BucketedDvMaintainer.Factory factory1 = BucketedDvMaintainer.factory(fileHandler); - BucketedDvMaintainer dvMaintainer1 = factory1.create(EMPTY_ROW, 0, new HashMap<>()); + BucketedDvMaintainer dvMaintainer1 = factory1.create(EMPTY_ROW, 0, Collections.emptyList()); dvMaintainer1.notifyNewDeletion("f1", 1); dvMaintainer1.notifyNewDeletion("f1", 3); dvMaintainer1.notifyNewDeletion("f2", 1); dvMaintainer1.notifyNewDeletion("f2", 3); assertThat(dvMaintainer1.bitmap64()).isEqualTo(bitmap64); - IndexFileMeta file = dvMaintainer1.writeDeletionVectorsIndex().get(); CommitMessage commitMessage1 = - new CommitMessageImpl( - EMPTY_ROW, - 0, - 1, - DataIncrement.emptyIncrement(), - new CompactIncrement( - Collections.emptyList(), - 
Collections.emptyList(), - Collections.emptyList(), - Collections.singletonList(file), - Collections.emptyList())); + buildCommitMessage(dvMaintainer1.writeDeletionVectorsIndex()); BatchTableCommit commit1 = table.newBatchWriteBuilder().newCommit(); commit1.commit(Collections.singletonList(commitMessage1)); @@ -239,19 +206,8 @@ public void testReadAndWriteMixedDv(boolean bitmap64) { assertThat(dvs.get("f3")) .isInstanceOf(bitmap64 ? BitmapDeletionVector.class : Bitmap64DeletionVector.class); - file = dvMaintainer2.writeDeletionVectorsIndex().get(); CommitMessage commitMessage2 = - new CommitMessageImpl( - EMPTY_ROW, - 0, - 1, - DataIncrement.emptyIncrement(), - new CompactIncrement( - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - Collections.singletonList(file), - Collections.emptyList())); + buildCommitMessage(dvMaintainer2.writeDeletionVectorsIndex()); BatchTableCommit commit2 = table.newBatchWriteBuilder().newCommit(); commit2.commit(Collections.singletonList(commitMessage2)); @@ -281,19 +237,21 @@ private void initIndexHandler(boolean bitmap64) { fileHandler = table.store().newIndexFileHandler(); } - public static BucketedDvMaintainer createOrRestore( - BucketedDvMaintainer.Factory factory, - @Nullable Snapshot snapshot, - BinaryRow partition) { - IndexFileHandler handler = factory.indexFileHandler(); - List indexFiles = - snapshot == null - ? Collections.emptyList() - : handler.scanEntries(snapshot, DELETION_VECTORS_INDEX, partition).stream() - .map(IndexManifestEntry::indexFile) - .collect(Collectors.toList()); - Map deletionVectors = - new HashMap<>(handler.readAllDeletionVectors(EMPTY_ROW, 0, indexFiles)); - return factory.create(EMPTY_ROW, 0, deletionVectors); + private CommitMessageImpl buildCommitMessage(Pair pair) { + return new CommitMessageImpl( + EMPTY_ROW, + 0, + 1, + DataIncrement.emptyIncrement(), + new CompactIncrement( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + pair.getRight() == null + ? Collections.emptyList() + : Collections.singletonList(pair.getRight()), + pair.getLeft() == null + ? 
Collections.emptyList() + : Collections.singletonList(pair.getLeft()))); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java index 3a03985c885d..35977d5fc94e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java @@ -58,12 +58,13 @@ public void test(boolean bitmap64) throws Exception { Map> dvs = new HashMap<>(); dvs.put("f1", Arrays.asList(1, 3, 5)); dvs.put("f2", Arrays.asList(2, 4, 6)); - CommitMessageImpl commitMessage1 = store.writeDVIndexFiles(BinaryRow.EMPTY_ROW, 0, dvs); + CommitMessageImpl commitMessage1 = + store.writeDVIndexFiles(BinaryRow.EMPTY_ROW, dvs, bitmap64); CommitMessageImpl commitMessage2 = store.writeDVIndexFiles( BinaryRow.EMPTY_ROW, - 1, - Collections.singletonMap("f3", Arrays.asList(1, 2, 3))); + Collections.singletonMap("f3", Arrays.asList(1, 2, 3)), + bitmap64); store.commit(commitMessage1, commitMessage2); IndexPathFactory indexPathFactory = @@ -101,22 +102,6 @@ public void test(boolean bitmap64) throws Exception { IndexManifestEntry entry = res.stream().filter(file -> file.kind() == FileKind.ADD).findAny().get(); assertThat(entry.indexFile().dvRanges().containsKey("f2")).isTrue(); - entry = - res.stream() - .filter(file -> file.kind() == FileKind.DELETE) - .filter(file -> file.bucket() == 0) - .findAny() - .get(); - assertThat(entry.indexFile()) - .isEqualTo(commitMessage1.newFilesIncrement().newIndexFiles().get(0)); - entry = - res.stream() - .filter(file -> file.kind() == FileKind.DELETE) - .filter(file -> file.bucket() == 1) - .findAny() - .get(); - assertThat(entry.indexFile()) - .isEqualTo(commitMessage2.newFilesIncrement().newIndexFiles().get(0)); } private Map createDeletionFileMapFromIndexFileMetas( diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileTest.java similarity index 84% rename from paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java rename to paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileTest.java index c2c5baba0a07..4148c17138aa 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileTest.java @@ -21,7 +21,6 @@ import org.apache.paimon.TestAppendFileStore; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.format.FileFormat; -import org.apache.paimon.table.BucketMode; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -33,8 +32,8 @@ import static org.apache.paimon.index.IndexFileMetaSerializerTest.randomDeletionVectorIndexFile; import static org.assertj.core.api.Assertions.assertThat; -/** Test for IndexManifestFileHandler. */ -public class IndexManifestFileHandlerTest { +/** Test for IndexManifestFile. 
*/ +public class IndexManifestFileTest { @TempDir java.nio.file.Path tempDir; @@ -51,20 +50,19 @@ public void testUnawareMode() throws Exception { fileStore.pathFactory(), null) .create(); - IndexManifestFileHandler indexManifestFileHandler = - new IndexManifestFileHandler(indexManifestFile, BucketMode.BUCKET_UNAWARE); IndexManifestEntry entry1 = new IndexManifestEntry( FileKind.ADD, BinaryRow.EMPTY_ROW, 0, randomDeletionVectorIndexFile()); - String indexManifestFile1 = indexManifestFileHandler.write(null, Arrays.asList(entry1)); + String indexManifestFile1 = indexManifestFile.writeIndexFiles(null, Arrays.asList(entry1)); IndexManifestEntry entry2 = entry1.toDeleteEntry(); IndexManifestEntry entry3 = new IndexManifestEntry( FileKind.ADD, BinaryRow.EMPTY_ROW, 0, randomDeletionVectorIndexFile()); String indexManifestFile2 = - indexManifestFileHandler.write(indexManifestFile1, Arrays.asList(entry2, entry3)); + indexManifestFile.writeIndexFiles( + indexManifestFile1, Arrays.asList(entry2, entry3)); List entries = indexManifestFile.read(indexManifestFile2); assertThat(entries.size()).isEqualTo(1); @@ -86,8 +84,6 @@ public void testHashFixedBucket() throws Exception { fileStore.pathFactory(), null) .create(); - IndexManifestFileHandler indexManifestFileHandler = - new IndexManifestFileHandler(indexManifestFile, BucketMode.HASH_FIXED); IndexManifestEntry entry1 = new IndexManifestEntry( @@ -96,7 +92,7 @@ public void testHashFixedBucket() throws Exception { new IndexManifestEntry( FileKind.ADD, BinaryRow.EMPTY_ROW, 1, randomDeletionVectorIndexFile()); String indexManifestFile1 = - indexManifestFileHandler.write(null, Arrays.asList(entry1, entry2)); + indexManifestFile.writeIndexFiles(null, Arrays.asList(entry1, entry2)); IndexManifestEntry entry3 = new IndexManifestEntry( @@ -104,8 +100,10 @@ public void testHashFixedBucket() throws Exception { IndexManifestEntry entry4 = new IndexManifestEntry( FileKind.ADD, BinaryRow.EMPTY_ROW, 2, randomDeletionVectorIndexFile()); + IndexManifestEntry entry5 = entry2.toDeleteEntry(); String indexManifestFile2 = - indexManifestFileHandler.write(indexManifestFile1, Arrays.asList(entry3, entry4)); + indexManifestFile.writeIndexFiles( + indexManifestFile1, Arrays.asList(entry3, entry4, entry5)); List entries = indexManifestFile.read(indexManifestFile2); assertThat(entries.size()).isEqualTo(3); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index 35843e0c2f7b..d64a30ecee1a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -25,7 +25,6 @@ import org.apache.paimon.TestFileStore; import org.apache.paimon.TestKeyValueGenerator; import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.deletionvectors.BucketedDvMaintainer; import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; @@ -534,6 +533,7 @@ public void testCommitEmpty() throws Exception { null, null, Collections.emptyList(), + Collections.emptyList(), (commit, committable) -> commit.commit(committable, false)); assertThat(store.snapshotManager().latestSnapshotId()).isEqualTo(snapshot.id()); @@ -546,6 +546,7 @@ public void testCommitEmpty() throws Exception { null, null, Collections.emptyList(), + Collections.emptyList(), (commit, committable) -> { 
commit.ignoreEmptyCommit(false); commit.commit(committable, false); @@ -568,6 +569,7 @@ public void testCommitOldSnapshotAgain() throws Exception { (long) i, null, Collections.emptyList(), + Collections.emptyList(), (commit, committable) -> { commit.commit(committable, false); committables.add(committable); @@ -725,22 +727,21 @@ public void testIndexFiles() throws Exception { } // init write + IndexFileMeta writtenPart1Index = + indexFileHandler.hashIndex(gen.getPartition(record1), 0).write(new int[] {1, 2, 5}); store.commitDataIndex( - record1, - gen::getPartition, - 0, - indexFileHandler - .hashIndex(gen.getPartition(record1), 0) - .write(new int[] {1, 2, 5})); + record1, gen::getPartition, 0, Collections.emptyList(), writtenPart1Index); store.commitDataIndex( record1, gen::getPartition, 1, + Collections.emptyList(), indexFileHandler.hashIndex(gen.getPartition(record1), 1).write(new int[] {6, 8})); store.commitDataIndex( record2, gen::getPartition, 2, + Collections.emptyList(), indexFileHandler.hashIndex(gen.getPartition(record2), 2).write(new int[] {3, 5})); Snapshot snapshot = store.snapshotManager().latestSnapshot(); @@ -773,6 +774,7 @@ public void testIndexFiles() throws Exception { record1, gen::getPartition, 0, + Collections.singletonList(writtenPart1Index), indexFileHandler.hashIndex(gen.getPartition(record1), 0).write(new int[] {1, 4})); snapshot = store.snapshotManager().latestSnapshot(); @@ -891,19 +893,18 @@ public void testDVIndexFiles(boolean bitmap64) throws Exception { CommitMessageImpl commitMessage1 = store.writeDVIndexFiles( BinaryRow.EMPTY_ROW, - 0, - Collections.singletonMap("f1", Arrays.asList(1, 3))); + Collections.singletonMap("f1", Arrays.asList(1, 3)), + bitmap64); CommitMessageImpl commitMessage2 = store.writeDVIndexFiles( BinaryRow.EMPTY_ROW, - 0, - Collections.singletonMap("f2", Arrays.asList(2, 4))); + Collections.singletonMap("f2", Arrays.asList(2, 4)), + bitmap64); store.commit(commitMessage1, commitMessage2); // assert 1 assertThat(store.scanDVIndexFiles(BinaryRow.EMPTY_ROW, 0).size()).isEqualTo(2); - BucketedDvMaintainer maintainer = store.createOrRestoreDVMaintainer(BinaryRow.EMPTY_ROW, 0); - Map dvs = maintainer.deletionVectors(); + Map dvs = store.deletionVectors(BinaryRow.EMPTY_ROW, 0); assertThat(dvs.size()).isEqualTo(2); assertThat(dvs.get("f2").isDeleted(2)).isTrue(); assertThat(dvs.get("f2").isDeleted(3)).isFalse(); @@ -912,17 +913,14 @@ public void testDVIndexFiles(boolean bitmap64) throws Exception { // commit 2 CommitMessage commitMessage3 = store.writeDVIndexFiles( - BinaryRow.EMPTY_ROW, 0, Collections.singletonMap("f2", Arrays.asList(3))); - List deleted = - new ArrayList<>(commitMessage1.newFilesIncrement().newIndexFiles()); - deleted.addAll(commitMessage2.newFilesIncrement().newIndexFiles()); - CommitMessage commitMessage4 = store.removeIndexFiles(BinaryRow.EMPTY_ROW, 0, deleted); - store.commit(commitMessage3, commitMessage4); + BinaryRow.EMPTY_ROW, + Collections.singletonMap("f2", Arrays.asList(3)), + bitmap64); + store.commit(commitMessage3); // assert 2 - assertThat(store.scanDVIndexFiles(BinaryRow.EMPTY_ROW, 0).size()).isEqualTo(1); - maintainer = store.createOrRestoreDVMaintainer(BinaryRow.EMPTY_ROW, 0); - dvs = maintainer.deletionVectors(); + assertThat(store.scanDVIndexFiles(BinaryRow.EMPTY_ROW, 0).size()).isEqualTo(2); + dvs = store.deletionVectors(BinaryRow.EMPTY_ROW, 0); assertThat(dvs.size()).isEqualTo(2); assertThat(dvs.get("f1").isDeleted(3)).isTrue(); assertThat(dvs.get("f2").isDeleted(3)).isTrue(); diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala index 68e741fb13c1..e3bdf4614bdf 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala @@ -19,7 +19,8 @@ package org.apache.paimon.spark.sql import org.apache.paimon.data.BinaryRow -import org.apache.paimon.deletionvectors.{BucketedDvMaintainer, BucketedDvMaintainerTest, DeletionVector} +import org.apache.paimon.deletionvectors.DeletionVector +import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX import org.apache.paimon.fs.Path import org.apache.paimon.spark.{PaimonSparkTestBase, PaimonSplitScan} import org.apache.paimon.spark.schema.PaimonMetadataColumn @@ -36,6 +37,7 @@ import org.apache.spark.sql.util.QueryExecutionListener import org.junit.jupiter.api.Assertions import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.util.Random @@ -116,8 +118,6 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe "INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2'), (3, 30, 'c3'), (4, 40, 'c4'), (5, 50, 'c5')") val table = loadTable("target") - val dvMaintainerFactory = - BucketedDvMaintainer.factory(table.store().newIndexFileHandler()) runAndCheckSplitScan(s""" |MERGE INTO target |USING source @@ -141,7 +141,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe 700, "c77") :: Row(9, 990, "c99") :: Nil ) - val deletionVectors = getAllLatestDeletionVectors(table, dvMaintainerFactory) + val deletionVectors = getLatestDeletionVectors(table) Assertions.assertTrue(deletionVectors.nonEmpty) } } @@ -166,11 +166,9 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe |""".stripMargin) val table = loadTable("T") - val dvMaintainerFactory = - BucketedDvMaintainer.factory(table.store().newIndexFileHandler()) spark.sql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c')") - val deletionVectors1 = getAllLatestDeletionVectors(table, dvMaintainerFactory) + val deletionVectors1 = getLatestDeletionVectors(table) Assertions.assertEquals(0, deletionVectors1.size) val cond1 = "id = 2" @@ -179,7 +177,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe checkAnswer( spark.sql(s"SELECT * from T ORDER BY id"), Row(1, "a") :: Row(2, "b_2") :: Row(3, "c") :: Nil) - val deletionVectors2 = getAllLatestDeletionVectors(table, dvMaintainerFactory) + val deletionVectors2 = getLatestDeletionVectors(table) Assertions.assertEquals(1, deletionVectors2.size) deletionVectors2 .foreach { @@ -191,7 +189,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe checkAnswer( spark.sql(s"SELECT * from T ORDER BY id"), Row(1, "a") :: Row(2, "b_2") :: Row(3, "c") :: Row(4, "d") :: Row(5, "e") :: Nil) - val deletionVectors3 = getAllLatestDeletionVectors(table, dvMaintainerFactory) + val deletionVectors3 = getLatestDeletionVectors(table) Assertions.assertTrue(deletionVectors2 == deletionVectors3) val cond2 = "id % 2 = 1" @@ -208,7 +206,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe "_all") :: Nil) spark.sql("CALL sys.compact('T')") - val deletionVectors4 = getAllLatestDeletionVectors(table, 
dvMaintainerFactory) + val deletionVectors4 = getLatestDeletionVectors(table) // After compaction, deletionVectors should be empty Assertions.assertTrue(deletionVectors4.isEmpty) checkAnswer( @@ -238,12 +236,10 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe |""".stripMargin) val table = loadTable("T") - val dvMaintainerFactory = - BucketedDvMaintainer.factory(table.store().newIndexFileHandler()) spark.sql( "INSERT INTO T VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'), (4, 'd', '2025')") - val deletionVectors1 = getAllLatestDeletionVectors(table, dvMaintainerFactory) + val deletionVectors1 = getLatestDeletionVectors(table) Assertions.assertEquals(0, deletionVectors1.size) val cond1 = "id = 2" @@ -258,7 +254,6 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe val deletionVectors2 = getLatestDeletionVectors( table, - dvMaintainerFactory, Seq(BinaryRow.singleColumn("2024"), BinaryRow.singleColumn("2025"))) Assertions.assertEquals(1, deletionVectors2.size) deletionVectors2 @@ -277,15 +272,11 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe "d_2", "2025") :: Nil) val deletionVectors3 = - getLatestDeletionVectors( - table, - dvMaintainerFactory, - Seq(BinaryRow.singleColumn("2024"))) + getLatestDeletionVectors(table, Seq(BinaryRow.singleColumn("2024"))) Assertions.assertTrue(deletionVectors2 == deletionVectors3) val deletionVectors4 = getLatestDeletionVectors( table, - dvMaintainerFactory, Seq(BinaryRow.singleColumn("2024"), BinaryRow.singleColumn("2025"))) deletionVectors4 .foreach { @@ -294,7 +285,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe } spark.sql("CALL sys.compact('T')") - val deletionVectors5 = getAllLatestDeletionVectors(table, dvMaintainerFactory) + val deletionVectors5 = getLatestDeletionVectors(table) // After compaction, deletionVectors should be empty Assertions.assertTrue(deletionVectors5.isEmpty) checkAnswer( @@ -325,18 +316,16 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe |""".stripMargin) val table = loadTable("T") - val dvMaintainerFactory = - BucketedDvMaintainer.factory(table.store().newIndexFileHandler()) spark.sql("INSERT INTO T VALUES (1, 'a'), (2, 'b')") - val deletionVectors1 = getAllLatestDeletionVectors(table, dvMaintainerFactory) + val deletionVectors1 = getLatestDeletionVectors(table) Assertions.assertEquals(0, deletionVectors1.size) val cond1 = "id = 2" val rowMetaInfo1 = getFilePathAndRowIndex(cond1) runAndCheckSplitScan(s"DELETE FROM T WHERE $cond1") checkAnswer(spark.sql(s"SELECT * from T ORDER BY id"), Row(1, "a") :: Nil) - val deletionVectors2 = getAllLatestDeletionVectors(table, dvMaintainerFactory) + val deletionVectors2 = getLatestDeletionVectors(table) Assertions.assertEquals(1, deletionVectors2.size) deletionVectors2 .foreach { @@ -348,7 +337,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe checkAnswer( spark.sql(s"SELECT * from T ORDER BY id"), Row(1, "a") :: Row(2, "bb") :: Row(3, "c") :: Row(4, "d") :: Nil) - val deletionVectors3 = getAllLatestDeletionVectors(table, dvMaintainerFactory) + val deletionVectors3 = getLatestDeletionVectors(table) Assertions.assertTrue(deletionVectors2 == deletionVectors3) val cond2 = "id % 2 = 1" @@ -356,7 +345,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe checkAnswer(spark.sql(s"SELECT * from T ORDER BY id"), Row(2, "bb") :: Row(4, "d") :: Nil) 
spark.sql("CALL sys.compact('T')") - val deletionVectors4 = getAllLatestDeletionVectors(table, dvMaintainerFactory) + val deletionVectors4 = getLatestDeletionVectors(table) // After compaction, deletionVectors should be empty Assertions.assertTrue(deletionVectors4.isEmpty) checkAnswer(spark.sql(s"SELECT * from T ORDER BY id"), Row(2, "bb") :: Row(4, "d") :: Nil) @@ -382,19 +371,14 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe |""".stripMargin) val table = loadTable("T") - val dvMaintainerFactory = - BucketedDvMaintainer.factory(table.store().newIndexFileHandler()) def getDeletionVectors(ptValues: Seq[String]): Map[String, DeletionVector] = { - getLatestDeletionVectors( - table, - dvMaintainerFactory, - ptValues.map(BinaryRow.singleColumn)) + getLatestDeletionVectors(table, ptValues.map(BinaryRow.singleColumn)) } spark.sql( "INSERT INTO T VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'), (4, 'd', '2025')") - val deletionVectors1 = getAllLatestDeletionVectors(table, dvMaintainerFactory) + val deletionVectors1 = getLatestDeletionVectors(table) Assertions.assertEquals(0, deletionVectors1.size) val cond1 = "id = 2" @@ -466,10 +450,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe spark.sql(s"SELECT * from T ORDER BY id"), Row(1, "a_new1") :: Row(2, "b") :: Row(3, "c_new1") :: Nil) - val dvMaintainerFactory = - BucketedDvMaintainer.factory(table.store().newIndexFileHandler()) - - val deletionVectors1 = getAllLatestDeletionVectors(table, dvMaintainerFactory) + val deletionVectors1 = getLatestDeletionVectors(table) // 1, 3 deleted, their row positions are 0, 2 Assertions.assertEquals(1, deletionVectors1.size) deletionVectors1 @@ -482,7 +463,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe // Compact // f3 (1, 2, 3), no deletion spark.sql("CALL sys.compact('T')") - val deletionVectors2 = getAllLatestDeletionVectors(table, dvMaintainerFactory) + val deletionVectors2 = getLatestDeletionVectors(table) // After compaction, deletionVectors should be empty Assertions.assertTrue(deletionVectors2.isEmpty) @@ -494,7 +475,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe spark.sql(s"SELECT * from T ORDER BY id"), Row(1, "a_new1") :: Row(2, "b_new2") :: Row(3, "c_new1") :: Nil) - val deletionVectors3 = getAllLatestDeletionVectors(table, dvMaintainerFactory) + val deletionVectors3 = getLatestDeletionVectors(table) // 2 deleted, row positions is 1 Assertions.assertEquals(1, deletionVectors3.size) deletionVectors3 @@ -703,27 +684,31 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe new Path(path).getName } - private def getAllLatestDeletionVectors( - table: FileStoreTable, - dvMaintainerFactory: BucketedDvMaintainer.Factory): Map[String, DeletionVector] = { - getLatestDeletionVectors(table, dvMaintainerFactory, Seq(BinaryRow.EMPTY_ROW)) - } - private def getLatestDeletionVectors( table: FileStoreTable, - dvMaintainerFactory: BucketedDvMaintainer.Factory, - partitions: Seq[BinaryRow]): Map[String, DeletionVector] = { - partitions.flatMap { - partition => - BucketedDvMaintainerTest - .createOrRestore( - dvMaintainerFactory, - table.snapshotManager().latestSnapshot(), - partition - ) - .deletionVectors() - .asScala - }.toMap + partitions: Seq[BinaryRow] = Seq(BinaryRow.EMPTY_ROW)): Map[String, DeletionVector] = { + val bucket = table.coreOptions().bucket() + val indexFileHandler = table.store().newIndexFileHandler() + val 
entries = indexFileHandler + .scan(table.snapshotManager().latestSnapshot(), DELETION_VECTORS_INDEX) + .asScala + + val res = mutable.Map[String, DeletionVector]() + partitions.foreach( + partition => { + val fileMetas = + entries.filter(_.partition().equals(partition)).map(_.indexFile()).toList.asJava + if (bucket == -1) { + res ++= indexFileHandler.readAllDeletionVectors(partition, 0, fileMetas).asScala + } else { + for (i <- 0 until bucket) { + res ++= indexFileHandler + .readAllDeletionVectors(partition, i, fileMetas) + .asScala + } + } + }) + res.toMap } private def getFilePathAndRowIndex(condition: String): Map[String, Array[Long]] = {
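Reviewer note (not part of the patch): the index maintainers changed above now return a Pair whose left side is the index file being replaced (committed as a FileKind.DELETE entry) and whose right side is the newly written index file (committed as a FileKind.ADD entry); either side may be null. IndexManifestFile#writeIndexFiles then folds such entries into the previous index manifest keyed on indexFile().fileName(), which is why the bucket-mode-aware IndexManifestFileHandler could be deleted. The following sketch only restates that contract using types that appear in the diff (Pair, IndexFileMeta, FileKind, IndexManifestEntry, IndexManifestFile, BucketedDvMaintainer); the class and method names here are illustrative and not part of the change.

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.IndexManifestFile;
import org.apache.paimon.utils.Pair;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.List;

/** Illustration of the new (deleted, new) index-file contract; not part of the patch. */
class IndexFilePairSketch {

    /** One maintainer flush becomes at most one DELETE and one ADD index manifest entry. */
    static List<IndexManifestEntry> toEntries(
            BucketedDvMaintainer maintainer, BinaryRow partition, int bucket) {
        Pair<IndexFileMeta, IndexFileMeta> pair = maintainer.writeDeletionVectorsIndex();
        List<IndexManifestEntry> entries = new ArrayList<>();
        if (pair.getLeft() != null) {
            // Left: the dv index file this bucket pointed to before the rewrite.
            entries.add(new IndexManifestEntry(FileKind.DELETE, partition, bucket, pair.getLeft()));
        }
        if (pair.getRight() != null) {
            // Right: the freshly written dv index file.
            entries.add(new IndexManifestEntry(FileKind.ADD, partition, bucket, pair.getRight()));
        }
        return entries;
    }

    /** Commit side: entries are merged into the previous index manifest purely by file name. */
    @Nullable
    static String commitEntries(
            IndexManifestFile indexManifestFile,
            @Nullable String previousIndexManifest,
            List<IndexManifestEntry> entries) {
        // An ADD inserts/overwrites its indexFile().fileName(); a DELETE removes that file name,
        // regardless of bucket mode.
        return indexManifestFile.writeIndexFiles(previousIndexManifest, entries);
    }
}

Because every write of a bucket's deletion vectors also reports the file it supersedes, stale dv index files drop out of the index manifest at commit time instead of requiring an explicit removal message; that is why the reworked FileStoreCommitTest#testDVIndexFiles no longer calls removeIndexFiles and expects two dv index files after the second commit.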