Skip to content

Commit

Permalink
Keep the existing API and add snapshotManifestPairs() method
Browse files Browse the repository at this point in the history
  • Loading branch information
hsiang-c committed Sep 18, 2024
1 parent 03c45d2 commit 0814a8b
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 87 deletions.
72 changes: 0 additions & 72 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1136,78 +1136,6 @@ acceptedBreaks:
new: "method org.apache.iceberg.BaseMetastoreOperations.CommitStatus org.apache.iceberg.BaseMetastoreTableOperations::checkCommitStatus(java.lang.String,\
\ org.apache.iceberg.TableMetadata)"
justification: "Removing deprecated code"
- code: "java.method.returnTypeTypeParametersChanged"
old: "method org.apache.iceberg.io.CloseableIterable<org.apache.iceberg.ManifestFile>\
\ org.apache.iceberg.AllDataFilesTable.AllDataFilesTableScan::manifests()"
new: "method org.apache.iceberg.io.CloseableIterable<org.apache.iceberg.util.Pair<org.apache.iceberg.Snapshot,\
\ org.apache.iceberg.ManifestFile>> org.apache.iceberg.AllDataFilesTable.AllDataFilesTableScan::manifests()"
justification: "Track manifests per snapshot"
- code: "java.method.returnTypeTypeParametersChanged"
old: "method org.apache.iceberg.io.CloseableIterable<org.apache.iceberg.ManifestFile>\
\ org.apache.iceberg.AllDeleteFilesTable.AllDeleteFilesTableScan::manifests()"
new: "method org.apache.iceberg.io.CloseableIterable<org.apache.iceberg.util.Pair<org.apache.iceberg.Snapshot,\
\ org.apache.iceberg.ManifestFile>> org.apache.iceberg.AllDeleteFilesTable.AllDeleteFilesTableScan::manifests()"
justification: "Track manifests per snapshot"
- code: "java.method.returnTypeTypeParametersChanged"
old: "method org.apache.iceberg.io.CloseableIterable<org.apache.iceberg.ManifestFile>\
\ org.apache.iceberg.AllFilesTable.AllFilesTableScan::manifests()"
new: "method org.apache.iceberg.io.CloseableIterable<org.apache.iceberg.util.Pair<org.apache.iceberg.Snapshot,\
\ org.apache.iceberg.ManifestFile>> org.apache.iceberg.AllFilesTable.AllFilesTableScan::manifests()"
justification: "Track manifests per snapshot"
- code: "java.method.returnTypeTypeParametersChanged"
old: "method org.apache.iceberg.io.CloseableIterable<org.apache.iceberg.ManifestFile>\
\ org.apache.iceberg.BaseAllMetadataTableScan::reachableManifests(org.apache.iceberg.relocated.com.google.common.base.Function<org.apache.iceberg.Snapshot,\
\ java.lang.Iterable<org.apache.iceberg.ManifestFile>>) @ org.apache.iceberg.AllDataFilesTable.AllDataFilesTableScan"
new: "method org.apache.iceberg.io.CloseableIterable<org.apache.iceberg.util.Pair<org.apache.iceberg.Snapshot,\
\ org.apache.iceberg.ManifestFile>> org.apache.iceberg.BaseAllMetadataTableScan::reachableManifests(org.apache.iceberg.relocated.com.google.common.base.Function<org.apache.iceberg.Snapshot,\
\ java.lang.Iterable<org.apache.iceberg.util.Pair<org.apache.iceberg.Snapshot,\
\ org.apache.iceberg.ManifestFile>>>) @ org.apache.iceberg.AllDataFilesTable.AllDataFilesTableScan"
justification: "Track manifests per snapshot"
- code: "java.method.returnTypeTypeParametersChanged"
old: "method org.apache.iceberg.io.CloseableIterable<org.apache.iceberg.ManifestFile>\
\ org.apache.iceberg.BaseAllMetadataTableScan::reachableManifests(org.apache.iceberg.relocated.com.google.common.base.Function<org.apache.iceberg.Snapshot,\
\ java.lang.Iterable<org.apache.iceberg.ManifestFile>>) @ org.apache.iceberg.AllDeleteFilesTable.AllDeleteFilesTableScan"
new: "method org.apache.iceberg.io.CloseableIterable<org.apache.iceberg.util.Pair<org.apache.iceberg.Snapshot,\
\ org.apache.iceberg.ManifestFile>> org.apache.iceberg.BaseAllMetadataTableScan::reachableManifests(org.apache.iceberg.relocated.com.google.common.base.Function<org.apache.iceberg.Snapshot,\
\ java.lang.Iterable<org.apache.iceberg.util.Pair<org.apache.iceberg.Snapshot,\
\ org.apache.iceberg.ManifestFile>>>) @ org.apache.iceberg.AllDeleteFilesTable.AllDeleteFilesTableScan"
justification: "Track manifests per snapshot"
- code: "java.method.returnTypeTypeParametersChanged"
old: "method org.apache.iceberg.io.CloseableIterable<org.apache.iceberg.ManifestFile>\
\ org.apache.iceberg.BaseAllMetadataTableScan::reachableManifests(org.apache.iceberg.relocated.com.google.common.base.Function<org.apache.iceberg.Snapshot,\
\ java.lang.Iterable<org.apache.iceberg.ManifestFile>>) @ org.apache.iceberg.AllFilesTable.AllFilesTableScan"
new: "method org.apache.iceberg.io.CloseableIterable<org.apache.iceberg.util.Pair<org.apache.iceberg.Snapshot,\
\ org.apache.iceberg.ManifestFile>> org.apache.iceberg.BaseAllMetadataTableScan::reachableManifests(org.apache.iceberg.relocated.com.google.common.base.Function<org.apache.iceberg.Snapshot,\
\ java.lang.Iterable<org.apache.iceberg.util.Pair<org.apache.iceberg.Snapshot,\
\ org.apache.iceberg.ManifestFile>>>) @ org.apache.iceberg.AllFilesTable.AllFilesTableScan"
justification: "Track manifests per snapshot"
- code: "java.method.returnTypeTypeParametersChanged"
old: "method org.apache.iceberg.io.CloseableIterable<org.apache.iceberg.ManifestFile>\
\ org.apache.iceberg.BaseAllMetadataTableScan::reachableManifests(org.apache.iceberg.relocated.com.google.common.base.Function<org.apache.iceberg.Snapshot,\
\ java.lang.Iterable<org.apache.iceberg.ManifestFile>>) @ org.apache.iceberg.AllManifestsTable.AllManifestsTableScan"
new: "method org.apache.iceberg.io.CloseableIterable<org.apache.iceberg.util.Pair<org.apache.iceberg.Snapshot,\
\ org.apache.iceberg.ManifestFile>> org.apache.iceberg.BaseAllMetadataTableScan::reachableManifests(org.apache.iceberg.relocated.com.google.common.base.Function<org.apache.iceberg.Snapshot,\
\ java.lang.Iterable<org.apache.iceberg.util.Pair<org.apache.iceberg.Snapshot,\
\ org.apache.iceberg.ManifestFile>>>) @ org.apache.iceberg.AllManifestsTable.AllManifestsTableScan"
justification: "Track manifests per snapshot"
- code: "java.method.returnTypeTypeParametersChanged"
old: "method org.apache.iceberg.io.CloseableIterable<org.apache.iceberg.ManifestFile>\
\ org.apache.iceberg.DataFilesTable.DataFilesTableScan::manifests()"
new: "method org.apache.iceberg.io.CloseableIterable<org.apache.iceberg.util.Pair<org.apache.iceberg.Snapshot,\
\ org.apache.iceberg.ManifestFile>> org.apache.iceberg.DataFilesTable.DataFilesTableScan::manifests()"
justification: "Track manifests per snapshot"
- code: "java.method.returnTypeTypeParametersChanged"
old: "method org.apache.iceberg.io.CloseableIterable<org.apache.iceberg.ManifestFile>\
\ org.apache.iceberg.DeleteFilesTable.DeleteFilesTableScan::manifests()"
new: "method org.apache.iceberg.io.CloseableIterable<org.apache.iceberg.util.Pair<org.apache.iceberg.Snapshot,\
\ org.apache.iceberg.ManifestFile>> org.apache.iceberg.DeleteFilesTable.DeleteFilesTableScan::manifests()"
justification: "Track manifests per snapshot"
- code: "java.method.returnTypeTypeParametersChanged"
old: "method org.apache.iceberg.io.CloseableIterable<org.apache.iceberg.ManifestFile>\
\ org.apache.iceberg.FilesTable.FilesTableScan::manifests()"
new: "method org.apache.iceberg.io.CloseableIterable<org.apache.iceberg.util.Pair<org.apache.iceberg.Snapshot,\
\ org.apache.iceberg.ManifestFile>> org.apache.iceberg.FilesTable.FilesTableScan::manifests()"
justification: "Track manifests per snapshot"
apache-iceberg-0.14.0:
org.apache.iceberg:iceberg-api:
- code: "java.class.defaultSerializationChanged"
Expand Down
9 changes: 7 additions & 2 deletions core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,13 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext
}

@Override
protected CloseableIterable<Pair<Snapshot, ManifestFile>> manifests() {
return reachableManifests(
protected CloseableIterable<ManifestFile> manifests() {
return reachableManifests(snapshot -> snapshot.dataManifests(table().io()));
}

@Override
protected CloseableIterable<Pair<Snapshot, ManifestFile>> snapshotManifestPairs() {
return reachableSnapshotManifestPairs(
snapshot ->
snapshot.dataManifests(table().io()).stream()
.map(manifestFile -> Pair.of(snapshot, manifestFile))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,13 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext
}

@Override
protected CloseableIterable<Pair<Snapshot, ManifestFile>> manifests() {
return reachableManifests(
protected CloseableIterable<ManifestFile> manifests() {
return reachableManifests(snapshot -> snapshot.deleteManifests(table().io()));
}

@Override
protected CloseableIterable<Pair<Snapshot, ManifestFile>> snapshotManifestPairs() {
return reachableSnapshotManifestPairs(
snapshot ->
snapshot.deleteManifests(table().io()).stream()
.map(manifestFile -> Pair.of(snapshot, manifestFile))
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/AllEntriesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext
@Override
protected CloseableIterable<FileScanTask> doPlanFiles() {
CloseableIterable<Pair<Snapshot, ManifestFile>> snapshotManifestPairs =
reachableManifests(
reachableSnapshotManifestPairs(
snapshot ->
snapshot.allManifests(table().io()).stream()
.map(manifestFile -> Pair.of(snapshot, manifestFile))
Expand Down
9 changes: 7 additions & 2 deletions core/src/main/java/org/apache/iceberg/AllFilesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,13 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext
}

@Override
protected CloseableIterable<Pair<Snapshot, ManifestFile>> manifests() {
return reachableManifests(
protected CloseableIterable<ManifestFile> manifests() {
return reachableManifests(snapshot -> snapshot.allManifests(table().io()));
}

@Override
protected CloseableIterable<Pair<Snapshot, ManifestFile>> snapshotManifestPairs() {
return reachableSnapshotManifestPairs(
snapshot ->
snapshot.allManifests(table().io()).stream()
.map(manifestFile -> Pair.of(snapshot, manifestFile))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,21 @@ public CloseableIterable<FileScanTask> planFiles() {
return doPlanFiles();
}

protected CloseableIterable<Pair<Snapshot, ManifestFile>> reachableManifests(
protected CloseableIterable<ManifestFile> reachableManifests(
Function<Snapshot, Iterable<ManifestFile>> toManifests) {
Iterable<Snapshot> snapshots = table().snapshots();
Iterable<Iterable<ManifestFile>> manifestIterables =
Iterables.transform(snapshots, toManifests);

try (CloseableIterable<ManifestFile> iterable =
new ParallelIterable<>(manifestIterables, planExecutor())) {
return CloseableIterable.withNoopClose(Sets.newHashSet(iterable));
} catch (IOException e) {
throw new UncheckedIOException("Failed to close parallel iterable", e);
}
}

protected CloseableIterable<Pair<Snapshot, ManifestFile>> reachableSnapshotManifestPairs(
Function<Snapshot, Iterable<Pair<Snapshot, ManifestFile>>> toManifests) {
Iterable<Snapshot> snapshots = table().snapshots();
Iterable<Iterable<Pair<Snapshot, ManifestFile>>> manifestIterables =
Expand Down
22 changes: 18 additions & 4 deletions core/src/main/java/org/apache/iceberg/BaseFilesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,18 @@ protected BaseFilesTableScan(
}

/** Returns an iterable of manifest files to explore for this files metadata table scan */
protected abstract CloseableIterable<Pair<Snapshot, ManifestFile>> manifests();
protected abstract CloseableIterable<ManifestFile> manifests();

/**
* Returns an iterable of (reference_snapshot, manifest file) pairs to explore for this files
* metadata table scan
*/
protected abstract CloseableIterable<Pair<Snapshot, ManifestFile>> snapshotManifestPairs();

@Override
protected CloseableIterable<FileScanTask> doPlanFiles() {
return BaseFilesTable.planFiles(table(), manifests(), tableSchema(), schema(), context());
return BaseFilesTable.planFiles(
table(), snapshotManifestPairs(), tableSchema(), schema(), context());
}
}

Expand All @@ -120,11 +127,18 @@ protected BaseAllFilesTableScan(
}

/** Returns an iterable of manifest files to explore for this all files metadata table scan */
protected abstract CloseableIterable<Pair<Snapshot, ManifestFile>> manifests();
protected abstract CloseableIterable<ManifestFile> manifests();

/**
* Returns an iterable of (reference_snapshot, manifest file) pairs to explore for this all
* files metadata table scan
*/
protected abstract CloseableIterable<Pair<Snapshot, ManifestFile>> snapshotManifestPairs();

@Override
protected CloseableIterable<FileScanTask> doPlanFiles() {
return BaseFilesTable.planFiles(table(), manifests(), tableSchema(), schema(), context());
return BaseFilesTable.planFiles(
table(), snapshotManifestPairs(), tableSchema(), schema(), context());
}
}

Expand Down
7 changes: 6 additions & 1 deletion core/src/main/java/org/apache/iceberg/DataFilesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext
}

@Override
protected CloseableIterable<Pair<Snapshot, ManifestFile>> manifests() {
protected CloseableIterable<ManifestFile> manifests() {
return CloseableIterable.withNoopClose(snapshot().dataManifests(table().io()));
}

@Override
protected CloseableIterable<Pair<Snapshot, ManifestFile>> snapshotManifestPairs() {
return CloseableIterable.withNoopClose(
snapshot().dataManifests(table().io()).stream()
.map(manifestFile -> Pair.of(snapshot(), manifestFile))
Expand Down
7 changes: 6 additions & 1 deletion core/src/main/java/org/apache/iceberg/DeleteFilesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext
}

@Override
protected CloseableIterable<Pair<Snapshot, ManifestFile>> manifests() {
protected CloseableIterable<ManifestFile> manifests() {
return CloseableIterable.withNoopClose(snapshot().deleteManifests(table().io()));
}

@Override
protected CloseableIterable<Pair<Snapshot, ManifestFile>> snapshotManifestPairs() {
return CloseableIterable.withNoopClose(
snapshot().deleteManifests(table().io()).stream()
.map(manifestFile -> Pair.of(snapshot(), manifestFile))
Expand Down
7 changes: 6 additions & 1 deletion core/src/main/java/org/apache/iceberg/FilesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext
}

@Override
protected CloseableIterable<Pair<Snapshot, ManifestFile>> manifests() {
protected CloseableIterable<ManifestFile> manifests() {
return CloseableIterable.withNoopClose(snapshot().allManifests(table().io()));
}

@Override
protected CloseableIterable<Pair<Snapshot, ManifestFile>> snapshotManifestPairs() {
return CloseableIterable.withNoopClose(
snapshot().allManifests(table().io()).stream()
.map(manifestFile -> Pair.of(snapshot(), manifestFile))
Expand Down

0 comments on commit 0814a8b

Please sign in to comment.