-
Notifications
You must be signed in to change notification settings - Fork 2.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Core: Add reference snapshot ID/timestamps to AllEntriesTable and AllManifestsTable #9335
Core: Add reference snapshot ID/timestamps to AllEntriesTable and AllManifestsTable #9335
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the work, left some preliminary comments
CloseableIterable<ManifestFile> manifests = | ||
reachableManifests(snapshot -> snapshot.allManifests(table().io())); | ||
return BaseEntriesTable.planFiles(table(), manifests, tableSchema(), schema(), context()); | ||
return new ParallelIterable<>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we don't close the ParallelIterable like in the original reachableManifests method.
Also looks like we are going to call planFiles many more times than the original.
My first thought is to keep the logic but use a Pair<ManifestFile, Snapshot> where ManifestFile is used now. How about adapting the correct reachableManifest code into a new more generic method like
T traverse(Function<Snapshot, T> func)
and have reachableManifests use that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we don't close the ParallelIterable like in the original reachableManifests method.
Good catch! Let me fix it.
Also looks like we are going to call planFiles many more times than the original.
Let me think about it, thanks.
@@ -174,6 +175,8 @@ public static MetricsModes.MetricsMode metricsMode( | |||
field.type(), file.upperBounds().get(field.fieldId())))); | |||
|
|||
public static final String READABLE_METRICS = "readable_metrics"; | |||
public static final String REF_SNAPSHOT_ID = "reference_snapshot_id"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like this constant is not too suitable in this file, which is for Metrics. I think if we have to have a constant, should be in BaseMetadataTable. Else we can just actually hard code it, it seems simpler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still feel we should move this one.
Hello @RussellSpitzer, @szehon-ho and I had a discussion about adopting the If I understand #8856 correctly, once we associate Please let us know if that's what you're looking for, thank you. protected CloseableIterable<ManifestFile> reachableManifests(
Function<Snapshot, Iterable<ManifestFile>> toManifests) {
Iterable<Snapshot> snapshots = table().snapshots();
Iterable<Iterable<ManifestFile>> manifestIterables =
Iterables.transform(snapshots, toManifests);
try (CloseableIterable<ManifestFile> iterable =
new ParallelIterable<>(manifestIterables, planExecutor())) {
return CloseableIterable.withNoopClose(Sets.newHashSet(iterable)); // de-dup `ManifestFile`
} catch (IOException e) {
throw new UncheckedIOException("Failed to close parallel iterable", e);
}
} |
Hi @hsiang-c , we took another look with @RussellSpitzer , it seems the manifests are de-duped on the traversal down, but the entries (referred to by manifests) should not be de-duped. wdyt? |
Clarified with @hsiang-c . Will draw a diagram to illustrate the problem. Imagine following graph:
Notice all three snapshots point to the same manifest file. So, given the dedup mechanism we have of traversing manifest files, and assuming first-in-first-out, we will only be left with an entry like:
This seems fine to me, as the word 'as_of_snapshot' make it seem like we want to know when each entry first came into the picture. It does not seem necessary to change the behavior of the table and list every single snapshot that refer to the entry in this table. If an entry changes state (ie, EXISTING to DELETED), we will see another entry in the table, because it is a completely new entry (in a new manifest). cc @RussellSpitzer @hsiang-c for thoughts. |
bf0f977
to
43e4cb9
Compare
43e4cb9
to
e6b7e24
Compare
8e6ce73
to
0b3caef
Compare
@szehon-ho @RussellSpitzer Please take a look for me, thanks. |
List<GenericData.Record> expectedFiles = | ||
ListUtils.union(expectedDataFiles, expectedDeleteFiles); | ||
expectedFiles.sort(Comparator.comparing(r -> ((Integer) r.get("content")))); | ||
assertThat(actualFiles).hasSize(3); | ||
expectedFiles.sort(Comparator.comparing(r -> r.get("file_path").toString())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ordering by file_path
for both actual files and expected files sorts both list deterministically.
List<Row> actualFiles = TestHelpers.selectNonDerived(actualFilesDs).collectAsList(); | ||
Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema(); | ||
List<ManifestFile> expectedDataManifests = TestHelpers.dataManifests(table); | ||
List<Record> expectedFiles = | ||
expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, null); | ||
expectedFiles.sort(Comparator.comparing(r -> r.get("file_path").toString())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ordering by file_path
for both actual files and expected files sorts both list deterministically.
@@ -174,6 +175,8 @@ public static MetricsModes.MetricsMode metricsMode( | |||
field.type(), file.upperBounds().get(field.fieldId())))); | |||
|
|||
public static final String READABLE_METRICS = "readable_metrics"; | |||
public static final String REF_SNAPSHOT_ID = "reference_snapshot_id"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still feel we should move this one.
.palantir/revapi.yml
Outdated
@@ -1136,6 +1136,78 @@ acceptedBreaks: | |||
new: "method org.apache.iceberg.BaseMetastoreOperations.CommitStatus org.apache.iceberg.BaseMetastoreTableOperations::checkCommitStatus(java.lang.String,\ | |||
\ org.apache.iceberg.TableMetadata)" | |||
justification: "Removing deprecated code" | |||
- code: "java.method.returnTypeTypeParametersChanged" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately I think we have to keep the old API's for one release. Let's just make a new method then. Hopefully we can re-use logic between these.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@szehon-ho Sounds good, let me make a new method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@szehon-ho I added a new snapshotManifestPairs()
to the base classes.
0814a8b
to
8c22f5c
Compare
@@ -66,5 +68,14 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext | |||
protected CloseableIterable<ManifestFile> manifests() { | |||
return reachableManifests(snapshot -> snapshot.dataManifests(table().io())); | |||
} | |||
|
|||
@Override | |||
protected CloseableIterable<Pair<Snapshot, ManifestFile>> snapshotManifestPairs() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we deprecate the other one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also just my opinion on name, but to me Pairs is an implementation detail, maybe we can call it manifestsWithSnapshot to explain more logically
@@ -39,6 +39,8 @@ class AllManifestsTableTaskParser { | |||
private static final String MANIFEST_LIST_LOCATION = "manifest-list-Location"; | |||
private static final String RESIDUAL = "residual-filter"; | |||
private static final String REFERENCE_SNAPSHOT_ID = "reference-snapshot-id"; | |||
private static final String REFERENCE_SNAPSHOT_TIMESTAMP_MILLIS = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
while its descriptive, its overly longer than other column names, how about combine with @RussellSpitzer original suggestion and 'reference-snapshot-time'
Table table, ManifestFile manifest, Schema projection, Expression filter) { | ||
this(table.schema(), table.io(), table.specs(), manifest, projection, filter); | ||
Table table, | ||
Pair<Snapshot, ManifestFile> snapshotManifestPair, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we just add an argument Snapshot, instead of passing pair?
} | ||
|
||
@Override | ||
public <T> T get(int pos, Class<T> javaClass) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wont this not work for Flink, the pos are not always at the end? See https://github.com/apache/iceberg/pull/6222/files
I think we should make a generic struct for this scenario now that we have so many fields. ie, have a position map kind of like: https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/util/StructProjection.java. do you think its possible?
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions. |
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
Note to reviewers
All
Metadata Tables with Snapshot Information #8856ManifestFile
inBaseAllMetadataTableScan::reachableManifests
, we return aPair<Snapshot, ManifestFile>
from all snapshots.REF_SNAPSHOT_ID
is used byAllManifestsTable
already, so I chose this terminology instead ofAS_OF_SNAPSHOT
. I am open to other names as well.file_path
.I also used
Pair<Snapshot, ManifestFile>
for all_* tables other thanall_entries
andall_manifests
but I'm not sure if it is necessary. Please let me know.Sample output
reference_snapshot_timestamp_millis