-
Notifications
You must be signed in to change notification settings - Fork 134
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Catalog: return Iceberg snapshot log based on Nessie commit history #10033
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -119,6 +119,8 @@ | |
import org.projectnessie.catalog.model.schema.types.NessieTimestampTypeSpec; | ||
import org.projectnessie.catalog.model.schema.types.NessieType; | ||
import org.projectnessie.catalog.model.schema.types.NessieTypeSpec; | ||
import org.projectnessie.catalog.model.snapshot.ImmutableImplicitIcebergSnapshot; | ||
import org.projectnessie.catalog.model.snapshot.ImplicitIcebergSnapshot; | ||
import org.projectnessie.catalog.model.snapshot.NessieEntitySnapshot; | ||
import org.projectnessie.catalog.model.snapshot.NessieTableSnapshot; | ||
import org.projectnessie.catalog.model.snapshot.NessieViewRepresentation; | ||
|
@@ -512,7 +514,7 @@ public static NessieTableSnapshot icebergTableSnapshotToNessie( | |
Function<IcebergSnapshot, String> manifestListLocation) { | ||
NessieTableSnapshot.Builder snapshot = NessieTableSnapshot.builder().id(snapshotId); | ||
if (previous != null) { | ||
snapshot.from(previous); | ||
snapshot.from(previous).implicitIcebergSnapshots(Map.of()); | ||
|
||
String previousLocation = previous.icebergLocation(); | ||
if (previousLocation != null && !previousLocation.equals(iceberg.location())) { | ||
|
@@ -623,10 +625,6 @@ public static NessieTableSnapshot icebergTableSnapshotToNessie( | |
.ifPresent( | ||
currentSnapshot -> { | ||
snapshot.icebergSnapshotId(currentSnapshot.snapshotId()); | ||
currentSnapshot | ||
.parentSnapshotId(); // TODO Can we leave this unset, as we do not return previous | ||
// Iceberg | ||
// snapshots?? | ||
Integer schemaId = currentSnapshot.schemaId(); | ||
if (schemaId != null) { | ||
// TODO this overwrites the "current schema ID" with the schema ID of the current | ||
|
@@ -654,10 +652,22 @@ public static NessieTableSnapshot icebergTableSnapshotToNessie( | |
currentSnapshot.manifests(); // TODO | ||
}); | ||
|
||
for (IcebergSnapshotLogEntry logEntry : iceberg.snapshotLog()) { | ||
// TODO ?? | ||
logEntry.snapshotId(); | ||
logEntry.timestampMs(); | ||
for (var icebergSnapshotLogEntry : iceberg.snapshotLog()) { | ||
var snapId = icebergSnapshotLogEntry.snapshotId(); | ||
if (snapId == iceberg.currentSnapshotId()) { | ||
continue; | ||
} | ||
if (previous == null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Those calls come from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes - unfortunate. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why keep |
||
var icebergSnap = iceberg.snapshotById(snapId); | ||
if (icebergSnap.isEmpty()) { | ||
throw new IllegalArgumentException( | ||
"Invalid Iceberg table-metadata: snapshot-log references snapshot with ID " | ||
+ snapId | ||
+ ", which is not present as a snapshot object"); | ||
} | ||
snapshot.putImplicitIcebergSnapshot(snapId, icebergSnap.get().asImplicitIcebergSnapshot()); | ||
} | ||
snapshot.addPreviousIcebergSnapshotId(snapId); | ||
} | ||
|
||
for (IcebergStatisticsFile statisticsFile : iceberg.statistics()) { | ||
|
@@ -943,6 +953,7 @@ public static IcebergViewMetadata nessieViewSnapshotToIceberg( | |
|
||
public static IcebergTableMetadata nessieTableSnapshotToIceberg( | ||
NessieTableSnapshot nessie, | ||
List<ImplicitIcebergSnapshot> history, | ||
Optional<IcebergSpec> requestedSpecVersion, | ||
Consumer<Map<String, String>> tablePropertiesTweak) { | ||
NessieTable entity = nessie.entity(); | ||
|
@@ -1031,6 +1042,19 @@ public static IcebergTableMetadata nessieTableSnapshotToIceberg( | |
? nessie.icebergManifestFileLocations() | ||
: emptyList(); | ||
|
||
var parentSnapshotId = (Long) null; | ||
|
||
for (ImplicitIcebergSnapshot previous : history) { | ||
var snap = IcebergSnapshot.fromImplicitIcebergSnapshot(previous, parentSnapshotId); | ||
metadata.addSnapshot(snap); | ||
parentSnapshotId = snap.snapshotId(); | ||
metadata.addSnapshotLog( | ||
IcebergSnapshotLogEntry.snapshotLogEntry( | ||
previous.timestampMs(), previous.snapshotId())); | ||
// TODO we don't include the metadata location yet - we could potentially do that later | ||
// metadata.addMetadataLog(IcebergHistoryEntry.historyEntry(previousSnap.timestampMs(), )); | ||
} | ||
|
||
IcebergSnapshot.Builder snapshot = | ||
IcebergSnapshot.builder() | ||
.snapshotId(snapshotId) | ||
|
@@ -1039,7 +1063,8 @@ public static IcebergTableMetadata nessieTableSnapshotToIceberg( | |
.manifestList(manifestListLocation) | ||
.summary(nessie.icebergSnapshotSummary()) | ||
.timestampMs(timestampMs) | ||
.sequenceNumber(nessie.icebergSnapshotSequenceNumber()); | ||
.sequenceNumber(nessie.icebergSnapshotSequenceNumber()) | ||
.parentSnapshotId(parentSnapshotId); | ||
metadata.addSnapshots(snapshot.build()); | ||
|
||
metadata.putRef( | ||
|
@@ -1080,9 +1105,6 @@ public static IcebergTableMetadata nessieTableSnapshotToIceberg( | |
partitionStatisticsFile.fileSizeInBytes())); | ||
} | ||
|
||
// metadata.addMetadataLog(); | ||
// metadata.addSnapshotLog(); | ||
|
||
return metadata.build(); | ||
} | ||
|
||
|
@@ -1577,13 +1599,18 @@ public static void addSnapshot(AddSnapshot u, IcebergTableMetadataUpdateState st | |
IcebergSnapshot icebergSnapshot = u.snapshot(); | ||
Integer schemaId = icebergSnapshot.schemaId(); | ||
NessieTableSnapshot snapshot = state.snapshot(); | ||
NessieTableSnapshot.Builder snapshotBuilder = state.builder(); | ||
if (schemaId != null) { | ||
Optional<NessieSchema> schema = snapshot.schemaByIcebergId(schemaId); | ||
schema.ifPresent(s -> state.builder().currentSchemaId(s.id())); | ||
schema.ifPresent(s -> snapshotBuilder.currentSchemaId(s.id())); | ||
} | ||
|
||
state | ||
.builder() | ||
var currentIcebergSnapshotId = snapshot.icebergSnapshotId(); | ||
if (currentIcebergSnapshotId != null && currentIcebergSnapshotId != -1L) { | ||
snapshotBuilder.addPreviousIcebergSnapshotId(currentIcebergSnapshotId); | ||
} | ||
|
||
snapshotBuilder | ||
.icebergSnapshotId(icebergSnapshot.snapshotId()) | ||
.icebergSnapshotSequenceNumber(icebergSnapshot.sequenceNumber()) | ||
.icebergLastSequenceNumber( | ||
|
@@ -1599,6 +1626,23 @@ public static void addSnapshot(AddSnapshot u, IcebergTableMetadataUpdateState st | |
.icebergManifestListLocation(icebergSnapshot.manifestList()) | ||
.icebergManifestFileLocations(icebergSnapshot.manifests()); | ||
|
||
// If another Iceberg snapshot was add in this same update-transaction, memoize the previous | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: |
||
// snapshot so we can return the full snapshot history. | ||
var previouslyAddedSnapshot = state.previouslyAddedSnapshot(); | ||
if (previouslyAddedSnapshot != null) { | ||
snapshotBuilder.putImplicitIcebergSnapshot( | ||
previouslyAddedSnapshot.snapshotId(), | ||
ImmutableImplicitIcebergSnapshot.builder() | ||
.snapshotId(previouslyAddedSnapshot.snapshotId()) | ||
.schemaId(previouslyAddedSnapshot.schemaId()) | ||
.manifests(previouslyAddedSnapshot.manifests()) | ||
.manifestList(previouslyAddedSnapshot.manifestList()) | ||
.sequenceNumber(previouslyAddedSnapshot.sequenceNumber()) | ||
.summary(previouslyAddedSnapshot.summary()) | ||
.timestampMs(previouslyAddedSnapshot.timestampMs()) | ||
.build()); | ||
} | ||
|
||
state.snapshotAdded(icebergSnapshot); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.