Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ public static TableMetadata replacePaths(
metadata.partitionStatisticsFiles(), sourcePrefix, targetPrefix),
metadata.nextRowId(),
metadata.encryptionKeys(),
metadata.changes());
metadata.changes(),
null);
}

private static Map<String, String> updateProperties(
Expand Down
45 changes: 37 additions & 8 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ public String toString() {
private final List<MetadataUpdate> changes;
private final long nextRowId;
private final List<EncryptedKey> encryptionKeys;
private final transient Function<Snapshot, Snapshot> snapshotTransformer;
private SerializableSupplier<List<Snapshot>> snapshotsSupplier;
private volatile List<Snapshot> snapshots;
private volatile Map<Long, Snapshot> snapshotsById;
Expand Down Expand Up @@ -296,7 +297,8 @@ public String toString() {
List<PartitionStatisticsFile> partitionStatisticsFiles,
long nextRowId,
List<EncryptedKey> encryptionKeys,
List<MetadataUpdate> changes) {
List<MetadataUpdate> changes,
Function<Snapshot, Snapshot> snapshotTransformer) {
Preconditions.checkArgument(
specs != null && !specs.isEmpty(), "Partition specs cannot be null or empty");
Preconditions.checkArgument(
Expand Down Expand Up @@ -333,7 +335,9 @@ public String toString() {
this.sortOrders = sortOrders;
this.properties = properties;
this.currentSnapshotId = currentSnapshotId;
this.snapshots = snapshots;
this.snapshotTransformer =
snapshotTransformer != null ? snapshotTransformer : Function.identity();
this.snapshots = snapshots != null ? applySnapshotTransformer(snapshots) : null;
this.snapshotsSupplier = snapshotsSupplier;
this.snapshotsLoaded = snapshotsSupplier == null;
this.snapshotLog = snapshotLog;
Expand All @@ -343,11 +347,15 @@ public String toString() {
// changes are carried through until metadata is read from a file
this.changes = changes;

this.snapshotsById = indexAndValidateSnapshots(snapshots, lastSequenceNumber);
this.snapshotsById =
this.snapshots != null
? indexAndValidateSnapshots(this.snapshots, lastSequenceNumber)
: ImmutableMap.of();
this.schemasById = indexSchemas();
this.specsById = PartitionUtil.indexSpecs(specs);
this.sortOrdersById = indexSortOrders(sortOrders);
this.refs = validateRefs(currentSnapshotId, refs, snapshotsById);
this.refs =
this.snapshots != null ? validateRefs(currentSnapshotId, refs, snapshotsById) : refs;
this.statisticsFiles = ImmutableList.copyOf(statisticsFiles);
this.partitionStatisticsFiles = ImmutableList.copyOf(partitionStatisticsFiles);

Expand Down Expand Up @@ -395,7 +403,9 @@ public String toString() {
previous.timestampMillis);
}

validateCurrentSnapshot();
if (this.snapshots != null) {
validateCurrentSnapshot();
}
}

public int formatVersion() {
Expand Down Expand Up @@ -533,7 +543,9 @@ private synchronized void ensureSnapshotsLoaded() {
List<Snapshot> loadedSnapshots = Lists.newArrayList(snapshotsSupplier.get());
loadedSnapshots.removeIf(s -> s.sequenceNumber() > lastSequenceNumber);

this.snapshots = ImmutableList.copyOf(loadedSnapshots);
List<Snapshot> transformedSnapshots = applySnapshotTransformer(loadedSnapshots);

this.snapshots = ImmutableList.copyOf(transformedSnapshots);
this.snapshotsById = indexAndValidateSnapshots(snapshots, lastSequenceNumber);
validateCurrentSnapshot();

Expand Down Expand Up @@ -609,6 +621,14 @@ public TableMetadata removeSnapshotsIf(Predicate<Snapshot> removeIf) {
return new Builder(this).removeSnapshots(toRemove).build();
}

public TableMetadata transformSnapshots(Function<Snapshot, Snapshot> transformer) {
return new Builder(this).transformSnapshots(transformer).build();
}

private List<Snapshot> applySnapshotTransformer(List<Snapshot> snapshotList) {
return snapshotList.stream().map(snapshotTransformer).collect(Collectors.toList());
}

public TableMetadata replaceProperties(Map<String, String> rawProperties) {
ValidationException.check(rawProperties != null, "Cannot set properties to null");
Map<String, String> newProperties = unreservedProperties(rawProperties);
Expand Down Expand Up @@ -916,6 +936,7 @@ public static class Builder {
private boolean suppressHistoricalSnapshots = false;
private long nextRowId;
private final List<EncryptedKey> encryptionKeys;
private Function<Snapshot, Snapshot> snapshotTransformer = Function.identity();

// change tracking
private final List<MetadataUpdate> changes;
Expand Down Expand Up @@ -1516,6 +1537,12 @@ public Builder discardChanges() {
return this;
}

public Builder transformSnapshots(Function<Snapshot, Snapshot> transformer) {
Preconditions.checkArgument(transformer != null, "Snapshot transformer cannot be null");
this.snapshotTransformer = transformer;
return this;
}

public Builder setPreviousFileLocation(String previousFileLocation) {
this.previousFileLocation = previousFileLocation;
return this;
Expand All @@ -1526,7 +1553,8 @@ private boolean hasChanges() {
|| (discardChanges && !changes.isEmpty())
|| metadataLocation != null
|| suppressHistoricalSnapshots
|| null != snapshotsSupplier;
|| null != snapshotsSupplier
|| !snapshotTransformer.equals(Function.identity());
}

public TableMetadata build() {
Expand Down Expand Up @@ -1590,7 +1618,8 @@ public TableMetadata build() {
.collect(Collectors.toList()),
nextRowId,
encryptionKeys,
discardChanges ? ImmutableList.of() : ImmutableList.copyOf(changes));
discardChanges ? ImmutableList.of() : ImmutableList.copyOf(changes),
snapshotTransformer);
}

private int addSchemaInternal(Schema schema, int newLastColumnId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,8 @@ public static TableMetadata fromJson(String metadataLocation, JsonNode node) {
partitionStatisticsFiles,
lastRowId,
keys,
ImmutableList.of() /* no changes from the file */);
ImmutableList.of() /* no changes from the file */,
null /* no snapshot transformer */);
}

private static Map<String, SnapshotRef> refsFromJson(JsonNode refMap) {
Expand Down
Loading