diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a0608b56c6..365776168ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,12 @@ as necessary. Empty sections will not end in the release notes. ### Highlights +- Nessie now returns the snapshot history in the `snapshots` and `snapshot-log` attributes of an Iceberg + table-metadata retrieved via Iceberg REST for table changes that have been committed via Iceberg REST + to Nessie 0.101.0 or newer. Commits made using older Nessie versions will not return older snapshots. +- Generally, not only for Nessie, It is recommended to keep the number of snapshots maintained in an + Iceberg table-metadata as low as possible. Use the maintenance operations provided by Iceberg. + ### Upgrade notes ### Breaking changes diff --git a/catalog/format/iceberg-fixturegen/src/main/java/org/projectnessie/catalog/formats/iceberg/fixtures/IcebergFixtures.java b/catalog/format/iceberg-fixturegen/src/main/java/org/projectnessie/catalog/formats/iceberg/fixtures/IcebergFixtures.java index 7b88da773c9..0fc06e96138 100644 --- a/catalog/format/iceberg-fixturegen/src/main/java/org/projectnessie/catalog/formats/iceberg/fixtures/IcebergFixtures.java +++ b/catalog/format/iceberg-fixturegen/src/main/java/org/projectnessie/catalog/formats/iceberg/fixtures/IcebergFixtures.java @@ -177,12 +177,62 @@ public static IcebergTableMetadata.Builder tableMetadataSimple() { .putSummary("operation", "testing") .sequenceNumber(123L) .timestampMs(12345678L) + .manifestList("s3://this-does-not-exist/anywhere/") .build()) .putRef("main", IcebergSnapshotRef.builder().type("branch").snapshotId(11).build()) .addSnapshotLog( IcebergSnapshotLogEntry.builder().snapshotId(11).timestampMs(12345678L).build()); } + public static IcebergTableMetadata.Builder tableMetadataThreeSnapshots() { + IcebergSchema schemaAllTypes = icebergSchemaAllTypes(); + + return IcebergTableMetadata.builder() + .tableUuid(UUID.randomUUID().toString()) + .lastUpdatedMs(111111111L) + .location("table-location") + .currentSnapshotId(13) + .lastColumnId(schemaAllTypes.fields().get(schemaAllTypes.fields().size() - 1).id()) + .lastPartitionId(INITIAL_PARTITION_ID) + .lastSequenceNumber(INITIAL_SEQUENCE_NUMBER) + .currentSchemaId(schemaAllTypes.schemaId()) + .defaultSortOrderId(INITIAL_SORT_ORDER_ID) + .defaultSpecId(INITIAL_SPEC_ID) + .putProperty("prop", "value") + .addSchemas(schemaAllTypes) + .addSnapshots( + IcebergSnapshot.builder() + .snapshotId(11) + .schemaId(schemaAllTypes.schemaId()) + .putSummary("operation", "testing1") + .sequenceNumber(123L) + .timestampMs(12345676L) + .build()) + .addSnapshots( + IcebergSnapshot.builder() + .snapshotId(12) + .schemaId(schemaAllTypes.schemaId()) + .putSummary("operation", "testing2") + .sequenceNumber(124L) + .timestampMs(12345677L) + .build()) + .addSnapshots( + IcebergSnapshot.builder() + .snapshotId(13) + .schemaId(schemaAllTypes.schemaId()) + .putSummary("operation", "testing3") + .sequenceNumber(125L) + .timestampMs(12345678L) + .build()) + .putRef("main", IcebergSnapshotRef.builder().type("branch").snapshotId(13).build()) + .addSnapshotLog( + IcebergSnapshotLogEntry.builder().snapshotId(11).timestampMs(12345676L).build()) + .addSnapshotLog( + IcebergSnapshotLogEntry.builder().snapshotId(12).timestampMs(12345677L).build()) + .addSnapshotLog( + IcebergSnapshotLogEntry.builder().snapshotId(13).timestampMs(12345678L).build()); + } + public static IcebergViewMetadata.Builder viewMetadataSimple() { IcebergSchema schemaAllTypes = icebergSchemaAllTypes(); diff --git a/catalog/format/iceberg-fixturegen/src/main/java/org/projectnessie/catalog/formats/iceberg/fixtures/IcebergGenerateFixtures.java b/catalog/format/iceberg-fixturegen/src/main/java/org/projectnessie/catalog/formats/iceberg/fixtures/IcebergGenerateFixtures.java index 1fd218b46c5..9eee6660470 100644 --- a/catalog/format/iceberg-fixturegen/src/main/java/org/projectnessie/catalog/formats/iceberg/fixtures/IcebergGenerateFixtures.java +++ b/catalog/format/iceberg-fixturegen/src/main/java/org/projectnessie/catalog/formats/iceberg/fixtures/IcebergGenerateFixtures.java @@ -168,7 +168,7 @@ public void close() throws IOException { }; UUID commitId = randomUUID(); long snapshotId = 1; - long sequenceNumber = 1; + long sequenceNumber = 1000; long timestamp = 1715175169320L; IcebergManifestFileGenerator manifestFileGenerator = IcebergManifestFileGenerator.builder() @@ -210,8 +210,8 @@ public void close() throws IOException { .currentSnapshotId(snapshotId) .defaultSpecId(schemaGenerator.getIcebergPartitionSpec().specId()) .defaultSortOrderId(IcebergSortOrder.UNSORTED_ORDER.orderId()) - .lastSequenceNumber(1L) - .snapshots(singletonList(snapshotWithManifestList)) + .lastSequenceNumber(1000L) + .addSnapshot(snapshotWithManifestList) .build(); metadataConsumer.accept(icebergMetadataWithManifestList); return writer.write( diff --git a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/meta/IcebergSnapshot.java b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/meta/IcebergSnapshot.java index f5dadf27509..aeb9272757f 100644 --- a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/meta/IcebergSnapshot.java +++ b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/meta/IcebergSnapshot.java @@ -17,6 +17,7 @@ import static com.google.common.base.Preconditions.checkState; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonView; @@ -30,6 +31,8 @@ import javax.annotation.Nullable; import org.immutables.value.Value; import org.projectnessie.catalog.formats.iceberg.IcebergSpec; +import org.projectnessie.catalog.model.snapshot.ImmutableImplicitIcebergSnapshot; +import org.projectnessie.catalog.model.snapshot.ImplicitIcebergSnapshot; import org.projectnessie.nessie.immutables.NessieImmutable; @NessieImmutable @@ -62,6 +65,31 @@ static IcebergSnapshot snapshot( schemaId); } + static IcebergSnapshot fromImplicitIcebergSnapshot( + ImplicitIcebergSnapshot implicit, Long parentSnapshotId) { + return ImmutableIcebergSnapshot.of( + implicit.sequenceNumber(), + implicit.snapshotId(), + parentSnapshotId, + implicit.timestampMs(), + implicit.summary(), + implicit.manifests(), + implicit.manifestList(), + implicit.schemaId()); + } + + @JsonIgnore + default ImplicitIcebergSnapshot asImplicitIcebergSnapshot() { + return ImmutableImplicitIcebergSnapshot.of( + snapshotId(), + sequenceNumber(), + timestampMs(), + summary(), + manifests(), + manifestList(), + schemaId()); + } + @Nullable @jakarta.annotation.Nullable @JsonInclude(JsonInclude.Include.NON_NULL) diff --git a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/CatalogOps.java b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/CatalogOps.java index 437b616fc94..2252b7f05d0 100644 --- a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/CatalogOps.java +++ b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/CatalogOps.java @@ -41,6 +41,7 @@ public enum CatalogOps { META_SET_SNAPSHOT_REF, META_REMOVE_SNAPSHOT_REF, META_UPGRADE_FORMAT_VERSION, + META_REMOVE_SNAPSHOTS, // Catalog operations CATALOG_CREATE_ENTITY, diff --git a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/IcebergTableMetadataUpdateState.java b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/IcebergTableMetadataUpdateState.java index ffa96880fc1..d55fce551a5 100644 --- a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/IcebergTableMetadataUpdateState.java +++ b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/IcebergTableMetadataUpdateState.java @@ -19,7 +19,6 @@ import static java.util.Collections.emptyMap; import java.time.Instant; -import java.util.ArrayList; import java.util.EnumSet; import java.util.HashSet; import java.util.List; @@ -56,7 +55,7 @@ public class IcebergTableMetadataUpdateState { private int lastAddedSchemaId = -1; private int lastAddedSpecId = -1; private int lastAddedOrderId = -1; - private final List addedSnapshots = new ArrayList<>(); + private IcebergSnapshot previouslyAddedSnapshot = null; private final Set addedSchemaIds = new HashSet<>(); private final Set addedSpecIds = new HashSet<>(); private final Set addedOrderIds = new HashSet<>(); @@ -86,12 +85,12 @@ public NessieTableSnapshot snapshot() { return snapshot; } - public List addedSnapshots() { - return addedSnapshots; + public IcebergSnapshot previouslyAddedSnapshot() { + return previouslyAddedSnapshot; } public void snapshotAdded(IcebergSnapshot snapshot) { - addedSnapshots.add(snapshot); + previouslyAddedSnapshot = snapshot; } public int lastAddedSchemaId() { diff --git a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/NessieModelIceberg.java b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/NessieModelIceberg.java index 88fe5ad6403..0494732c5ca 100644 --- a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/NessieModelIceberg.java +++ b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/nessie/NessieModelIceberg.java @@ -119,6 +119,8 @@ import org.projectnessie.catalog.model.schema.types.NessieTimestampTypeSpec; import org.projectnessie.catalog.model.schema.types.NessieType; import org.projectnessie.catalog.model.schema.types.NessieTypeSpec; +import org.projectnessie.catalog.model.snapshot.ImmutableImplicitIcebergSnapshot; +import org.projectnessie.catalog.model.snapshot.ImplicitIcebergSnapshot; import org.projectnessie.catalog.model.snapshot.NessieEntitySnapshot; import org.projectnessie.catalog.model.snapshot.NessieTableSnapshot; import org.projectnessie.catalog.model.snapshot.NessieViewRepresentation; @@ -512,7 +514,7 @@ public static NessieTableSnapshot icebergTableSnapshotToNessie( Function manifestListLocation) { NessieTableSnapshot.Builder snapshot = NessieTableSnapshot.builder().id(snapshotId); if (previous != null) { - snapshot.from(previous); + snapshot.from(previous).implicitIcebergSnapshots(Map.of()); String previousLocation = previous.icebergLocation(); if (previousLocation != null && !previousLocation.equals(iceberg.location())) { @@ -623,10 +625,6 @@ public static NessieTableSnapshot icebergTableSnapshotToNessie( .ifPresent( currentSnapshot -> { snapshot.icebergSnapshotId(currentSnapshot.snapshotId()); - currentSnapshot - .parentSnapshotId(); // TODO Can we leave this unset, as we do not return previous - // Iceberg - // snapshots?? Integer schemaId = currentSnapshot.schemaId(); if (schemaId != null) { // TODO this overwrites the "current schema ID" with the schema ID of the current @@ -654,10 +652,22 @@ public static NessieTableSnapshot icebergTableSnapshotToNessie( currentSnapshot.manifests(); // TODO }); - for (IcebergSnapshotLogEntry logEntry : iceberg.snapshotLog()) { - // TODO ?? - logEntry.snapshotId(); - logEntry.timestampMs(); + for (var icebergSnapshotLogEntry : iceberg.snapshotLog()) { + var snapId = icebergSnapshotLogEntry.snapshotId(); + if (snapId == iceberg.currentSnapshotId()) { + continue; + } + if (previous == null) { + var icebergSnap = iceberg.snapshotById(snapId); + if (icebergSnap.isEmpty()) { + throw new IllegalArgumentException( + "Invalid Iceberg table-metadata: snapshot-log references snapshot with ID " + + snapId + + ", which is not present as a snapshot object"); + } + snapshot.putImplicitIcebergSnapshot(snapId, icebergSnap.get().asImplicitIcebergSnapshot()); + } + snapshot.addPreviousIcebergSnapshotId(snapId); } for (IcebergStatisticsFile statisticsFile : iceberg.statistics()) { @@ -943,6 +953,7 @@ public static IcebergViewMetadata nessieViewSnapshotToIceberg( public static IcebergTableMetadata nessieTableSnapshotToIceberg( NessieTableSnapshot nessie, + List history, Optional requestedSpecVersion, Consumer> tablePropertiesTweak) { NessieTable entity = nessie.entity(); @@ -1031,6 +1042,19 @@ public static IcebergTableMetadata nessieTableSnapshotToIceberg( ? nessie.icebergManifestFileLocations() : emptyList(); + var parentSnapshotId = (Long) null; + + for (ImplicitIcebergSnapshot previous : history) { + var snap = IcebergSnapshot.fromImplicitIcebergSnapshot(previous, parentSnapshotId); + metadata.addSnapshot(snap); + parentSnapshotId = snap.snapshotId(); + metadata.addSnapshotLog( + IcebergSnapshotLogEntry.snapshotLogEntry( + previous.timestampMs(), previous.snapshotId())); + // TODO we don't include the metadata location yet - we could potentially do that later + // metadata.addMetadataLog(IcebergHistoryEntry.historyEntry(previousSnap.timestampMs(), )); + } + IcebergSnapshot.Builder snapshot = IcebergSnapshot.builder() .snapshotId(snapshotId) @@ -1039,7 +1063,8 @@ public static IcebergTableMetadata nessieTableSnapshotToIceberg( .manifestList(manifestListLocation) .summary(nessie.icebergSnapshotSummary()) .timestampMs(timestampMs) - .sequenceNumber(nessie.icebergSnapshotSequenceNumber()); + .sequenceNumber(nessie.icebergSnapshotSequenceNumber()) + .parentSnapshotId(parentSnapshotId); metadata.addSnapshots(snapshot.build()); metadata.putRef( @@ -1080,9 +1105,6 @@ public static IcebergTableMetadata nessieTableSnapshotToIceberg( partitionStatisticsFile.fileSizeInBytes())); } - // metadata.addMetadataLog(); - // metadata.addSnapshotLog(); - return metadata.build(); } @@ -1577,13 +1599,18 @@ public static void addSnapshot(AddSnapshot u, IcebergTableMetadataUpdateState st IcebergSnapshot icebergSnapshot = u.snapshot(); Integer schemaId = icebergSnapshot.schemaId(); NessieTableSnapshot snapshot = state.snapshot(); + NessieTableSnapshot.Builder snapshotBuilder = state.builder(); if (schemaId != null) { Optional schema = snapshot.schemaByIcebergId(schemaId); - schema.ifPresent(s -> state.builder().currentSchemaId(s.id())); + schema.ifPresent(s -> snapshotBuilder.currentSchemaId(s.id())); } - state - .builder() + var currentIcebergSnapshotId = snapshot.icebergSnapshotId(); + if (currentIcebergSnapshotId != null && currentIcebergSnapshotId != -1L) { + snapshotBuilder.addPreviousIcebergSnapshotId(currentIcebergSnapshotId); + } + + snapshotBuilder .icebergSnapshotId(icebergSnapshot.snapshotId()) .icebergSnapshotSequenceNumber(icebergSnapshot.sequenceNumber()) .icebergLastSequenceNumber( @@ -1599,6 +1626,23 @@ public static void addSnapshot(AddSnapshot u, IcebergTableMetadataUpdateState st .icebergManifestListLocation(icebergSnapshot.manifestList()) .icebergManifestFileLocations(icebergSnapshot.manifests()); + // If another Iceberg snapshot was add in this same update-transaction, memoize the previous + // snapshot so we can return the full snapshot history. + var previouslyAddedSnapshot = state.previouslyAddedSnapshot(); + if (previouslyAddedSnapshot != null) { + snapshotBuilder.putImplicitIcebergSnapshot( + previouslyAddedSnapshot.snapshotId(), + ImmutableImplicitIcebergSnapshot.builder() + .snapshotId(previouslyAddedSnapshot.snapshotId()) + .schemaId(previouslyAddedSnapshot.schemaId()) + .manifests(previouslyAddedSnapshot.manifests()) + .manifestList(previouslyAddedSnapshot.manifestList()) + .sequenceNumber(previouslyAddedSnapshot.sequenceNumber()) + .summary(previouslyAddedSnapshot.summary()) + .timestampMs(previouslyAddedSnapshot.timestampMs()) + .build()); + } + state.snapshotAdded(icebergSnapshot); } diff --git a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergMetadataUpdate.java b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergMetadataUpdate.java index 63a95fd1ab0..09919a6e4ea 100644 --- a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergMetadataUpdate.java +++ b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergMetadataUpdate.java @@ -15,6 +15,7 @@ */ package org.projectnessie.catalog.formats.iceberg.rest; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static java.util.Collections.emptyList; import static java.util.Collections.singleton; @@ -37,6 +38,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; import javax.annotation.Nullable; import org.immutables.value.Value; import org.projectnessie.catalog.formats.iceberg.meta.IcebergPartitionSpec; @@ -156,8 +158,14 @@ interface RemoveSnapshots extends IcebergMetadataUpdate { @Override default void applyToTable(IcebergTableMetadataUpdateState state) { - throw new UnsupportedOperationException( - "Nessie Catalog does not allow external snapshot management"); + state.addCatalogOp(CatalogOps.META_REMOVE_SNAPSHOTS); + var ids = new HashSet<>(snapshotIds()); + state + .builder() + .previousIcebergSnapshotIds( + state.snapshot().previousIcebergSnapshotIds().stream() + .filter(id -> !ids.contains(id)) + .collect(Collectors.toList())); } } @@ -476,6 +484,19 @@ default void applyToTable(IcebergTableMetadataUpdateState state) { state.addCatalogOp(CatalogOps.META_ADD_SNAPSHOT); Map summary = snapshot().summary(); + Long currentSnapshotId = state.snapshot().icebergSnapshotId(); + Long parentSnapshotId = snapshot().parentSnapshotId(); + checkArgument( + parentSnapshotId == null || Objects.equals(currentSnapshotId, parentSnapshotId), + "Snapshot to be added must use the parent snapshot ID %s or null, but provided parent snapshot ID is %s", + currentSnapshotId, + parentSnapshotId); + checkArgument( + (currentSnapshotId == null || snapshot().snapshotId() != currentSnapshotId) + && !state.snapshot().previousIcebergSnapshotIds().contains(snapshot().snapshotId()), + "Provided snapshot ID %s is already used", + snapshot().snapshotId()); + String v = summary.get("added-data-files"); if (v != null && Long.parseLong(v) > 0) { state.addCatalogOp(CatalogOps.SNAP_ADD_DATA_FILES); diff --git a/catalog/format/iceberg/src/test/java/org/projectnessie/catalog/formats/iceberg/nessie/TestNessieModelIceberg.java b/catalog/format/iceberg/src/test/java/org/projectnessie/catalog/formats/iceberg/nessie/TestNessieModelIceberg.java index 3bf77ed02b2..76b168c5e15 100644 --- a/catalog/format/iceberg/src/test/java/org/projectnessie/catalog/formats/iceberg/nessie/TestNessieModelIceberg.java +++ b/catalog/format/iceberg/src/test/java/org/projectnessie/catalog/formats/iceberg/nessie/TestNessieModelIceberg.java @@ -23,6 +23,7 @@ import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataBare; import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataBareWithSchema; import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataSimple; +import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataThreeSnapshots; import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataWithStatistics; import static org.projectnessie.catalog.formats.iceberg.meta.IcebergNestedField.nestedField; import static org.projectnessie.catalog.formats.iceberg.meta.IcebergPartitionField.partitionField; @@ -67,6 +68,7 @@ import org.projectnessie.catalog.formats.iceberg.meta.IcebergPartitionSpec; import org.projectnessie.catalog.formats.iceberg.meta.IcebergSchema; import org.projectnessie.catalog.formats.iceberg.meta.IcebergSnapshot; +import org.projectnessie.catalog.formats.iceberg.meta.IcebergSnapshotLogEntry; import org.projectnessie.catalog.formats.iceberg.meta.IcebergSortField; import org.projectnessie.catalog.formats.iceberg.meta.IcebergSortOrder; import org.projectnessie.catalog.formats.iceberg.meta.IcebergTableMetadata; @@ -243,11 +245,21 @@ public void icebergTableMetadata(IcebergTableMetadata icebergTableMetadata) thro NessieTableSnapshot nessie = NessieModelIceberg.icebergTableSnapshotToNessie( snapshotId, null, table, icebergTableMetadata, IcebergSnapshot::manifestList); + + soft.assertThat(nessie.previousIcebergSnapshotIds()) + .hasSize(Math.max(icebergTableMetadata.snapshotLog().size() - 1, 0)) + .containsExactlyElementsOf( + icebergTableMetadata.snapshotLog().stream() + .map(IcebergSnapshotLogEntry::snapshotId) + .filter(id -> id != icebergTableMetadata.currentSnapshotId()) + .collect(Collectors.toList())); + soft.assertThat(icebergJsonSerializeDeserialize(nessie, NessieTableSnapshot.class)) .isEqualTo(nessie); IcebergTableMetadata iceberg = - NessieModelIceberg.nessieTableSnapshotToIceberg(nessie, Optional.empty(), properties -> {}); + NessieModelIceberg.nessieTableSnapshotToIceberg( + nessie, List.of(), Optional.empty(), properties -> {}); IcebergTableMetadata icebergWithCatalogProps = IcebergTableMetadata.builder() .from(icebergTableMetadata) @@ -255,12 +267,28 @@ public void icebergTableMetadata(IcebergTableMetadata icebergTableMetadata) thro iceberg.properties().entrySet().stream() .filter(e -> e.getKey().startsWith("nessie.")) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) + .snapshots( + iceberg.snapshots().stream() + .filter(s -> s.snapshotId() == iceberg.currentSnapshotId()) + .collect(Collectors.toList())) + .snapshotLog( + iceberg.snapshotLog().stream() + .filter(s -> s.snapshotId() == iceberg.currentSnapshotId()) + .collect(Collectors.toList())) .schema( icebergTableMetadata.formatVersion() > 1 ? null : iceberg.schemas().isEmpty() ? null : iceberg.schemas().get(0)) .build(); - soft.assertThat(iceberg).isEqualTo(icebergWithCatalogProps); + IcebergTableMetadata icebergCurrentSnapshotOnly = + IcebergTableMetadata.builder() + .from(iceberg) + .snapshots( + iceberg.snapshots().stream() + .filter(s -> s.snapshotId() == iceberg.currentSnapshotId()) + .collect(Collectors.toList())) + .build(); + soft.assertThat(icebergCurrentSnapshotOnly).isEqualTo(icebergWithCatalogProps); NessieTableSnapshot nessieAgain = NessieModelIceberg.icebergTableSnapshotToNessie( @@ -278,7 +306,9 @@ static Stream icebergTableMetadata() { // snapshot tableMetadataSimple(), // statistics - tableMetadataWithStatistics()) + tableMetadataWithStatistics(), + // 3 snapshots + tableMetadataThreeSnapshots()) .flatMap( builder -> Stream.of( @@ -513,7 +543,8 @@ public void icebergNested(IcebergSchema schema, IcebergSchema expected, int expe .isEqualTo(expectedLastColumnId); IcebergTableMetadata icebergMetadata = - NessieModelIceberg.nessieTableSnapshotToIceberg(snapshot, Optional.empty(), m -> {}); + NessieModelIceberg.nessieTableSnapshotToIceberg( + snapshot, List.of(), Optional.empty(), m -> {}); soft.assertThat(icebergMetadata) .extracting(IcebergTableMetadata::lastColumnId) .isEqualTo(expectedLastColumnId); diff --git a/catalog/model/src/main/java/org/projectnessie/catalog/model/snapshot/ImplicitIcebergSnapshot.java b/catalog/model/src/main/java/org/projectnessie/catalog/model/snapshot/ImplicitIcebergSnapshot.java new file mode 100644 index 00000000000..7e1f3e4b005 --- /dev/null +++ b/catalog/model/src/main/java/org/projectnessie/catalog/model/snapshot/ImplicitIcebergSnapshot.java @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.catalog.model.snapshot; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.projectnessie.catalog.model.schema.NessieSchema; +import org.projectnessie.nessie.immutables.NessieImmutable; + +@NessieImmutable +@JsonSerialize(as = ImmutableImplicitIcebergSnapshot.class) +@JsonDeserialize(as = ImmutableImplicitIcebergSnapshot.class) +public interface ImplicitIcebergSnapshot { + static ImplicitIcebergSnapshot implicitIcebergSnapshot(NessieTableSnapshot tableSnapshot) { + return ImmutableImplicitIcebergSnapshot.of( + tableSnapshot.icebergSnapshotId(), + tableSnapshot.icebergSnapshotSequenceNumber(), + tableSnapshot.snapshotCreatedTimestamp().toEpochMilli(), + tableSnapshot.icebergSnapshotSummary(), + tableSnapshot.icebergManifestFileLocations(), + tableSnapshot.icebergManifestListLocation(), + tableSnapshot + .currentSchemaObject() + .map(NessieSchema::icebergId) + .orElse(NessieSchema.NO_SCHEMA_ID)); + } + + long snapshotId(); + + @Nullable + @jakarta.annotation.Nullable + @JsonInclude(JsonInclude.Include.NON_NULL) + Long sequenceNumber(); + + long timestampMs(); + + Map summary(); + + @JsonInclude(JsonInclude.Include.NON_EMPTY) + List manifests(); + + @JsonInclude(JsonInclude.Include.NON_NULL) + @Nullable + @jakarta.annotation.Nullable + String manifestList(); + + @Nullable + @jakarta.annotation.Nullable + @JsonInclude(JsonInclude.Include.NON_NULL) + Integer schemaId(); +} diff --git a/catalog/model/src/main/java/org/projectnessie/catalog/model/snapshot/NessieTableSnapshot.java b/catalog/model/src/main/java/org/projectnessie/catalog/model/snapshot/NessieTableSnapshot.java index 1c3787b3443..6c23e547fab 100644 --- a/catalog/model/src/main/java/org/projectnessie/catalog/model/snapshot/NessieTableSnapshot.java +++ b/catalog/model/src/main/java/org/projectnessie/catalog/model/snapshot/NessieTableSnapshot.java @@ -120,6 +120,19 @@ default Optional sortDefinitionByIcebergId(int orderId) { @jakarta.annotation.Nullable Long icebergSnapshotId(); + /** + * List of previous snapshot IDs, in the same order as Iceberg's {@code + * TableMetadata.snapshotLog}, which is oldest first, but without the current snapshot ID. + */ + @JsonInclude(JsonInclude.Include.NON_EMPTY) + List previousIcebergSnapshotIds(); + + /** + * Contains Iceberg snapshots for which no explicit Nessie commit exists. We need to memoize those + * snapshots here in case a single Iceberg transaction add multiple snapshots. + */ + Map implicitIcebergSnapshots(); + @JsonInclude(JsonInclude.Include.NON_NULL) @Nullable @jakarta.annotation.Nullable @@ -269,6 +282,30 @@ interface Builder extends NessieEntitySnapshot.Builder { @CanIgnoreReturnValue Builder icebergSnapshotId(@Nullable Long icebergSnapshotId); + @CanIgnoreReturnValue + Builder addPreviousIcebergSnapshotId(long element); + + @CanIgnoreReturnValue + Builder addPreviousIcebergSnapshotIds(long... elements); + + @CanIgnoreReturnValue + Builder previousIcebergSnapshotIds(Iterable elements); + + @CanIgnoreReturnValue + Builder addAllPreviousIcebergSnapshotIds(Iterable elements); + + @CanIgnoreReturnValue + Builder putImplicitIcebergSnapshot(long key, ImplicitIcebergSnapshot value); + + @CanIgnoreReturnValue + Builder putImplicitIcebergSnapshot(Map.Entry entry); + + @CanIgnoreReturnValue + Builder implicitIcebergSnapshots(Map entries); + + @CanIgnoreReturnValue + Builder putAllImplicitIcebergSnapshots(Map entries); + @CanIgnoreReturnValue Builder icebergLastSequenceNumber(@Nullable Long icebergLastSequenceNumber); diff --git a/catalog/service/common/src/main/java/org/projectnessie/catalog/service/api/CatalogService.java b/catalog/service/common/src/main/java/org/projectnessie/catalog/service/api/CatalogService.java index eef0bd18637..2f1d2eca783 100644 --- a/catalog/service/common/src/main/java/org/projectnessie/catalog/service/api/CatalogService.java +++ b/catalog/service/common/src/main/java/org/projectnessie/catalog/service/api/CatalogService.java @@ -16,7 +16,6 @@ package org.projectnessie.catalog.service.api; import jakarta.annotation.Nullable; -import java.net.URI; import java.util.List; import java.util.Map; import java.util.Optional; @@ -26,7 +25,6 @@ import java.util.function.Supplier; import java.util.stream.Stream; import org.projectnessie.api.v2.params.ParsedReference; -import org.projectnessie.catalog.model.snapshot.NessieEntitySnapshot; import org.projectnessie.catalog.service.config.WarehouseConfig; import org.projectnessie.error.BaseNessieClientServerException; import org.projectnessie.error.NessieNotFoundException; @@ -78,10 +76,7 @@ CompletionStage> commit( ApiContext apiContext) throws BaseNessieClientServerException; - interface CatalogUriResolver { - URI icebergSnapshot( - Reference effectiveReference, ContentKey key, NessieEntitySnapshot snapshot); - } + boolean checkIcebergSnapshotPresent(String metadataLocation, long versionId); Optional validateStorageLocation(String location); diff --git a/catalog/service/common/src/main/java/org/projectnessie/catalog/service/api/SnapshotFormat.java b/catalog/service/common/src/main/java/org/projectnessie/catalog/service/api/SnapshotFormat.java index 323e4262c74..50185a20ac0 100644 --- a/catalog/service/common/src/main/java/org/projectnessie/catalog/service/api/SnapshotFormat.java +++ b/catalog/service/common/src/main/java/org/projectnessie/catalog/service/api/SnapshotFormat.java @@ -25,7 +25,18 @@ public enum SnapshotFormat { * The Nessie Catalog main native format includes the entity snapshot information with schemas, * partition definitions and sort definitions. */ - NESSIE_SNAPSHOT, + NESSIE_SNAPSHOT(false), /** Iceberg table metadata. */ - ICEBERG_TABLE_METADATA, + ICEBERG_TABLE_METADATA(true), + ; + + private final boolean includeOldSnapshots; + + SnapshotFormat(boolean includeOldSnapshots) { + this.includeOldSnapshots = includeOldSnapshots; + } + + public boolean includeOldSnapshots() { + return includeOldSnapshots; + } } diff --git a/catalog/service/common/src/main/java/org/projectnessie/catalog/service/objtypes/EntitySnapshotObj.java b/catalog/service/common/src/main/java/org/projectnessie/catalog/service/objtypes/EntitySnapshotObj.java index 0f318c10f98..60f8cb3290b 100644 --- a/catalog/service/common/src/main/java/org/projectnessie/catalog/service/objtypes/EntitySnapshotObj.java +++ b/catalog/service/common/src/main/java/org/projectnessie/catalog/service/objtypes/EntitySnapshotObj.java @@ -66,6 +66,10 @@ default ObjType type() { TaskObj.taskDefaultCacheExpire(), c -> ObjType.NOT_CACHED); + static ObjId snapshotObjIdForContent(String metadataLocation, long versionId) { + return snapshotIdForContent(metadataLocation, versionId); + } + static ObjId snapshotObjIdForContent(Content content) { ObjId id = snapshotIdForContent(content); if (id != null) { diff --git a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java index 2acd615048e..04397beed46 100644 --- a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java +++ b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java @@ -35,6 +35,7 @@ import static org.projectnessie.catalog.formats.iceberg.nessie.NessieModelIceberg.typeToEntityName; import static org.projectnessie.catalog.formats.iceberg.rest.IcebergMetadataUpdate.SetLocation.setTrustedLocation; import static org.projectnessie.catalog.formats.iceberg.rest.IcebergMetadataUpdate.SetProperties.setProperties; +import static org.projectnessie.catalog.model.snapshot.ImplicitIcebergSnapshot.implicitIcebergSnapshot; import static org.projectnessie.catalog.service.api.NessieSnapshotResponse.nessieSnapshotResponse; import static org.projectnessie.catalog.service.impl.Util.objIdToNessieId; import static org.projectnessie.catalog.service.objtypes.EntitySnapshotObj.snapshotObjIdForContent; @@ -43,6 +44,7 @@ import static org.projectnessie.model.Content.Type.ICEBERG_TABLE; import static org.projectnessie.model.Content.Type.NAMESPACE; import static org.projectnessie.versioned.RequestMeta.API_READ; +import static org.projectnessie.versioned.RequestMeta.API_WRITE; import com.google.common.annotations.VisibleForTesting; import jakarta.annotation.Nullable; @@ -52,6 +54,8 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; @@ -61,6 +65,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -82,6 +88,7 @@ import org.projectnessie.catalog.formats.iceberg.rest.IcebergUpdateRequirement.AssertCreate; import org.projectnessie.catalog.model.id.NessieId; import org.projectnessie.catalog.model.ops.CatalogOperation; +import org.projectnessie.catalog.model.snapshot.ImplicitIcebergSnapshot; import org.projectnessie.catalog.model.snapshot.NessieEntitySnapshot; import org.projectnessie.catalog.model.snapshot.NessieTableSnapshot; import org.projectnessie.catalog.model.snapshot.NessieViewSnapshot; @@ -103,8 +110,8 @@ import org.projectnessie.model.Conflict; import org.projectnessie.model.Content; import org.projectnessie.model.ContentKey; -import org.projectnessie.model.ContentResponse; import org.projectnessie.model.GetMultipleContentsResponse; +import org.projectnessie.model.IcebergContent; import org.projectnessie.model.Namespace; import org.projectnessie.model.Reference; import org.projectnessie.nessie.tasks.api.TasksService; @@ -117,9 +124,11 @@ import org.projectnessie.services.spi.ContentService; import org.projectnessie.services.spi.TreeService; import org.projectnessie.storage.uri.StorageUri; +import org.projectnessie.versioned.ContentHistoryEntry; import org.projectnessie.versioned.RequestMeta; import org.projectnessie.versioned.RequestMeta.RequestMetaBuilder; import org.projectnessie.versioned.VersionStore; +import org.projectnessie.versioned.storage.common.exceptions.ObjNotFoundException; import org.projectnessie.versioned.storage.common.persist.ObjId; import org.projectnessie.versioned.storage.common.persist.Persist; import org.slf4j.Logger; @@ -170,6 +179,17 @@ public Optional validateStorageLocation(String location) { return objectIO.canResolve(uri); } + @Override + public boolean checkIcebergSnapshotPresent(String metadataLocation, long versionId) { + var objId = snapshotObjIdForContent(metadataLocation, versionId); + try { + persist.fetchObj(objId); + return true; + } catch (ObjNotFoundException e) { + return false; + } + } + @Override public StorageUri locationForEntity( WarehouseConfig warehouse, @@ -288,7 +308,12 @@ public Stream>> retrieveSnapshots( return snapshotStage.thenApply( snapshot -> snapshotResponse( - key, c.getContent(), reqParams, snapshot, effectiveReference)); + key, + c.getContent(), + reqParams, + snapshot, + List.of(), + effectiveReference)); }; }) .filter(Objects::nonNull); @@ -303,7 +328,7 @@ public CompletionStage retrieveSnapshot( ApiContext apiContext) throws NessieNotFoundException { - ParsedReference reference = reqParams.ref(); + var reference = reqParams.ref(); LOGGER.trace( "retrieveTableSnapshot ref-name:{} ref-hash:{} key:{}", @@ -311,23 +336,116 @@ public CompletionStage retrieveSnapshot( reference.hashWithRelativeSpec(), key); - ContentResponse contentResponse = - contentService(apiContext) - .getContent( - key, reference.name(), reference.hashWithRelativeSpec(), false, requestMeta); - Content content = contentResponse.getContent(); + var contentHistory = + treeService(apiContext) + .getContentHistory( + key, reference.name(), reference.hashWithRelativeSpec(), requestMeta); + var historyLog = contentHistory.history(); + + if (!historyLog.hasNext()) { + throw new NessieContentNotFoundException(key, reference.name()); + } + var first = historyLog.next(); + var effectiveReference = contentHistory.reference(); + var content = first.getContent(); if (expectedType != null && !content.getType().equals(expectedType)) { throw new NessieContentNotFoundException(key, reference.name()); } - Reference effectiveReference = contentResponse.getEffectiveReference(); + var currentSnapshotStage = + icebergStuff().retrieveIcebergSnapshot(snapshotObjIdForContent(content), content); - ObjId snapshotId = snapshotObjIdForContent(content); + BiFunction, List, SnapshotResponse> + responseBuilder = + (snapshot, history) -> + snapshotResponse(key, content, reqParams, snapshot, history, effectiveReference); - CompletionStage> snapshotStage = - icebergStuff().retrieveIcebergSnapshot(snapshotId, content); + if (!reqParams.snapshotFormat().includeOldSnapshots()) { + return currentSnapshotStage.thenApply(snapshot -> responseBuilder.apply(snapshot, List.of())); + } - return snapshotStage.thenApply( - snapshot -> snapshotResponse(key, content, reqParams, snapshot, effectiveReference)); + return currentSnapshotStage.thenCompose( + snapshot -> collectSnapshotHistory(first.getKey(), snapshot, historyLog, responseBuilder)); + } + + private CompletionStage collectSnapshotHistory( + ContentKey key, + NessieEntitySnapshot snapshot, + Iterator historyLog, + BiFunction, List, R> responseBuilder) { + if (!(snapshot instanceof NessieTableSnapshot)) { + // Not an Iceberg table, no previous snapshots + return completedStage(responseBuilder.apply(snapshot, List.of())); + } + + var tableSnapshot = (NessieTableSnapshot) snapshot; + var previousSnapshotIds = tableSnapshot.previousIcebergSnapshotIds(); + if (previousSnapshotIds.isEmpty()) { + return completedStage(responseBuilder.apply(snapshot, List.of())); + } + + // Collect history + + var remainingSnapshotIdsFromHistory = new HashSet<>(previousSnapshotIds); + remainingSnapshotIdsFromHistory.removeAll(tableSnapshot.implicitIcebergSnapshots().keySet()); + + var olderSnaps = new HashMap>>(); + + while (!remainingSnapshotIdsFromHistory.isEmpty() && historyLog.hasNext()) { + var next = historyLog.next(); + var nextContent = next.getContent(); + if (!(nextContent instanceof IcebergContent)) { + // should never happen, really + continue; + } + var nextSnapshotId = ((IcebergContent) nextContent).getVersionId(); + if (!remainingSnapshotIdsFromHistory.remove(nextSnapshotId)) { + // not a snapshot that we need + continue; + } + + var olderSnap = + icebergStuff() + .retrieveIcebergSnapshot(snapshotObjIdForContent(nextContent), nextContent) + .exceptionally( + t -> { + // There is not much we can do when the retrieval of table-metadata fails. + LOGGER.warn( + "Failed to retrieve table-metadata {}", + ((IcebergContent) nextContent).getMetadataLocation()); + return null; + }) + .toCompletableFuture(); + olderSnaps.put(nextSnapshotId, olderSnap); + } + + var olderSnapsFuture = + CompletableFuture.allOf(olderSnaps.values().toArray(CompletableFuture[]::new)); + + return olderSnapsFuture.thenApply( + x -> { + var history = new ArrayList(); + for (Long previousSnapshotId : previousSnapshotIds) { + try { + var snapFuture = olderSnaps.get(previousSnapshotId); + var implicitIcebergSnapshot = + snapFuture != null + ? implicitIcebergSnapshot( + (NessieTableSnapshot) snapFuture.get(0, TimeUnit.SECONDS)) + : ((NessieTableSnapshot) snapshot) + .implicitIcebergSnapshots() + .get(previousSnapshotId); + if (implicitIcebergSnapshot != null) { + history.add(implicitIcebergSnapshot); + } else { + LOGGER.warn( + "Requested snapshot {} for {} not in Nessie history", previousSnapshotId, key); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + return responseBuilder.apply(snapshot, history); + }); } private SnapshotResponse snapshotResponse( @@ -335,10 +453,11 @@ private SnapshotResponse snapshotResponse( Content content, SnapshotReqParams reqParams, NessieEntitySnapshot snapshot, + List history, Reference effectiveReference) { if (snapshot instanceof NessieTableSnapshot) { return snapshotTableResponse( - key, content, reqParams, (NessieTableSnapshot) snapshot, effectiveReference); + key, content, reqParams, (NessieTableSnapshot) snapshot, history, effectiveReference); } if (snapshot instanceof NessieViewSnapshot) { return snapshotViewResponse( @@ -353,6 +472,7 @@ private SnapshotResponse snapshotTableResponse( Content content, SnapshotReqParams reqParams, NessieTableSnapshot snapshot, + List history, Reference effectiveReference) { Object result; String fileName; @@ -368,14 +488,14 @@ private SnapshotResponse snapshotTableResponse( break; case ICEBERG_TABLE_METADATA: // Return the snapshot as an Iceberg table-metadata using either the spec-version - // given in - // the request or the one used when the table-metadata was written. + // given in the request or the one used when the table-metadata was written. // TODO Does requesting a table-metadata using another spec-version make any sense? // TODO Response should respect the JsonView / spec-version // TODO Add a check that the original table format was Iceberg (not Delta) result = nessieTableSnapshotToIceberg( snapshot, + history, optionalIcebergSpec(reqParams.reqVersion()), metadataPropertiesTweak(snapshot, effectiveReference)); @@ -557,8 +677,7 @@ CompletionStage commit( .thenCompose( updates -> { Map addedContentsMap = updates.addedContentsMap(); - CompletionStage> current = - CompletableFuture.completedStage(null); + CompletionStage> current = completedStage(null); for (SingleTableUpdate tableUpdate : updates.tableUpdates()) { Content content = tableUpdate.content; if (content.getId() == null) { @@ -601,6 +720,7 @@ public CompletionStage> commit( singleTableUpdate.content, reqParams, singleTableUpdate.snapshot, + singleTableUpdate.history, updates.targetBranch()))); } @@ -676,13 +796,42 @@ private CompletionStage applyIcebergTableCommitOperation( // TODO handle the case when nothing changed -> do not update // e.g. when adding a schema/spec/order that already exists }) - .thenApply( + // Collect the history of the table required to construct Iceberg's TableMetadata + .thenCompose( updateState -> { - NessieTableSnapshot nessieSnapshot = updateState.snapshot(); + try { + var contentHistory = + treeService(apiContext) + .getContentHistory( + op.getKey(), reference.getName(), reference.getHash(), API_WRITE); + var snapshot = updateState.snapshot(); + + return collectSnapshotHistory( + op.getKey(), + snapshot, + contentHistory.history(), + (snap, history) -> Map.entry(updateState, history)); + } catch (NessieContentNotFoundException e) { + return completedStage( + Map.entry(updateState, List.of())); + } catch (NessieNotFoundException e) { + throw new RuntimeException(e); + } + }) + .thenApply( + stateWithHistory -> { + var updateState = stateWithHistory.getKey(); + var history = stateWithHistory.getValue(); + var nessieSnapshot = updateState.snapshot(); + // Note: 'history' contains the _current_ snapshot on which the table change was + // based String metadataJsonLocation = icebergMetadataJsonLocation(nessieSnapshot.icebergLocation()); + IcebergTableMetadata icebergMetadata = - storeTableSnapshot(metadataJsonLocation, nessieSnapshot, multiTableUpdate); + nessieTableSnapshotToIceberg( + nessieSnapshot, history, Optional.empty(), p -> {}); + storeSnapshot(metadataJsonLocation, icebergMetadata, multiTableUpdate); Content updated = icebergMetadataToContent(metadataJsonLocation, icebergMetadata, contentId); @@ -691,7 +840,11 @@ private CompletionStage applyIcebergTableCommitOperation( SingleTableUpdate singleTableUpdate = new SingleTableUpdate( - nessieSnapshot, updated, icebergOp.getKey(), updateState.catalogOps()); + nessieSnapshot, + history, + updated, + icebergOp.getKey(), + updateState.catalogOps()); multiTableUpdate.addUpdate(op.getKey(), singleTableUpdate); return singleTableUpdate; }); @@ -753,7 +906,8 @@ private CompletionStage applyIcebergViewCommitOperation( String metadataJsonLocation = icebergMetadataJsonLocation(nessieSnapshot.icebergLocation()); IcebergViewMetadata icebergMetadata = - storeViewSnapshot(metadataJsonLocation, nessieSnapshot, multiTableUpdate); + nessieViewSnapshotToIceberg(nessieSnapshot, Optional.empty(), p -> {}); + storeSnapshot(metadataJsonLocation, icebergMetadata, multiTableUpdate); Content updated = icebergMetadataToContent(metadataJsonLocation, icebergMetadata, contentId); ObjId snapshotId = snapshotObjIdForContent(updated); @@ -761,7 +915,11 @@ private CompletionStage applyIcebergViewCommitOperation( SingleTableUpdate singleTableUpdate = new SingleTableUpdate( - nessieSnapshot, updated, icebergOp.getKey(), updateState.catalogOps()); + nessieSnapshot, + List.of(), + updated, + icebergOp.getKey(), + updateState.catalogOps()); multiTableUpdate.addUpdate(op.getKey(), singleTableUpdate); return singleTableUpdate; }); @@ -858,23 +1016,7 @@ private CompletionStage loadExistingViewSnapshot(Content con return icebergStuff().retrieveIcebergSnapshot(snapshotId, content); } - private IcebergTableMetadata storeTableSnapshot( - String metadataJsonLocation, - NessieTableSnapshot snapshot, - MultiTableUpdate multiTableUpdate) { - IcebergTableMetadata tableMetadata = - nessieTableSnapshotToIceberg(snapshot, Optional.empty(), p -> {}); - return storeSnapshot(metadataJsonLocation, tableMetadata, multiTableUpdate); - } - - private IcebergViewMetadata storeViewSnapshot( - String metadataJsonLocation, NessieViewSnapshot snapshot, MultiTableUpdate multiTableUpdate) { - IcebergViewMetadata viewMetadata = - nessieViewSnapshotToIceberg(snapshot, Optional.empty(), p -> {}); - return storeSnapshot(metadataJsonLocation, viewMetadata, multiTableUpdate); - } - - private M storeSnapshot( + private void storeSnapshot( String metadataJsonLocation, M metadata, MultiTableUpdate multiTableUpdate) { multiTableUpdate.addStoredLocation(metadataJsonLocation); try (OutputStream out = objectIO.writeObject(StorageUri.of(metadataJsonLocation))) { @@ -882,7 +1024,6 @@ private M storeSnapshot( } catch (Exception ex) { throw new RuntimeException("Failed to write snapshot to: " + metadataJsonLocation, ex); } - return metadata; } private static Optional optionalIcebergSpec(OptionalInt specVersion) { diff --git a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/MultiTableUpdate.java b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/MultiTableUpdate.java index a07492dc209..ad0102218f6 100644 --- a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/MultiTableUpdate.java +++ b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/MultiTableUpdate.java @@ -23,6 +23,7 @@ import java.util.Set; import java.util.stream.Collectors; import org.projectnessie.catalog.formats.iceberg.nessie.CatalogOps; +import org.projectnessie.catalog.model.snapshot.ImplicitIcebergSnapshot; import org.projectnessie.catalog.model.snapshot.NessieEntitySnapshot; import org.projectnessie.error.NessieConflictException; import org.projectnessie.error.NessieNotFoundException; @@ -128,16 +129,19 @@ void addStoredLocation(String location) { static final class SingleTableUpdate { final NessieEntitySnapshot snapshot; + final List history; final Content content; final ContentKey key; final Set catalogOps; SingleTableUpdate( NessieEntitySnapshot snapshot, + List history, Content content, ContentKey key, Set catalogOps) { this.snapshot = snapshot; + this.history = history; this.content = content; this.key = key; this.catalogOps = catalogOps; diff --git a/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestCatalogServiceImpl.java b/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestCatalogServiceImpl.java index 7f0a278d0bf..6c4c48d3391 100644 --- a/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestCatalogServiceImpl.java +++ b/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestCatalogServiceImpl.java @@ -336,6 +336,7 @@ public void singleTableCreate() throws Exception { IcebergTableMetadata icebergMetadata = NessieModelIceberg.nessieTableSnapshotToIceberg( (NessieTableSnapshot) snap.nessieSnapshot(), + List.of(), Optional.empty(), m -> m.putAll(icebergMetadataEntity.properties())); diff --git a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/CatalogUriResolverImpl.java b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/CatalogUriResolverImpl.java deleted file mode 100644 index 968f2126446..00000000000 --- a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/CatalogUriResolverImpl.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (C) 2024 Dremio - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.projectnessie.catalog.service.rest; - -import static java.net.URLEncoder.encode; -import static java.nio.charset.StandardCharsets.UTF_8; - -import java.net.URI; -import org.projectnessie.catalog.model.snapshot.NessieEntitySnapshot; -import org.projectnessie.catalog.service.api.CatalogService; -import org.projectnessie.model.ContentKey; -import org.projectnessie.model.Reference; - -class CatalogUriResolverImpl implements CatalogService.CatalogUriResolver { - private final URI baseUri; - - CatalogUriResolverImpl(ExternalBaseUri requestUri) { - this.baseUri = requestUri.catalogBaseURI(); - } - - @Override - public URI icebergSnapshot( - Reference effectiveReference, ContentKey key, NessieEntitySnapshot snapshot) { - return baseUri.resolve( - "trees/" - + encode(effectiveReference.toPathString(), UTF_8) - + "/snapshot/" - + encode(key.toPathStringEscaped(), UTF_8) - + "?format=iceberg"); - } -} diff --git a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1ResourceBase.java b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1ResourceBase.java index e774b230b60..fe3aaf0a462 100644 --- a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1ResourceBase.java +++ b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1ResourceBase.java @@ -39,7 +39,6 @@ import com.google.common.base.Splitter; import io.smallrye.mutiny.Uni; import java.io.IOException; -import java.net.URI; import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.HashMap; @@ -57,7 +56,6 @@ import org.projectnessie.catalog.formats.iceberg.rest.IcebergUpdateEntityRequest; import org.projectnessie.catalog.service.api.CatalogCommit; import org.projectnessie.catalog.service.api.CatalogEntityAlreadyExistsException; -import org.projectnessie.catalog.service.api.CatalogService; import org.projectnessie.catalog.service.api.SnapshotReqParams; import org.projectnessie.catalog.service.api.SnapshotResponse; import org.projectnessie.catalog.service.config.LakehouseConfig; @@ -333,15 +331,6 @@ static Branch checkBranch(Reference reference) { return (Branch) reference; } - protected String snapshotMetadataLocation(SnapshotResponse snap) { - // TODO the resolved metadataLocation is wrong !! - CatalogService.CatalogUriResolver catalogUriResolver = new CatalogUriResolverImpl(uriInfo); - URI metadataLocation = - catalogUriResolver.icebergSnapshot( - snap.effectiveReference(), snap.contentKey(), snap.nessieSnapshot()); - return metadataLocation.toString(); - } - static Map createEntityProperties(Map providedProperties) { Map properties = new HashMap<>(); properties.put("created-at", OffsetDateTime.now(ZoneOffset.UTC).toString()); diff --git a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1TableResource.java b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1TableResource.java index caa55bded04..899e7684ece 100644 --- a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1TableResource.java +++ b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1TableResource.java @@ -15,6 +15,7 @@ */ package org.projectnessie.catalog.service.rest; +import static com.google.common.base.Preconditions.checkState; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; @@ -39,13 +40,13 @@ import static org.projectnessie.catalog.formats.iceberg.rest.IcebergMetadataUpdate.UpgradeFormatVersion.upgradeFormatVersion; import static org.projectnessie.catalog.service.rest.TableRef.tableRef; import static org.projectnessie.model.Content.Type.ICEBERG_TABLE; -import static org.projectnessie.model.Reference.ReferenceType.BRANCH; import static org.projectnessie.versioned.RequestMeta.API_WRITE; import static org.projectnessie.versioned.RequestMeta.apiWrite; import com.google.common.collect.Lists; import io.smallrye.common.annotation.Blocking; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.unchecked.Unchecked; import jakarta.enterprise.context.RequestScoped; import jakarta.inject.Inject; import jakarta.validation.Valid; @@ -103,11 +104,11 @@ import org.projectnessie.error.NessieContentNotFoundException; import org.projectnessie.error.NessieNotFoundException; import org.projectnessie.model.Branch; -import org.projectnessie.model.CommitResponse; import org.projectnessie.model.Content; import org.projectnessie.model.ContentKey; import org.projectnessie.model.ContentResponse; import org.projectnessie.model.FetchOption; +import org.projectnessie.model.IcebergContent; import org.projectnessie.model.IcebergTable; import org.projectnessie.model.ImmutableOperations; import org.projectnessie.model.Operation.Delete; @@ -179,11 +180,16 @@ public Uni loadCredentials( @HeaderParam("X-Iceberg-Access-Delegation") String dataAccess) throws IOException { - return loadTable(prefix, namespace, table, null, dataAccess) + TableRef tableRef = decodeTableRef(prefix, namespace, table); + + // TODO should be optimized to not load the old snapshots in a follow-up, at best specialized to + // only retrieve the credentials + + return loadTable(tableRef, prefix, dataAccess, false) .map( loadTableResponse -> { var creds = loadTableResponse.storageCredentials(); - + checkState(creds != null, "no storage credentials for {}", tableRef); return ImmutableIcebergLoadCredentialsResponse.of(creds); }); } @@ -354,6 +360,7 @@ public Uni createTable( IcebergTableMetadata stagedTableMetadata = nessieTableSnapshotToIceberg( snapshot, + List.of(), Optional.empty(), map -> map.put(IcebergTableMetadata.STAGED_PROPERTY, "true")); @@ -415,106 +422,83 @@ public Uni registerTable( ParsedReference reference = requireNonNull(tableRef.reference()); Branch ref = checkBranch(treeService.getReferenceByName(reference.name(), FetchOption.MINIMAL)); - RequestMetaBuilder requestMeta = - apiWrite().addKeyAction(tableRef.contentKey(), CatalogOps.CATALOG_REGISTER_ENTITY.name()); - - Optional catalogTableRef = - uriInfo.resolveTableFromUri(registerTableRequest.metadataLocation()); - boolean nessieCatalogUri = uriInfo.isNessieCatalogUri(registerTableRequest.metadataLocation()); - if (catalogTableRef.isPresent() && nessieCatalogUri) { - // In case the metadataLocation in the IcebergRegisterTableRequest contains a URI for _this_ - // Nessie Catalog, use the existing data/objects. - - // Taking a "shortcut" here, we use the 'old Content object' and re-add it in a Nessie commit. - - TableRef ctr = catalogTableRef.get(); - - ContentResponse contentResponse = fetchIcebergTable(ctr, true); - // It's technically a new table for Nessie, so need to clear the content-ID. - Content newContent = contentResponse.getContent().withId(null); - - Operations ops = - ImmutableOperations.builder() - .addOperations(Put.of(ctr.contentKey(), newContent)) - .commitMeta( - updateCommitMeta( - format( - "Register Iceberg table '%s' from '%s'", - ctr.contentKey(), registerTableRequest.metadataLocation()))) - .build(); - CommitResponse committed = - treeService.commitMultipleOperations( - ref.getName(), ref.getHash(), ops, requestMeta.build()); - - return this.loadTable( - TableRef.tableRef( - ctr.contentKey(), - ParsedReference.parsedReference( - committed.getTargetBranch().getName(), - committed.getTargetBranch().getHash(), - BRANCH), - tableRef.warehouse()), - prefix, - dataAccess, - true); - } else if (nessieCatalogUri) { - throw new IllegalArgumentException( - "Cannot register an Iceberg table using the URI " - + registerTableRequest.metadataLocation()); - } - - // Register table from "external" metadata-location - - IcebergTableMetadata tableMetadata; - try (InputStream metadataInput = - objectIO.readObject(StorageUri.of(registerTableRequest.metadataLocation()))) { - tableMetadata = - IcebergJson.objectMapper().readValue(metadataInput, IcebergTableMetadata.class); - } - - catalogService - .validateStorageLocation(tableMetadata.location()) - .ifPresent( - msg -> { - throw new IllegalArgumentException( - format( - "Location for table '%s' to be registered cannot be associated with any configured object storage location: %s", - tableRef.contentKey(), msg)); - }); - - ToIntFunction safeUnbox = i -> i != null ? i : 0; + var requestMeta = + apiWrite() + .addKeyAction(tableRef.contentKey(), CatalogOps.CATALOG_REGISTER_ENTITY.name()) + .build(); - Content newContent = - IcebergTable.of( - registerTableRequest.metadataLocation(), - tableMetadata.currentSnapshotId(), - safeUnbox.applyAsInt(tableMetadata.currentSchemaId()), - safeUnbox.applyAsInt(tableMetadata.defaultSpecId()), - safeUnbox.applyAsInt(tableMetadata.defaultSortOrderId())); - Operations ops = - ImmutableOperations.builder() - .addOperations(Put.of(tableRef.contentKey(), newContent)) - .commitMeta( - updateCommitMeta( + var fetchTableMetadata = + Uni.createFrom() + .item( + Unchecked.supplier( + () -> { + var metadataLocation = registerTableRequest.metadataLocation(); + try (InputStream metadataInput = + objectIO.readObject(StorageUri.of(metadataLocation))) { + var tableMetadata = + IcebergJson.objectMapper() + .readValue(metadataInput, IcebergTableMetadata.class); + return Map.entry(metadataLocation, tableMetadata); + } + })); + + return fetchTableMetadata.chain( + Unchecked.function( + tableMetadataLocationAndObject -> { + var metadataLocation = tableMetadataLocationAndObject.getKey(); + var tableMetadata = tableMetadataLocationAndObject.getValue(); + catalogService + .validateStorageLocation(tableMetadata.location()) + .ifPresent( + msg -> { + throw new IllegalArgumentException( + format( + "Location for table '%s' to be registered cannot be associated with any configured object storage location: %s", + tableRef.contentKey(), msg)); + }); + + if (catalogService.checkIcebergSnapshotPresent( + metadataLocation, tableMetadata.currentSnapshotId())) { + throw new IllegalArgumentException( format( - "Register Iceberg table '%s' from '%s'", - tableRef.contentKey(), registerTableRequest.metadataLocation()))) - .build(); - CommitResponse committed = - treeService.commitMultipleOperations( - ref.getName(), ref.getHash(), ops, requestMeta.build()); - - return this.loadTable( - tableRef( - tableRef.contentKey(), - parsedReference( - committed.getTargetBranch().getName(), - committed.getTargetBranch().getHash(), - committed.getTargetBranch().getType()), - tableRef.warehouse()), - prefix, - dataAccess, - true); + "Table '%s' cannot be registered with this metadata location '%s', because the location is already managed in this Nessie catalog", + tableRef.contentKey(), metadataLocation)); + } + + ToIntFunction safeUnbox = i -> i != null ? i : 0; + + Content newContent = + IcebergTable.of( + metadataLocation, + tableMetadata.currentSnapshotId(), + safeUnbox.applyAsInt(tableMetadata.currentSchemaId()), + safeUnbox.applyAsInt(tableMetadata.defaultSpecId()), + safeUnbox.applyAsInt(tableMetadata.defaultSortOrderId())); + Operations ops = + ImmutableOperations.builder() + .addOperations(Put.of(tableRef.contentKey(), newContent)) + .commitMeta( + updateCommitMeta( + format( + "Register Iceberg table '%s' from '%s'", + tableRef.contentKey(), metadataLocation))) + .build(); + var committed = + treeService.commitMultipleOperations( + ref.getName(), ref.getHash(), ops, requestMeta); + + return this.loadTable( + tableRef( + tableRef.contentKey(), + parsedReference( + committed.getTargetBranch().getName(), + committed.getTargetBranch().getHash(), + committed.getTargetBranch().getType()), + tableRef.warehouse()), + prefix, + dataAccess, + true); + })); } @Operation(operationId = "iceberg.v1.dropTable") @@ -634,9 +618,10 @@ public Uni updateTable( (IcebergTableMetadata) snap.entityObject() .orElseThrow(() -> new IllegalStateException("entity object missing")); + var metadataLocation = ((IcebergContent) snap.content()).getMetadataLocation(); return IcebergCommitTableResponse.builder() .metadata(tableMetadata) - .metadataLocation(snapshotMetadataLocation(snap)) + .metadataLocation(metadataLocation) .build(); }); } diff --git a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1ViewResource.java b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1ViewResource.java index afd3e005c7b..eae9d00bf18 100644 --- a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1ViewResource.java +++ b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1ViewResource.java @@ -71,6 +71,7 @@ import org.projectnessie.model.Branch; import org.projectnessie.model.ContentKey; import org.projectnessie.model.ContentResponse; +import org.projectnessie.model.IcebergContent; import org.projectnessie.model.IcebergView; import org.projectnessie.model.ImmutableOperations; import org.projectnessie.model.Operation.Delete; @@ -288,9 +289,10 @@ public Uni updateView( (IcebergViewMetadata) snap.entityObject() .orElseThrow(() -> new IllegalStateException("entity object missing")); + var metadataLocation = ((IcebergContent) snap.content()).getMetadataLocation(); return IcebergLoadViewResponse.builder() .metadata(viewMetadata) - .metadataLocation(snapshotMetadataLocation(snap)) + .metadataLocation(metadataLocation) .build(); }); } diff --git a/catalog/service/transfer/src/main/java/org/projectnessie/catalog/service/objtypes/transfer/CatalogObjIds.java b/catalog/service/transfer/src/main/java/org/projectnessie/catalog/service/objtypes/transfer/CatalogObjIds.java index 5ab4176e052..3b752860712 100644 --- a/catalog/service/transfer/src/main/java/org/projectnessie/catalog/service/objtypes/transfer/CatalogObjIds.java +++ b/catalog/service/transfer/src/main/java/org/projectnessie/catalog/service/objtypes/transfer/CatalogObjIds.java @@ -34,15 +34,17 @@ private CatalogObjIds() {} public static ObjId snapshotIdForContent(Content content) { if (content.getType().equals(ICEBERG_TABLE) || content.getType().equals(ICEBERG_VIEW)) { - IcebergContent icebergContent = (IcebergContent) content; - return objIdHasher("ContentSnapshot") - .hash(icebergContent.getMetadataLocation()) - .hash(icebergContent.getVersionId()) - .generate(); + var icebergContent = (IcebergContent) content; + return snapshotIdForContent( + icebergContent.getMetadataLocation(), icebergContent.getVersionId()); } return null; } + public static ObjId snapshotIdForContent(String metadataLocation, long versionId) { + return objIdHasher("ContentSnapshot").hash(metadataLocation).hash(versionId).generate(); + } + public static ObjId entityIdForContent(Content content) { if (content.getType().equals(ICEBERG_TABLE) || content.getType().equals(ICEBERG_VIEW)) { return objIdHasher("NessieEntity") diff --git a/servers/quarkus-server/src/test/java/org/projectnessie/server/authz/TestAuthzMeta.java b/servers/quarkus-server/src/test/java/org/projectnessie/server/authz/TestAuthzMeta.java index 8f38d060144..c11fc2915af 100644 --- a/servers/quarkus-server/src/test/java/org/projectnessie/server/authz/TestAuthzMeta.java +++ b/servers/quarkus-server/src/test/java/org/projectnessie/server/authz/TestAuthzMeta.java @@ -26,6 +26,7 @@ import static org.projectnessie.services.authz.Check.CheckType.READ_ENTITY_VALUE; import static org.projectnessie.services.authz.Check.CheckType.UPDATE_ENTITY; import static org.projectnessie.services.authz.Check.canCommitChangeAgainstReference; +import static org.projectnessie.services.authz.Check.canListCommitLog; import static org.projectnessie.services.authz.Check.canReadContentKey; import static org.projectnessie.services.authz.Check.canReadEntries; import static org.projectnessie.services.authz.Check.canViewReference; @@ -170,6 +171,15 @@ public void icebergApiTable() { check(READ_ENTITY_VALUE, branch, tableKey, Set.of("CATALOG_CREATE_ENTITY")), check(CREATE_ENTITY, branch, tableKey, Set.of("CATALOG_CREATE_ENTITY"))), Map.of()), + authzCheck( + apiContext, + List.of( + canViewReference(branch), + canListCommitLog(branch), + canCommitChangeAgainstReference(branch), + check(READ_ENTITY_VALUE, branch, tableKey), + check(CREATE_ENTITY, branch, tableKey)), + Map.of()), // actual 'commit' authzCheck( apiContext, diff --git a/servers/quarkus-server/src/test/java/org/projectnessie/server/catalog/TestNessieCore.java b/servers/quarkus-server/src/test/java/org/projectnessie/server/catalog/TestNessieCore.java index 1162866b152..445ac2476a4 100644 --- a/servers/quarkus-server/src/test/java/org/projectnessie/server/catalog/TestNessieCore.java +++ b/servers/quarkus-server/src/test/java/org/projectnessie/server/catalog/TestNessieCore.java @@ -249,7 +249,7 @@ public void tableMetadata(int specVersion) throws Exception { .isNotEmpty() .get() .extracting(IcebergSnapshot::manifestList, IcebergSnapshot::manifests) - .containsExactly(null, emptyList()); + .containsExactly("s3://this-does-not-exist/anywhere/", emptyList()); } private static String httpRequestString(URI uri) throws Exception { diff --git a/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/AbstractIcebergCatalogTests.java b/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/AbstractIcebergCatalogTests.java index 5ff141805bb..18b9d7c2793 100644 --- a/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/AbstractIcebergCatalogTests.java +++ b/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/AbstractIcebergCatalogTests.java @@ -56,6 +56,7 @@ import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TestHelpers; import org.apache.iceberg.Transaction; import org.apache.iceberg.UpdatePartitionSpec; import org.apache.iceberg.UpdateSchema; @@ -552,6 +553,9 @@ public void testRegisterTableFromFileSystem() throws Exception { .create(); originalTable.newFastAppend().appendFile(FILE_A).commit(); + originalTable.newFastAppend().appendFile(FILE_B).commit(); + originalTable.newDelete().deleteFile(FILE_A).commit(); + originalTable.newFastAppend().appendFile(FILE_C).commit(); TableOperations ops = ((BaseTable) originalTable).operations(); @@ -579,6 +583,9 @@ public void testRegisterTableFromFileSystem() throws Exception { assertThat(registeredTable.sortOrders()) .as("Sort orders must match") .isEqualTo(originalTable.sortOrders()); + assertThat(registeredTable.currentSnapshot().parentId()) + .as("Current snapshot's parent-ID must match") + .isEqualTo(originalTable.currentSnapshot().parentId()); assertThat(registeredTable.currentSnapshot()) .as("Current snapshot must match") .isEqualTo(originalTable.currentSnapshot()); @@ -589,6 +596,12 @@ public void testRegisterTableFromFileSystem() throws Exception { .as("History must match") .isEqualTo(originalTable.history()); + TestHelpers.assertSameSchemaMap(registeredTable.schemas(), originalTable.schemas()); + assertFiles(registeredTable, FILE_B, FILE_C); + + registeredTable.newFastAppend().appendFile(FILE_A).commit(); + assertFiles(registeredTable, FILE_B, FILE_C, FILE_A); + assertThat(catalog.loadTable(TABLE)).isNotNull(); assertThat(catalog.dropTable(TABLE)).isTrue(); assertThat(catalog.tableExists(TABLE)).isFalse(); diff --git a/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/CatalogTests.java b/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/CatalogTests.java index 7422acf162b..4823b44f6ee 100644 --- a/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/CatalogTests.java +++ b/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/CatalogTests.java @@ -2664,7 +2664,7 @@ public void tableCreationWithoutNamespace() { } @Test - public void testRegisterTable() { + public void testRegisterTable() throws Exception { C catalog = catalog(); if (requiresNamespaceCreate()) { @@ -2691,7 +2691,14 @@ public void testRegisterTable() { catalog.dropTable(TABLE, false /* do not purge */); - Table registeredTable = catalog.registerTable(TABLE, metadataLocation); + var newMetadataLocation = metadataLocation + "-duplicated.metadata.json"; + try (var input = ops.io().newInputFile(metadataLocation).newStream()) { + try (var output = ops.io().newOutputFile(newMetadataLocation).create()) { + input.transferTo(output); + } + } + + Table registeredTable = catalog.registerTable(TABLE, newMetadataLocation); assertThat(registeredTable).isNotNull(); assertThat(catalog.tableExists(TABLE)).as("Table must exist").isTrue(); @@ -2705,6 +2712,9 @@ public void testRegisterTable() { assertThat(registeredTable.sortOrders()) .as("Sort orders must match") .isEqualTo(originalTable.sortOrders()); + assertThat(registeredTable.currentSnapshot().parentId()) + .as("Current snapshot's parent-ID must match") + .isEqualTo(originalTable.currentSnapshot().parentId()); assertThat(registeredTable.currentSnapshot()) .as("Current snapshot must match") .isEqualTo(originalTable.currentSnapshot()); diff --git a/servers/services/src/main/java/org/projectnessie/services/impl/TreeApiImpl.java b/servers/services/src/main/java/org/projectnessie/services/impl/TreeApiImpl.java index cdff0c28ff5..39e453324b6 100644 --- a/servers/services/src/main/java/org/projectnessie/services/impl/TreeApiImpl.java +++ b/servers/services/src/main/java/org/projectnessie/services/impl/TreeApiImpl.java @@ -41,6 +41,7 @@ import static org.projectnessie.services.cel.CELUtil.VAR_REF_META; import static org.projectnessie.services.cel.CELUtil.VAR_REF_TYPE; import static org.projectnessie.services.impl.RefUtil.toNamedRef; +import static org.projectnessie.services.impl.RefUtil.toReference; import static org.projectnessie.versioned.RequestMeta.API_WRITE; import com.google.common.base.Strings; @@ -64,6 +65,7 @@ import org.projectnessie.cel.tools.Script; import org.projectnessie.cel.tools.ScriptException; import org.projectnessie.error.NessieConflictException; +import org.projectnessie.error.NessieContentNotFoundException; import org.projectnessie.error.NessieNotFoundException; import org.projectnessie.error.NessieReferenceAlreadyExistsException; import org.projectnessie.error.NessieReferenceConflictException; @@ -109,6 +111,8 @@ import org.projectnessie.services.config.ServerConfig; import org.projectnessie.services.hash.HashValidator; import org.projectnessie.services.hash.ResolvedHash; +import org.projectnessie.services.spi.ContentHistory; +import org.projectnessie.services.spi.ImmutableContentHistory; import org.projectnessie.services.spi.PagedResponseHandler; import org.projectnessie.services.spi.TreeService; import org.projectnessie.versioned.BranchName; @@ -417,6 +421,55 @@ && getServerConfig().getDefaultBranch().equals(ref.getName())), } } + @Override + public ContentHistory getContentHistory( + ContentKey key, + @Nullable String namedRef, + @Nullable String hashOnRef, + RequestMeta requestMeta) + throws NessieNotFoundException { + try { + var ref = + getHashResolver() + .resolveHashOnRef(namedRef, hashOnRef, new HashValidator("Expected hash")); + + boolean forWrite = requestMeta.forWrite(); + Set actions = requestMeta.keyActions(key); + var r = ref.getNamedRef(); + var accessChecks = startAccessCheck().canListCommitLog(r); + if (forWrite) { + accessChecks.canCommitChangeAgainstReference(r); + } + + var identifiedKeys = getStore().getIdentifiedKeys(ref.getHash(), List.of(key), true); + var identifiedKey = identifiedKeys.get(0); + if (identifiedKey.type() == null) { + if (forWrite) { + accessChecks + .canReadEntityValue(r, identifiedKey, actions) + .canCreateEntity(r, identifiedKey, actions); + } + accessChecks.checkAndThrow(); + + throw new NessieContentNotFoundException(key, namedRef); + } + + accessChecks.canReadEntityValue(r, identifiedKey, actions); + if (forWrite) { + accessChecks.canUpdateEntity(r, identifiedKey, actions); + } + accessChecks.checkAndThrow(); + + var contentHistory = getStore().getContentChanges(ref.getHash(), key); + return ImmutableContentHistory.builder() + .history(contentHistory) + .reference(toReference(r, ref.getHash())) + .build(); + } catch (ReferenceNotFoundException e) { + throw new NessieReferenceNotFoundException(e.getMessage(), e); + } + } + @Override public R getCommitLog( String namedRef, @@ -507,7 +560,8 @@ private LogEntry logEntryOperationsAccessCheck( getStore() .getIdentifiedKeys( endRef.getHash(), - operations.stream().map(Operation::getKey).collect(Collectors.toList())) + operations.stream().map(Operation::getKey).collect(Collectors.toList()), + false) .forEach(entry -> identifiedKeys.put(entry.contentKey(), entry)); } catch (ReferenceNotFoundException e) { throw new RuntimeException(e); diff --git a/servers/services/src/main/java/org/projectnessie/services/spi/ContentHistory.java b/servers/services/src/main/java/org/projectnessie/services/spi/ContentHistory.java new file mode 100644 index 00000000000..69630adddfd --- /dev/null +++ b/servers/services/src/main/java/org/projectnessie/services/spi/ContentHistory.java @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.services.spi; + +import java.util.Iterator; +import org.immutables.value.Value; +import org.projectnessie.model.Reference; +import org.projectnessie.versioned.ContentHistoryEntry; + +// Not really an immutable object as it contains an iterator... +@Value.Immutable +public interface ContentHistory { + Reference reference(); + + Iterator history(); +} diff --git a/servers/services/src/main/java/org/projectnessie/services/spi/TreeService.java b/servers/services/src/main/java/org/projectnessie/services/spi/TreeService.java index d8c9e962939..ee169f0499a 100644 --- a/servers/services/src/main/java/org/projectnessie/services/spi/TreeService.java +++ b/servers/services/src/main/java/org/projectnessie/services/spi/TreeService.java @@ -45,6 +45,7 @@ import org.projectnessie.model.Reference.ReferenceType; import org.projectnessie.model.ReferenceHistoryResponse; import org.projectnessie.versioned.NamedRef; +import org.projectnessie.versioned.Ref; import org.projectnessie.versioned.RequestMeta; import org.projectnessie.versioned.WithHash; @@ -110,6 +111,34 @@ Reference deleteReference( String expectedHash) throws NessieConflictException, NessieNotFoundException; + /** + * Retrieve the changes to the content object with the content key {@code key} on a + * reference. This functionality focuses on content changes, neglecting renames. + * + *

The behavior of this function is rather not what an end user would expect, namely + * referencing the commits that actually changed the content. The behavior of this function just + * focuses on the changes, primarily intended to eventually build the snapshot history of an + * Iceberg table. + * + *

The first element returned by the iterator is the current state on the most recent from + * (beginning at {@code ref}). Following elements returned by the iterator refer to the content + * changes, as seen on the most recent commit(s). + * + * @see org.projectnessie.versioned.VersionStore#getContentChanges(Ref, ContentKey) + */ + ContentHistory getContentHistory( + @Valid ContentKey key, + @Valid @Nullable @Pattern(regexp = REF_NAME_REGEX, message = REF_NAME_MESSAGE) + String namedRef, + @Valid + @Nullable + @Pattern( + regexp = HASH_OR_RELATIVE_COMMIT_SPEC_REGEX, + message = HASH_OR_RELATIVE_COMMIT_SPEC_MESSAGE) + String hashOnRef, + RequestMeta requestMeta) + throws NessieNotFoundException; + R getCommitLog( @Valid @NotNull @Pattern(regexp = REF_NAME_REGEX, message = REF_NAME_MESSAGE) String namedRef, FetchOption fetchOption, diff --git a/site/docs/guides/iceberg-rest.md b/site/docs/guides/iceberg-rest.md index 874286389dd..2afbac93fda 100644 --- a/site/docs/guides/iceberg-rest.md +++ b/site/docs/guides/iceberg-rest.md @@ -256,9 +256,14 @@ Both S3 request signing and credentials vending ("assume role") work with `write * The (base) location of tables created via Iceberg REST are mandated by Nessie, which will choose the table's location underneath the location of the warehouse. * Changes to the table base location are ignored. -* Nessie will always return only the Iceberg table snapshot that corresponds to the Nessie commit. - This solves the mismatch between Nessie commits and Iceberg snapshot history. Similarly Nessie - returns the Iceberg view version corresponding to the Nessie commit. +* Returned snapshots + * **Since Nessie version 0.101.0**: + Nessie returns the Iceberg snapshot history for changes that are committed to Nessie via Iceberg + REST - but only for commits using Nessie 0.101.0 or newer! + * Nessie versions **before** 0.101.0: + Nessie will always return only the Iceberg table snapshot that corresponds to the Nessie commit. + This solves the mismatch between Nessie commits and Iceberg snapshot history. Similarly Nessie + returns the Iceberg view version corresponding to the Nessie commit. ## Nessie CLI diff --git a/versioned/spi/src/main/java/org/projectnessie/versioned/ContentHistoryEntry.java b/versioned/spi/src/main/java/org/projectnessie/versioned/ContentHistoryEntry.java new file mode 100644 index 00000000000..27b096849bf --- /dev/null +++ b/versioned/spi/src/main/java/org/projectnessie/versioned/ContentHistoryEntry.java @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned; + +import jakarta.validation.constraints.NotNull; +import org.immutables.value.Value; +import org.projectnessie.model.CommitMeta; +import org.projectnessie.model.Content; +import org.projectnessie.model.ContentKey; + +@Value.Immutable +public interface ContentHistoryEntry { + + /** + * Key by which the {@linkplain #getContent()} can be retrieved on the commit referenced in + * {@linkplain #getCommitMeta() commit meta}. + */ + @NotNull + ContentKey getKey(); + + @NotNull + CommitMeta getCommitMeta(); + + @NotNull + Content getContent(); +} diff --git a/versioned/spi/src/main/java/org/projectnessie/versioned/EventsVersionStore.java b/versioned/spi/src/main/java/org/projectnessie/versioned/EventsVersionStore.java index e0526925cad..00bd90be168 100644 --- a/versioned/spi/src/main/java/org/projectnessie/versioned/EventsVersionStore.java +++ b/versioned/spi/src/main/java/org/projectnessie/versioned/EventsVersionStore.java @@ -17,6 +17,7 @@ import jakarta.annotation.Nonnull; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -155,6 +156,12 @@ public PaginationIterator getCommits(Ref ref, boolean fetchAdditionalInf return delegate.getCommits(ref, fetchAdditionalInfo); } + @Override + public Iterator getContentChanges(Ref ref, ContentKey key) + throws ReferenceNotFoundException { + return delegate.getContentChanges(ref, key); + } + @Override public PaginationIterator getKeys( Ref ref, String pagingToken, boolean withContent, KeyRestrictions keyRestrictions) @@ -163,9 +170,10 @@ public PaginationIterator getKeys( } @Override - public List getIdentifiedKeys(Ref ref, Collection keys) + public List getIdentifiedKeys( + Ref ref, Collection keys, boolean returnNotFound) throws ReferenceNotFoundException { - return delegate.getIdentifiedKeys(ref, keys); + return delegate.getIdentifiedKeys(ref, keys, returnNotFound); } @Override diff --git a/versioned/spi/src/main/java/org/projectnessie/versioned/ObservingVersionStore.java b/versioned/spi/src/main/java/org/projectnessie/versioned/ObservingVersionStore.java index d30cada0c28..e3882b92942 100644 --- a/versioned/spi/src/main/java/org/projectnessie/versioned/ObservingVersionStore.java +++ b/versioned/spi/src/main/java/org/projectnessie/versioned/ObservingVersionStore.java @@ -21,6 +21,7 @@ import io.opentelemetry.instrumentation.annotations.WithSpan; import jakarta.annotation.Nonnull; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -181,6 +182,15 @@ public PaginationIterator getCommits( return delegate.getCommits(ref, fetchAdditionalInfo); } + @WithSpan + @Override + @Counted(PREFIX) + @Timed(value = PREFIX, histogram = true) + public Iterator getContentChanges(Ref ref, ContentKey key) + throws ReferenceNotFoundException { + return delegate.getContentChanges(ref, key); + } + @WithSpan @Override @Counted(PREFIX) @@ -199,9 +209,9 @@ public PaginationIterator getKeys( @Counted(PREFIX) @Timed(value = PREFIX, histogram = true) public List getIdentifiedKeys( - @SpanAttribute(TAG_REF) Ref ref, Collection keys) + @SpanAttribute(TAG_REF) Ref ref, Collection keys, boolean returnNotFound) throws ReferenceNotFoundException { - return delegate.getIdentifiedKeys(ref, keys); + return delegate.getIdentifiedKeys(ref, keys, returnNotFound); } @WithSpan diff --git a/versioned/spi/src/main/java/org/projectnessie/versioned/VersionStore.java b/versioned/spi/src/main/java/org/projectnessie/versioned/VersionStore.java index d0944abc1a5..5b194dd0993 100644 --- a/versioned/spi/src/main/java/org/projectnessie/versioned/VersionStore.java +++ b/versioned/spi/src/main/java/org/projectnessie/versioned/VersionStore.java @@ -20,6 +20,7 @@ import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -336,6 +337,22 @@ PaginationIterator> getNamedRefs( PaginationIterator getCommits(Ref ref, boolean fetchAdditionalInfo) throws ReferenceNotFoundException; + /** + * Retrieve the changes to the content object with the content key {@code key} on {@code + * ref}. This functionality focuses on content changes, neglecting renames. + * + *

The behavior of this function is rather not what an end user would expect, namely + * referencing the commits that actually changed the content. The behavior of this function just + * focuses on the changes, primarily intended to eventually build the snapshot history of an + * Iceberg table. + * + *

The first element returned by the iterator is the current state on the most recent from + * (beginning at {@code ref}). Following elements returned by the iterator refer to the content + * changes, as seen on the most recent commit(s). + */ + Iterator getContentChanges(Ref ref, ContentKey key) + throws ReferenceNotFoundException; + @Value.Immutable interface KeyRestrictions { KeyRestrictions NO_KEY_RESTRICTIONS = KeyRestrictions.builder().build(); @@ -374,7 +391,8 @@ PaginationIterator getKeys( Ref ref, String pagingToken, boolean withContent, KeyRestrictions keyRestrictions) throws ReferenceNotFoundException; - List getIdentifiedKeys(Ref ref, Collection keys) + List getIdentifiedKeys( + Ref ref, Collection keys, boolean returnNotFound) throws ReferenceNotFoundException; /** diff --git a/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/VersionStoreImpl.java b/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/VersionStoreImpl.java index c0583a0ed2b..599c2fdf312 100644 --- a/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/VersionStoreImpl.java +++ b/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/VersionStoreImpl.java @@ -83,22 +83,27 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import javax.annotation.CheckForNull; +import org.projectnessie.model.Branch; import org.projectnessie.model.CommitConsistency; import org.projectnessie.model.CommitMeta; import org.projectnessie.model.Content; import org.projectnessie.model.ContentKey; +import org.projectnessie.model.Detached; import org.projectnessie.model.IdentifiedContentKey; import org.projectnessie.model.Operation; import org.projectnessie.model.RepositoryConfig; +import org.projectnessie.model.Tag; import org.projectnessie.versioned.BranchName; import org.projectnessie.versioned.Commit; import org.projectnessie.versioned.CommitResult; +import org.projectnessie.versioned.ContentHistoryEntry; import org.projectnessie.versioned.ContentResult; import org.projectnessie.versioned.DetachedRef; import org.projectnessie.versioned.Diff; import org.projectnessie.versioned.GetNamedRefsParams; import org.projectnessie.versioned.GetNamedRefsParams.RetrieveOptions; import org.projectnessie.versioned.Hash; +import org.projectnessie.versioned.ImmutableContentHistoryEntry; import org.projectnessie.versioned.ImmutableReferenceAssignedResult; import org.projectnessie.versioned.ImmutableReferenceCreatedResult; import org.projectnessie.versioned.ImmutableReferenceDeletedResult; @@ -134,6 +139,7 @@ import org.projectnessie.versioned.storage.common.exceptions.RetryTimeoutException; import org.projectnessie.versioned.storage.common.indexes.StoreIndex; import org.projectnessie.versioned.storage.common.indexes.StoreIndexElement; +import org.projectnessie.versioned.storage.common.indexes.StoreIndexes; import org.projectnessie.versioned.storage.common.indexes.StoreKey; import org.projectnessie.versioned.storage.common.logic.CommitLogic; import org.projectnessie.versioned.storage.common.logic.ConsistencyLogic; @@ -635,6 +641,130 @@ static R emptyOrNotFound(Ref ref, R namedRefResult) throws ReferenceNotFound return namedRefResult; } + @Override + public Iterator getContentChanges(Ref ref, ContentKey key) + throws ReferenceNotFoundException { + var refMapping = new RefMapping(persist); + var head = refMapping.resolveRefHead(ref); + if (head == null) { + return emptyOrNotFound(ref, PaginationIterator.empty()); + } + + var commitLogic = commitLogic(persist); + var result = commitLogic.commitLog(commitLogQuery(head.id())); + var contentMapping = new ContentMapping(persist); + var indexesLogic = indexesLogic(persist); + + // Return the history of a Nessie content object, considering renames + + return new AbstractIterator<>() { + UUID contentId; + StoreKey storeKey = keyToStoreKey(key); + ContentKey contentKey = key; + ObjId previousContentObjId; + + @CheckForNull + @Override + protected ContentHistoryEntry computeNext() { + while (true) { + if (!result.hasNext()) { + return endOfData(); + } + var currentCommit = result.next(); + var index = indexesLogic.buildCompleteIndex(currentCommit, Optional.empty()); + var elem = index.get(storeKey); + if (elem == null) { + if (contentId == null) { + // Key not in this commit, no more commits to look into + return endOfData(); + } + + // No index-element for 'storeKey' - check whether it has been renamed + var found = false; + for (StoreIndexElement e : index) { + if (contentId.equals(e.content().contentId())) { + // Found the rename-from element, use it + found = true; + storeKey = e.key(); + contentKey = storeKeyToKey(storeKey); + elem = e; + break; + } + } + if (!found) { + // No more commits for this content, not even by looked up by content-ID (rename) + return endOfData(); + } + } + + var commitOp = elem.content(); + if (!commitOp.action().exists()) { + // Key marked as removed in this commit, no more commits to look into + return endOfData(); + } + // It is rather safe to assume that we have a content ID - this can only be null, if there + // was a non-UUID content-ID + var cid = requireNonNull(commitOp.contentId()); + if (contentId == null) { + // First occurrence, just memoize the content ID + contentId = cid; + } else if (!cid.equals(contentId)) { + // This may happen, if after a series of renames, when another table (different + // content-ID) has "our" content-key. + // Try to look up the key by looking up the key by content-ID. + var found = false; + for (StoreIndexElement e : index) { + if (contentId.equals(e.content().contentId())) { + // Found the rename-from element, use it + found = true; + storeKey = e.key(); + contentKey = storeKeyToKey(storeKey); + break; + } + } + if (!found) { + // If not found, then there are no more commits for our content. + return endOfData(); + } + } + + try { + var contentObjId = requireNonNull(commitOp.value()); + if (contentObjId.equals(previousContentObjId)) { + // Content did not change, continue with next commit - we only report the changes, + // latest visible commit with a change. + continue; + } + previousContentObjId = contentObjId; + var content = contentMapping.fetchContent(contentObjId); + // TODO (follow-up) bulk fetch content objects - it's way more complicated though + return ImmutableContentHistoryEntry.builder() + .key(contentKey) + .content(content) + .commitMeta(toCommitMeta(currentCommit)) + .build(); + } catch (ObjNotFoundException e) { + // This should really never happen - if it does, something is seriously broken in the + // backend database. + throw new RuntimeException(e); + } + } + } + }; + } + + private static Function makeReferenceBuilder(Ref ref) { + Function referenceBuilder; + if (ref instanceof BranchName) { + referenceBuilder = oid -> Branch.of(((BranchName) ref).getName(), oid.toString()); + } else if (ref instanceof TagName) { + referenceBuilder = oid -> Tag.of(((TagName) ref).getName(), oid.toString()); + } else { + referenceBuilder = oid -> Detached.of(oid.toString()); + } + return referenceBuilder; + } + @Override public PaginationIterator getCommits(Ref ref, boolean fetchAdditionalInfo) throws ReferenceNotFoundException { @@ -671,12 +801,19 @@ public String tokenForEntry(Commit entry) { } @Override - public List getIdentifiedKeys(Ref ref, Collection keys) + public List getIdentifiedKeys( + Ref ref, Collection keys, boolean returnNotFound) throws ReferenceNotFoundException { RefMapping refMapping = new RefMapping(persist); CommitObj head = refMapping.resolveRefHead(ref); if (head == null) { - return emptyList(); + if (!returnNotFound) { + return emptyList(); + } + StoreIndex index = StoreIndexes.emptyImmutableIndex(COMMIT_OP_SERIALIZER); + return keys.stream() + .map(key -> buildIdentifiedKey(key, index, null, null, x -> null)) + .collect(Collectors.toList()); } IndexesLogic indexesLogic = indexesLogic(persist); StoreIndex index = indexesLogic.buildCompleteIndex(head, Optional.empty()); @@ -687,7 +824,9 @@ public List getIdentifiedKeys(Ref ref, Collection indexElement = index.get(storeKey); if (indexElement == null) { - return null; + return returnNotFound + ? buildIdentifiedKey(key, index, null, null, x -> null) + : null; } CommitOp content = indexElement.content(); UUID contentId = content.contentId(); diff --git a/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/TestVersionStoreImpl.java b/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/TestVersionStoreImpl.java index 281fe5532a8..b06aa21f8eb 100644 --- a/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/TestVersionStoreImpl.java +++ b/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/TestVersionStoreImpl.java @@ -17,12 +17,14 @@ import static java.util.Collections.singletonList; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.tuple; import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.projectnessie.model.CommitMeta.fromMessage; import static org.projectnessie.versioned.storage.common.config.StoreConfig.CONFIG_COMMIT_RETRIES; import static org.projectnessie.versioned.storage.common.config.StoreConfig.CONFIG_COMMIT_TIMEOUT_MILLIS; import jakarta.annotation.Nonnull; +import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -30,14 +32,17 @@ import java.util.stream.Stream; import org.assertj.core.api.SoftAssertions; import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.util.Lists; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.projectnessie.model.ContentKey; import org.projectnessie.model.IcebergTable; +import org.projectnessie.model.Operation; import org.projectnessie.model.Operation.Put; import org.projectnessie.versioned.BranchName; +import org.projectnessie.versioned.ContentHistoryEntry; import org.projectnessie.versioned.Hash; import org.projectnessie.versioned.ReferenceConflictException; import org.projectnessie.versioned.ReferenceNotFoundException; @@ -64,6 +69,102 @@ protected VersionStore store() { return ValidatingVersionStoreImpl.of(soft, persist); } + @SuppressWarnings("DataFlowIssue") + @Test + public void contentHistory() throws Exception { + var store = new VersionStoreImpl(persist); + + var branch = BranchName.of("branchContentHistory"); + store.create(branch, Optional.empty()).getHash(); + + var key = ContentKey.of("history-table"); + var keyOther = ContentKey.of("other-table"); + var keyRenamed = ContentKey.of("renamed-table"); + + var hashCreate = + store.commit( + branch, + Optional.empty(), + fromMessage("create"), + List.of(Operation.Put.of(key, IcebergTable.of("meta1", 1, 0, 0, 0)))); + var contentCreate = store.getValue(branch, key, false).content(); + store.commit( + branch, + Optional.empty(), + fromMessage("update 1"), + List.of( + Operation.Put.of(key, IcebergTable.of("meta2", 2, 0, 0, 0, contentCreate.getId())))); + var contentUpdate1 = store.getValue(branch, key, false).content(); + var hashCreateOther = + store.commit( + branch, + Optional.empty(), + fromMessage("create-other"), + List.of(Operation.Put.of(keyOther, IcebergTable.of("other1", 11, 0, 0, 0)))); + var hashUpdate2 = + store.commit( + branch, + Optional.empty(), + fromMessage("update 2"), + List.of( + Operation.Put.of( + key, IcebergTable.of("meta3", 3, 0, 0, 0, contentUpdate1.getId())))); + var contentUpdate2 = store.getValue(branch, key, false).content(); + var hashRename1 = + store.commit( + branch, + Optional.empty(), + fromMessage("rename 1"), + List.of( + Operation.Delete.of(key), + Operation.Put.of( + keyRenamed, IcebergTable.of("meta4", 4, 0, 0, 0, contentUpdate1.getId())))); + var contentRename1 = store.getValue(branch, keyRenamed, false).content(); + store.commit( + branch, + Optional.empty(), + fromMessage("update 3"), + List.of( + Operation.Put.of( + keyRenamed, IcebergTable.of("meta5", 5, 0, 0, 0, contentRename1.getId())))); + var contentUpdate3 = store.getValue(branch, keyRenamed, false).content(); + var hashRename2 = + store.commit( + branch, + Optional.empty(), + fromMessage("rename 2"), + List.of(Operation.Delete.of(keyRenamed), Operation.Put.of(key, contentUpdate3))); + var contentRename2 = store.getValue(branch, key, false).content(); + soft.assertThat(contentRename2).isEqualTo(contentUpdate3); + var hashUpdate4 = + store.commit( + branch, + Optional.empty(), + fromMessage("update 4"), + List.of( + Operation.Put.of( + key, IcebergTable.of("meta6", 6, 0, 0, 0, contentRename2.getId())))); + var contentUpdate4 = store.getValue(branch, key, false).content(); + + soft.assertThat(Lists.newArrayList(store.getContentChanges(branch, key))) + .extracting( + ContentHistoryEntry::getKey, + e -> e.getCommitMeta().getMessage(), + e -> Hash.of(e.getCommitMeta().getHash()), + ContentHistoryEntry::getContent) + .containsExactly( + tuple(key, "update 4", hashUpdate4.getCommitHash(), contentUpdate4), + // "rename 2" is seen as the 1st content change before "update 4" + tuple(key, "rename 2", hashRename2.getCommitHash(), contentRename2), + // "update 3" has the same content value as "rename 2", so it is not returned in the + // iterator + tuple(keyRenamed, "rename 1", hashRename1.getCommitHash(), contentRename1), + tuple(key, "update 2", hashUpdate2.getCommitHash(), contentUpdate2), + tuple(key, "create-other", hashCreateOther.getCommitHash(), contentUpdate1), + // "create" is the only commit that has the initial change + tuple(key, "create", hashCreate.getCommitHash(), contentCreate)); + } + @Test public void commitWithInfiniteConcurrentConflict( @NessieStoreConfig(name = CONFIG_COMMIT_RETRIES, value = "3") diff --git a/versioned/tests/src/main/java/org/projectnessie/versioned/tests/AbstractEntries.java b/versioned/tests/src/main/java/org/projectnessie/versioned/tests/AbstractEntries.java index 4a854a76dfe..440138fb0fc 100644 --- a/versioned/tests/src/main/java/org/projectnessie/versioned/tests/AbstractEntries.java +++ b/versioned/tests/src/main/java/org/projectnessie/versioned/tests/AbstractEntries.java @@ -225,7 +225,8 @@ void entries() throws Exception { tuple(content23a.identifiedKey(), content23a.content())); } - soft.assertThat(store.getIdentifiedKeys(commit, newArrayList(key2, key2a, key23, key23a))) + soft.assertThat( + store.getIdentifiedKeys(commit, newArrayList(key2, key2a, key23, key23a), false)) .containsExactly( content2.identifiedKey(), content2a.identifiedKey(), @@ -264,7 +265,33 @@ void entries() throws Exception { tuple(content23a.identifiedKey(), content23a.content())); } - soft.assertThat(store.getIdentifiedKeys(commit, newArrayList(key2a))) + soft.assertThat(store.getIdentifiedKeys(commit, newArrayList(key2a), false)) .containsOnly(content2a.identifiedKey()); + + soft.assertThat( + store.getIdentifiedKeys( + commit, newArrayList(key2a, ContentKey.of("not-there-1")), true)) + .containsExactly( + content2a.identifiedKey(), + IdentifiedContentKey.identifiedContentKeyFromContent( + ContentKey.of("not-there-1"), null, null, x -> null)); + + soft.assertThat( + store.getIdentifiedKeys( + commit, + newArrayList(ContentKey.of("not-there-1"), ContentKey.of("not-there-2")), + true)) + .containsExactly( + IdentifiedContentKey.identifiedContentKeyFromContent( + ContentKey.of("not-there-1"), null, null, x -> null), + IdentifiedContentKey.identifiedContentKeyFromContent( + ContentKey.of("not-there-2"), null, null, x -> null)); + + soft.assertThat( + store.getIdentifiedKeys( + commit, + newArrayList(ContentKey.of("not-there-1"), ContentKey.of("not-there-2")), + false)) + .isEmpty(); } }