Skip to content

Commit 76306db

Browse files
Core: Add snapshotTransformer API for TableMetadata
1 parent e268df6 commit 76306db

File tree

4 files changed

+279
-22
lines changed

4 files changed

+279
-22
lines changed

core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,8 @@ public static TableMetadata replacePaths(
136136
metadata.partitionStatisticsFiles(), sourcePrefix, targetPrefix),
137137
metadata.nextRowId(),
138138
metadata.encryptionKeys(),
139-
metadata.changes());
139+
metadata.changes(),
140+
null);
140141
}
141142

142143
private static Map<String, String> updateProperties(

core/src/main/java/org/apache/iceberg/TableMetadata.java

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ public String toString() {
263263
private final List<MetadataUpdate> changes;
264264
private final long nextRowId;
265265
private final List<EncryptedKey> encryptionKeys;
266+
private final Function<Snapshot, Snapshot> snapshotTransformer;
266267
private SerializableSupplier<List<Snapshot>> snapshotsSupplier;
267268
private volatile List<Snapshot> snapshots;
268269
private volatile Map<Long, Snapshot> snapshotsById;
@@ -296,7 +297,8 @@ public String toString() {
296297
List<PartitionStatisticsFile> partitionStatisticsFiles,
297298
long nextRowId,
298299
List<EncryptedKey> encryptionKeys,
299-
List<MetadataUpdate> changes) {
300+
List<MetadataUpdate> changes,
301+
Function<Snapshot, Snapshot> snapshotTransformer) {
300302
Preconditions.checkArgument(
301303
specs != null && !specs.isEmpty(), "Partition specs cannot be null or empty");
302304
Preconditions.checkArgument(
@@ -333,7 +335,9 @@ public String toString() {
333335
this.sortOrders = sortOrders;
334336
this.properties = properties;
335337
this.currentSnapshotId = currentSnapshotId;
336-
this.snapshots = snapshots;
338+
this.snapshotTransformer =
339+
snapshotTransformer != null ? snapshotTransformer : Function.identity();
340+
this.snapshots = snapshots != null ? applySnapshotTransformer(snapshots) : null;
337341
this.snapshotsSupplier = snapshotsSupplier;
338342
this.snapshotsLoaded = snapshotsSupplier == null;
339343
this.snapshotLog = snapshotLog;
@@ -343,11 +347,15 @@ public String toString() {
343347
// changes are carried through until metadata is read from a file
344348
this.changes = changes;
345349

346-
this.snapshotsById = indexAndValidateSnapshots(snapshots, lastSequenceNumber);
350+
this.snapshotsById =
351+
this.snapshots != null
352+
? indexAndValidateSnapshots(this.snapshots, lastSequenceNumber)
353+
: ImmutableMap.of();
347354
this.schemasById = indexSchemas();
348355
this.specsById = PartitionUtil.indexSpecs(specs);
349356
this.sortOrdersById = indexSortOrders(sortOrders);
350-
this.refs = validateRefs(currentSnapshotId, refs, snapshotsById);
357+
this.refs =
358+
this.snapshots != null ? validateRefs(currentSnapshotId, refs, snapshotsById) : refs;
351359
this.statisticsFiles = ImmutableList.copyOf(statisticsFiles);
352360
this.partitionStatisticsFiles = ImmutableList.copyOf(partitionStatisticsFiles);
353361

@@ -395,7 +403,9 @@ public String toString() {
395403
previous.timestampMillis);
396404
}
397405

398-
validateCurrentSnapshot();
406+
if (this.snapshots != null) {
407+
validateCurrentSnapshot();
408+
}
399409
}
400410

401411
public int formatVersion() {
@@ -533,7 +543,9 @@ private synchronized void ensureSnapshotsLoaded() {
533543
List<Snapshot> loadedSnapshots = Lists.newArrayList(snapshotsSupplier.get());
534544
loadedSnapshots.removeIf(s -> s.sequenceNumber() > lastSequenceNumber);
535545

536-
this.snapshots = ImmutableList.copyOf(loadedSnapshots);
546+
List<Snapshot> transformedSnapshots = applySnapshotTransformer(loadedSnapshots);
547+
548+
this.snapshots = ImmutableList.copyOf(transformedSnapshots);
537549
this.snapshotsById = indexAndValidateSnapshots(snapshots, lastSequenceNumber);
538550
validateCurrentSnapshot();
539551

@@ -609,6 +621,14 @@ public TableMetadata removeSnapshotsIf(Predicate<Snapshot> removeIf) {
609621
return new Builder(this).removeSnapshots(toRemove).build();
610622
}
611623

624+
/**
 * Builds a new {@link TableMetadata} from this instance with the given snapshot transformer
 * registered on the builder.
 *
 * <p>The builder hands the transformer to the TableMetadata constructor, which maps every
 * snapshot through it; snapshots loaded later through the snapshots supplier are mapped as well.
 *
 * @param transformer function applied to each snapshot; the builder rejects {@code null}
 * @return the metadata produced by {@code Builder.build()}
 * @throws IllegalArgumentException if {@code transformer} is null (raised by the builder's
 *     precondition check)
 */
public TableMetadata transformSnapshots(Function<Snapshot, Snapshot> transformer) {
625+
return new Builder(this).transformSnapshots(transformer).build();
626+
}
627+
628+
/**
 * Maps every snapshot in {@code snapshotList} through this instance's {@code snapshotTransformer}
 * and collects the results, in the same order, into a new list.
 *
 * <p>The input list itself is not modified. The list is dereferenced directly, so it must not be
 * null; the constructor checks for null before calling this method.
 *
 * <p>NOTE(review): nothing here guards against a transformer that returns {@code null} or
 * changes a snapshot's id or sequence number; the result is indexed and validated by the callers
 * afterwards -- confirm that this is the intended contract.
 *
 * @param snapshotList snapshots to transform
 * @return a new list holding the transformed snapshots
 */
private List<Snapshot> applySnapshotTransformer(List<Snapshot> snapshotList) {
629+
return snapshotList.stream().map(snapshotTransformer).collect(Collectors.toList());
630+
}
631+
612632
public TableMetadata replaceProperties(Map<String, String> rawProperties) {
613633
ValidationException.check(rawProperties != null, "Cannot set properties to null");
614634
Map<String, String> newProperties = unreservedProperties(rawProperties);
@@ -916,6 +936,7 @@ public static class Builder {
916936
private boolean suppressHistoricalSnapshots = false;
917937
private long nextRowId;
918938
private final List<EncryptedKey> encryptionKeys;
939+
private Function<Snapshot, Snapshot> snapshotTransformer = Function.identity();
919940

920941
// change tracking
921942
private final List<MetadataUpdate> changes;
@@ -1516,6 +1537,12 @@ public Builder discardChanges() {
15161537
return this;
15171538
}
15181539

1540+
/**
 * Sets the function that the built {@link TableMetadata} applies to its snapshots, replacing any
 * transformer set earlier on this builder (the field defaults to {@code Function.identity()}).
 *
 * <p>NOTE(review): {@code hasChanges()} detects a custom transformer with
 * {@code !snapshotTransformer.equals(Function.identity())}. That test only works if
 * {@code Function.identity()} returns the same instance on every call, which the JDK does not
 * specify -- consider tracking an explicit flag or a shared constant instead; confirm.
 *
 * @param transformer function applied to each snapshot; must not be null
 * @return this builder, for call chaining
 * @throws IllegalArgumentException if {@code transformer} is null
 */
public Builder transformSnapshots(Function<Snapshot, Snapshot> transformer) {
1541+
Preconditions.checkArgument(transformer != null, "Snapshot transformer cannot be null");
1542+
this.snapshotTransformer = transformer;
1543+
return this;
1544+
}
1545+
15191546
public Builder setPreviousFileLocation(String previousFileLocation) {
15201547
this.previousFileLocation = previousFileLocation;
15211548
return this;
@@ -1526,7 +1553,8 @@ private boolean hasChanges() {
15261553
|| (discardChanges && !changes.isEmpty())
15271554
|| metadataLocation != null
15281555
|| suppressHistoricalSnapshots
1529-
|| null != snapshotsSupplier;
1556+
|| null != snapshotsSupplier
1557+
|| !snapshotTransformer.equals(Function.identity());
15301558
}
15311559

15321560
public TableMetadata build() {
@@ -1590,7 +1618,8 @@ public TableMetadata build() {
15901618
.collect(Collectors.toList()),
15911619
nextRowId,
15921620
encryptionKeys,
1593-
discardChanges ? ImmutableList.of() : ImmutableList.copyOf(changes));
1621+
discardChanges ? ImmutableList.of() : ImmutableList.copyOf(changes),
1622+
snapshotTransformer);
15941623
}
15951624

15961625
private int addSchemaInternal(Schema schema, int newLastColumnId) {

core/src/main/java/org/apache/iceberg/TableMetadataParser.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -580,7 +580,8 @@ public static TableMetadata fromJson(String metadataLocation, JsonNode node) {
580580
partitionStatisticsFiles,
581581
lastRowId,
582582
keys,
583-
ImmutableList.of() /* no changes from the file */);
583+
ImmutableList.of() /* no changes from the file */,
584+
null /* no snapshot transformer */);
584585
}
585586

586587
private static Map<String, SnapshotRef> refsFromJson(JsonNode refMap) {

0 commit comments

Comments
 (0)