From 0814a8b65446e7c53455470887fc59d6b73ec099 Mon Sep 17 00:00:00 2001 From: hsiang-c Date: Wed, 18 Sep 2024 12:45:05 +0800 Subject: [PATCH] Keep the existing API and add snapshotManifestPairs() method --- .palantir/revapi.yml | 72 ------------------- .../org/apache/iceberg/AllDataFilesTable.java | 9 ++- .../apache/iceberg/AllDeleteFilesTable.java | 9 ++- .../org/apache/iceberg/AllEntriesTable.java | 2 +- .../org/apache/iceberg/AllFilesTable.java | 9 ++- .../iceberg/BaseAllMetadataTableScan.java | 16 ++++- .../org/apache/iceberg/BaseFilesTable.java | 22 ++++-- .../org/apache/iceberg/DataFilesTable.java | 7 +- .../org/apache/iceberg/DeleteFilesTable.java | 7 +- .../java/org/apache/iceberg/FilesTable.java | 7 +- 10 files changed, 73 insertions(+), 87 deletions(-) diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index b059f7321483..9b8017f0beec 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1136,78 +1136,6 @@ acceptedBreaks: new: "method org.apache.iceberg.BaseMetastoreOperations.CommitStatus org.apache.iceberg.BaseMetastoreTableOperations::checkCommitStatus(java.lang.String,\ \ org.apache.iceberg.TableMetadata)" justification: "Removing deprecated code" - - code: "java.method.returnTypeTypeParametersChanged" - old: "method org.apache.iceberg.io.CloseableIterable\ - \ org.apache.iceberg.AllDataFilesTable.AllDataFilesTableScan::manifests()" - new: "method org.apache.iceberg.io.CloseableIterable> org.apache.iceberg.AllDataFilesTable.AllDataFilesTableScan::manifests()" - justification: "Track manifests per snapshot" - - code: "java.method.returnTypeTypeParametersChanged" - old: "method org.apache.iceberg.io.CloseableIterable\ - \ org.apache.iceberg.AllDeleteFilesTable.AllDeleteFilesTableScan::manifests()" - new: "method org.apache.iceberg.io.CloseableIterable> org.apache.iceberg.AllDeleteFilesTable.AllDeleteFilesTableScan::manifests()" - justification: "Track manifests per snapshot" - - code: "java.method.returnTypeTypeParametersChanged" - old: 
"method org.apache.iceberg.io.CloseableIterable\ - \ org.apache.iceberg.AllFilesTable.AllFilesTableScan::manifests()" - new: "method org.apache.iceberg.io.CloseableIterable> org.apache.iceberg.AllFilesTable.AllFilesTableScan::manifests()" - justification: "Track manifests per snapshot" - - code: "java.method.returnTypeTypeParametersChanged" - old: "method org.apache.iceberg.io.CloseableIterable\ - \ org.apache.iceberg.BaseAllMetadataTableScan::reachableManifests(org.apache.iceberg.relocated.com.google.common.base.Function>) @ org.apache.iceberg.AllDataFilesTable.AllDataFilesTableScan" - new: "method org.apache.iceberg.io.CloseableIterable> org.apache.iceberg.BaseAllMetadataTableScan::reachableManifests(org.apache.iceberg.relocated.com.google.common.base.Function>>) @ org.apache.iceberg.AllDataFilesTable.AllDataFilesTableScan" - justification: "Track manifests per snapshot" - - code: "java.method.returnTypeTypeParametersChanged" - old: "method org.apache.iceberg.io.CloseableIterable\ - \ org.apache.iceberg.BaseAllMetadataTableScan::reachableManifests(org.apache.iceberg.relocated.com.google.common.base.Function>) @ org.apache.iceberg.AllDeleteFilesTable.AllDeleteFilesTableScan" - new: "method org.apache.iceberg.io.CloseableIterable> org.apache.iceberg.BaseAllMetadataTableScan::reachableManifests(org.apache.iceberg.relocated.com.google.common.base.Function>>) @ org.apache.iceberg.AllDeleteFilesTable.AllDeleteFilesTableScan" - justification: "Track manifests per snapshot" - - code: "java.method.returnTypeTypeParametersChanged" - old: "method org.apache.iceberg.io.CloseableIterable\ - \ org.apache.iceberg.BaseAllMetadataTableScan::reachableManifests(org.apache.iceberg.relocated.com.google.common.base.Function>) @ org.apache.iceberg.AllFilesTable.AllFilesTableScan" - new: "method org.apache.iceberg.io.CloseableIterable> org.apache.iceberg.BaseAllMetadataTableScan::reachableManifests(org.apache.iceberg.relocated.com.google.common.base.Function>>) @ 
org.apache.iceberg.AllFilesTable.AllFilesTableScan" - justification: "Track manifests per snapshot" - - code: "java.method.returnTypeTypeParametersChanged" - old: "method org.apache.iceberg.io.CloseableIterable\ - \ org.apache.iceberg.BaseAllMetadataTableScan::reachableManifests(org.apache.iceberg.relocated.com.google.common.base.Function>) @ org.apache.iceberg.AllManifestsTable.AllManifestsTableScan" - new: "method org.apache.iceberg.io.CloseableIterable> org.apache.iceberg.BaseAllMetadataTableScan::reachableManifests(org.apache.iceberg.relocated.com.google.common.base.Function>>) @ org.apache.iceberg.AllManifestsTable.AllManifestsTableScan" - justification: "Track manifests per snapshot" - - code: "java.method.returnTypeTypeParametersChanged" - old: "method org.apache.iceberg.io.CloseableIterable\ - \ org.apache.iceberg.DataFilesTable.DataFilesTableScan::manifests()" - new: "method org.apache.iceberg.io.CloseableIterable> org.apache.iceberg.DataFilesTable.DataFilesTableScan::manifests()" - justification: "Track manifests per snapshot" - - code: "java.method.returnTypeTypeParametersChanged" - old: "method org.apache.iceberg.io.CloseableIterable\ - \ org.apache.iceberg.DeleteFilesTable.DeleteFilesTableScan::manifests()" - new: "method org.apache.iceberg.io.CloseableIterable> org.apache.iceberg.DeleteFilesTable.DeleteFilesTableScan::manifests()" - justification: "Track manifests per snapshot" - - code: "java.method.returnTypeTypeParametersChanged" - old: "method org.apache.iceberg.io.CloseableIterable\ - \ org.apache.iceberg.FilesTable.FilesTableScan::manifests()" - new: "method org.apache.iceberg.io.CloseableIterable> org.apache.iceberg.FilesTable.FilesTableScan::manifests()" - justification: "Track manifests per snapshot" apache-iceberg-0.14.0: org.apache.iceberg:iceberg-api: - code: "java.class.defaultSerializationChanged" diff --git a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java index 
944b0a2becbf..fc0d347e47a8 100644 --- a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java +++ b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java @@ -65,8 +65,13 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext } @Override - protected CloseableIterable<Pair<Snapshot, ManifestFile>> manifests() { - return reachableManifests( + protected CloseableIterable<ManifestFile> manifests() { + return reachableManifests(snapshot -> snapshot.dataManifests(table().io())); + } + + @Override + protected CloseableIterable<Pair<Snapshot, ManifestFile>> snapshotManifestPairs() { + return reachableSnapshotManifestPairs( snapshot -> snapshot.dataManifests(table().io()).stream() .map(manifestFile -> Pair.of(snapshot, manifestFile)) diff --git a/core/src/main/java/org/apache/iceberg/AllDeleteFilesTable.java b/core/src/main/java/org/apache/iceberg/AllDeleteFilesTable.java index 75c7d10d2b12..cb1733d4339a 100644 --- a/core/src/main/java/org/apache/iceberg/AllDeleteFilesTable.java +++ b/core/src/main/java/org/apache/iceberg/AllDeleteFilesTable.java @@ -65,8 +65,13 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext } @Override - protected CloseableIterable<Pair<Snapshot, ManifestFile>> manifests() { - return reachableManifests( + protected CloseableIterable<ManifestFile> manifests() { + return reachableManifests(snapshot -> snapshot.deleteManifests(table().io())); + } + + @Override + protected CloseableIterable<Pair<Snapshot, ManifestFile>> snapshotManifestPairs() { + return reachableSnapshotManifestPairs( snapshot -> snapshot.deleteManifests(table().io()).stream() .map(manifestFile -> Pair.of(snapshot, manifestFile)) diff --git a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java index fc267e2879b5..a688e5c973d0 100644 --- a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java +++ b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java @@ -86,7 +86,7 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext @Override protected
CloseableIterable<FileScanTask> doPlanFiles() { CloseableIterable<Pair<Snapshot, ManifestFile>> snapshotManifestPairs = - reachableManifests( + reachableSnapshotManifestPairs( snapshot -> snapshot.allManifests(table().io()).stream() .map(manifestFile -> Pair.of(snapshot, manifestFile)) diff --git a/core/src/main/java/org/apache/iceberg/AllFilesTable.java b/core/src/main/java/org/apache/iceberg/AllFilesTable.java index 3a5c9cbb9fba..f5f5ab3f686d 100644 --- a/core/src/main/java/org/apache/iceberg/AllFilesTable.java +++ b/core/src/main/java/org/apache/iceberg/AllFilesTable.java @@ -65,8 +65,13 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext } @Override - protected CloseableIterable<Pair<Snapshot, ManifestFile>> manifests() { - return reachableManifests( + protected CloseableIterable<ManifestFile> manifests() { + return reachableManifests(snapshot -> snapshot.allManifests(table().io())); + } + + @Override + protected CloseableIterable<Pair<Snapshot, ManifestFile>> snapshotManifestPairs() { + return reachableSnapshotManifestPairs( snapshot -> snapshot.allManifests(table().io()).stream() .map(manifestFile -> Pair.of(snapshot, manifestFile)) diff --git a/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java b/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java index 881129bcabf2..76ea47bb2e4f 100644 --- a/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java @@ -72,7 +72,21 @@ public CloseableIterable<FileScanTask> planFiles() { return doPlanFiles(); } - protected CloseableIterable<Pair<Snapshot, ManifestFile>> reachableManifests( + protected CloseableIterable<ManifestFile> reachableManifests( + Function<Snapshot, Iterable<ManifestFile>> toManifests) { + Iterable<Snapshot> snapshots = table().snapshots(); + Iterable<Iterable<ManifestFile>> manifestIterables = + Iterables.transform(snapshots, toManifests); + + try (CloseableIterable<ManifestFile> iterable = + new ParallelIterable<>(manifestIterables, planExecutor())) { + return CloseableIterable.withNoopClose(Sets.newHashSet(iterable)); + } catch (IOException e) { + throw new UncheckedIOException("Failed to close
parallel iterable", e); + } + } + + protected CloseableIterable<Pair<Snapshot, ManifestFile>> reachableSnapshotManifestPairs( Function<Snapshot, Iterable<Pair<Snapshot, ManifestFile>>> toManifests) { Iterable<Snapshot> snapshots = table().snapshots(); Iterable<Iterable<Pair<Snapshot, ManifestFile>>> manifestIterables = diff --git a/core/src/main/java/org/apache/iceberg/BaseFilesTable.java b/core/src/main/java/org/apache/iceberg/BaseFilesTable.java index 8b8e7b6c781b..e38eaf809a4a 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFilesTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseFilesTable.java @@ -100,11 +100,18 @@ protected BaseFilesTableScan( } /** Returns an iterable of manifest files to explore for this files metadata table scan */ - protected abstract CloseableIterable<Pair<Snapshot, ManifestFile>> manifests(); + protected abstract CloseableIterable<ManifestFile> manifests(); + + /** + * Returns an iterable of (reference_snapshot, manifest file) pairs to explore for this files + * metadata table scan + */ + protected abstract CloseableIterable<Pair<Snapshot, ManifestFile>> snapshotManifestPairs(); @Override protected CloseableIterable<FileScanTask> doPlanFiles() { - return BaseFilesTable.planFiles(table(), manifests(), tableSchema(), schema(), context()); + return BaseFilesTable.planFiles( + table(), snapshotManifestPairs(), tableSchema(), schema(), context()); } } @@ -120,11 +127,18 @@ protected BaseAllFilesTableScan( } /** Returns an iterable of manifest files to explore for this all files metadata table scan */ - protected abstract CloseableIterable<Pair<Snapshot, ManifestFile>> manifests(); + protected abstract CloseableIterable<ManifestFile> manifests(); + + /** + * Returns an iterable of (reference_snapshot, manifest file) pairs to explore for this all + * files metadata table scan + */ + protected abstract CloseableIterable<Pair<Snapshot, ManifestFile>> snapshotManifestPairs(); @Override protected CloseableIterable<FileScanTask> doPlanFiles() { - return BaseFilesTable.planFiles(table(), manifests(), tableSchema(), schema(), context()); + return BaseFilesTable.planFiles( + table(), snapshotManifestPairs(), tableSchema(), schema(), context()); } } diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java
b/core/src/main/java/org/apache/iceberg/DataFilesTable.java index af41d60f09d4..f553e1c77d6c 100644 --- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java +++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java @@ -59,7 +59,12 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext } @Override - protected CloseableIterable<Pair<Snapshot, ManifestFile>> manifests() { + protected CloseableIterable<ManifestFile> manifests() { + return CloseableIterable.withNoopClose(snapshot().dataManifests(table().io())); + } + + @Override + protected CloseableIterable<Pair<Snapshot, ManifestFile>> snapshotManifestPairs() { return CloseableIterable.withNoopClose( snapshot().dataManifests(table().io()).stream() .map(manifestFile -> Pair.of(snapshot(), manifestFile)) diff --git a/core/src/main/java/org/apache/iceberg/DeleteFilesTable.java b/core/src/main/java/org/apache/iceberg/DeleteFilesTable.java index cadc4a2f952b..c1834f2efed2 100644 --- a/core/src/main/java/org/apache/iceberg/DeleteFilesTable.java +++ b/core/src/main/java/org/apache/iceberg/DeleteFilesTable.java @@ -59,7 +59,12 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext } @Override - protected CloseableIterable<Pair<Snapshot, ManifestFile>> manifests() { + protected CloseableIterable<ManifestFile> manifests() { + return CloseableIterable.withNoopClose(snapshot().deleteManifests(table().io())); + } + + @Override + protected CloseableIterable<Pair<Snapshot, ManifestFile>> snapshotManifestPairs() { return CloseableIterable.withNoopClose( snapshot().deleteManifests(table().io()).stream() .map(manifestFile -> Pair.of(snapshot(), manifestFile)) diff --git a/core/src/main/java/org/apache/iceberg/FilesTable.java b/core/src/main/java/org/apache/iceberg/FilesTable.java index b57e77736dd8..4d2db8df2c31 100644 --- a/core/src/main/java/org/apache/iceberg/FilesTable.java +++ b/core/src/main/java/org/apache/iceberg/FilesTable.java @@ -59,7 +59,12 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext } @Override - protected CloseableIterable<Pair<Snapshot, ManifestFile>> manifests() { + protected
CloseableIterable<ManifestFile> manifests() { + return CloseableIterable.withNoopClose(snapshot().allManifests(table().io())); + } + + @Override + protected CloseableIterable<Pair<Snapshot, ManifestFile>> snapshotManifestPairs() { return CloseableIterable.withNoopClose( snapshot().allManifests(table().io()).stream() .map(manifestFile -> Pair.of(snapshot(), manifestFile))