Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/FileMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public Builder copy(DeleteFile toCopy) {
this.keyMetadata =
toCopy.keyMetadata() == null ? null : ByteBuffers.copy(toCopy.keyMetadata());
this.sortOrderId = toCopy.sortOrderId();
this.splitOffsets = toCopy.splitOffsets();
// Preserve DV-specific fields for deletion vectors
this.referencedDataFile = toCopy.referencedDataFile();
this.contentOffset = toCopy.contentOffset();
this.contentSizeInBytes = toCopy.contentSizeInBytes();
return this;
}

Expand Down
129 changes: 118 additions & 11 deletions core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand All @@ -40,6 +41,11 @@
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinCompressionCodec;
import org.apache.iceberg.puffin.PuffinReader;
import org.apache.iceberg.puffin.PuffinWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
Expand Down Expand Up @@ -446,16 +452,8 @@ private static RewriteResult<DeleteFile> writeDeleteFileEntry(

switch (file.content()) {
case POSITION_DELETES:
String targetDeleteFilePath = newPath(file.location(), sourcePrefix, targetPrefix);
Metrics metricsWithTargetPath =
ContentFileUtil.replacePathBounds(file, sourcePrefix, targetPrefix);
DeleteFile movedFile =
FileMetadata.deleteFileBuilder(spec)
.copy(file)
.withPath(targetDeleteFilePath)
.withMetrics(metricsWithTargetPath)
.build();
appendEntryWithFile(entry, writer, movedFile);
DeleteFile posDeleteFile = newPositionDeleteEntry(file, spec, sourcePrefix, targetPrefix);
appendEntryWithFile(entry, writer, posDeleteFile);
// keep the following entries in metadata but exclude them from copyPlan
// 1) deleted position delete files
// 2) entries not changed by snapshotIds
Expand All @@ -465,7 +463,7 @@ private static RewriteResult<DeleteFile> writeDeleteFileEntry(
.add(
Pair.of(
stagingPath(file.location(), sourcePrefix, stagingLocation),
movedFile.location()));
posDeleteFile.location()));
}
result.toRewrite().add(file);
return result;
Expand Down Expand Up @@ -524,6 +522,56 @@ private static DeleteFile newEqualityDeleteEntry(
.build();
}

/**
 * Builds a copy of a position delete file whose location (and, for DVs, referenced data file
 * path) is rewritten from {@code sourcePrefix} to {@code targetPrefix}.
 *
 * <p>Lower/upper path bounds in the file metrics are rewritten as well so that readers can still
 * prune by referenced data file path.
 *
 * @param file position delete file to rewrite
 * @param spec partition spec the delete file belongs to
 * @param sourcePrefix prefix that will be replaced
 * @param targetPrefix prefix that replaces it
 * @return a rewritten delete file entry
 */
private static DeleteFile newPositionDeleteEntry(
    DeleteFile file, PartitionSpec spec, String sourcePrefix, String targetPrefix) {
  String location = file.location();
  Preconditions.checkArgument(
      location.startsWith(sourcePrefix),
      "Expected delete file %s to start with prefix: %s",
      location,
      sourcePrefix);

  Metrics rewrittenMetrics = ContentFileUtil.replacePathBounds(file, sourcePrefix, targetPrefix);
  FileMetadata.Builder builder =
      FileMetadata.deleteFileBuilder(spec)
          .copy(file)
          .withPath(newPath(location, sourcePrefix, targetPrefix))
          .withMetrics(rewrittenMetrics);

  // DVs carry a pointer to the data file they apply to; rewrite that pointer too
  String rewrittenReference = rewriteReferencedDataFilePathForDV(file, sourcePrefix, targetPrefix);
  if (rewrittenReference != null) {
    builder.withReferencedDataFile(rewrittenReference);
  }

  return builder.build();
}

/**
 * Computes the rewritten referenced-data-file path for a DV (Deletion Vector) file.
 *
 * <p>Returns the referenced data file path with {@code sourcePrefix} replaced by {@code
 * targetPrefix} when it matches; returns the path unchanged when it does not match. Returns
 * {@code null} for non-DV files and for files without a referenced data file.
 *
 * @param deleteFile delete file to inspect
 * @param sourcePrefix source prefix that will be replaced
 * @param targetPrefix target prefix that will replace it
 * @return updated referenced data file path, or null if not applicable
 */
private static String rewriteReferencedDataFilePathForDV(
    DeleteFile deleteFile, String sourcePrefix, String targetPrefix) {
  if (!ContentFileUtil.isDV(deleteFile) || deleteFile.referencedDataFile() == null) {
    return null;
  }

  String referenced = deleteFile.referencedDataFile();
  return referenced.startsWith(sourcePrefix)
      ? newPath(referenced, sourcePrefix, targetPrefix)
      : referenced;
}

/** Class providing engine-specific methods to read and write position delete files. */
public interface PositionDeleteReaderWriter extends Serializable {
CloseableIterable<Record> reader(InputFile inputFile, FileFormat format, PartitionSpec spec);
Expand Down Expand Up @@ -562,6 +610,14 @@ public static void rewritePositionDeleteFile(
throw new UnsupportedOperationException(
String.format("Expected delete file %s to start with prefix: %s", path, sourcePrefix));
}

// DV files (Puffin format for v3+) need special handling to rewrite internal blob metadata
if (ContentFileUtil.isDV(deleteFile)) {
rewriteDVFile(deleteFile, outputFile, io, sourcePrefix, targetPrefix);
return;
}

// For non-DV position delete files (v2), rewrite using the reader/writer
InputFile sourceFile = io.newInputFile(path);
try (CloseableIterable<Record> reader =
posDeleteReaderWriter.reader(sourceFile, deleteFile.format(), spec)) {
Expand Down Expand Up @@ -592,6 +648,57 @@ record = recordIt.next();
}
}

/**
 * Rewrite a DV (Deletion Vector) file, updating the referenced data file paths in blob metadata.
 *
 * <p>Reads every blob of the source Puffin file, replaces the {@code referenced-data-file}
 * property value when it starts with {@code sourcePrefix}, and writes all blobs into a new
 * Puffin file at {@code outputFile}.
 *
 * <p>NOTE(review): the DeleteFile entry copied for this DV preserves contentOffset and
 * contentSizeInBytes from the source file — confirm the rewritten Puffin layout keeps blob
 * offsets/sizes identical (same codec, same payload), otherwise those offsets become stale.
 *
 * @param deleteFile source DV file to be rewritten
 * @param outputFile output file to write the rewritten DV to
 * @param io file io
 * @param sourcePrefix source prefix that will be replaced
 * @param targetPrefix target prefix to replace it
 * @throws IOException if reading the source Puffin file fails
 */
private static void rewriteDVFile(
    DeleteFile deleteFile,
    OutputFile outputFile,
    FileIO io,
    String sourcePrefix,
    String targetPrefix)
    throws IOException {
  List<Blob> rewrittenBlobs = Lists.newArrayList();
  try (PuffinReader reader = Puffin.read(io.newInputFile(deleteFile.location())).build()) {
    // Read all blobs and rewrite them with updated referenced data file paths
    for (Pair<org.apache.iceberg.puffin.BlobMetadata, ByteBuffer> blobPair :
        reader.readAll(reader.fileMetadata().blobs())) {
      org.apache.iceberg.puffin.BlobMetadata blobMetadata = blobPair.first();
      ByteBuffer blobData = blobPair.second();

      // Get the original properties and update the referenced data file path
      Map<String, String> properties = Maps.newHashMap(blobMetadata.properties());
      String referencedDataFile = properties.get("referenced-data-file");
      if (referencedDataFile != null && referencedDataFile.startsWith(sourcePrefix)) {
        String newReferencedDataFile = newPath(referencedDataFile, sourcePrefix, targetPrefix);
        properties.put("referenced-data-file", newReferencedDataFile);
      }

      // Create a new blob with updated properties; the payload bytes are carried over as-is
      // (presumably readAll yields decompressed data and the codec re-compresses on write —
      // verify against PuffinReader/PuffinWriter contracts)
      rewrittenBlobs.add(
          new Blob(
              blobMetadata.type(),
              blobMetadata.inputFields(),
              blobMetadata.snapshotId(),
              blobMetadata.sequenceNumber(),
              blobData,
              PuffinCompressionCodec.forName(blobMetadata.compressionCodec()),
              properties));
    }
  }

  // Write the rewritten blobs into the staged output Puffin file
  try (PuffinWriter writer =
      Puffin.write(outputFile).createdBy(IcebergBuild.fullVersion()).build()) {
    rewrittenBlobs.forEach(writer::write);
  }
}

private static PositionDelete newPositionDeleteRecord(
Record record, String sourcePrefix, String targetPrefix) {
PositionDelete delete = PositionDelete.create();
Expand Down
Loading