Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Assign fresh IDs to view schema #10253

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion core/src/main/java/org/apache/iceberg/view/ViewMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
Expand All @@ -38,6 +39,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.immutables.value.Value;
import org.slf4j.Logger;
Expand Down Expand Up @@ -258,7 +260,22 @@ public Builder setCurrentVersionId(int newVersionId) {
}

public Builder setCurrentVersion(ViewVersion version, Schema schema) {
int newSchemaId = addSchemaInternal(schema);
Schema freshSchema;
if (history.isEmpty()) {
// view metadata is created
freshSchema =
TypeUtil.assignFreshIds(
INITIAL_SCHEMA_ID, schema, new AtomicInteger(0)::incrementAndGet);
} else {
// view metadata is replaced
Schema baseSchema = schemasById.get(versionsById.get(currentVersionId).schemaId());
AtomicInteger highestFieldId =
new AtomicInteger(
schemas.stream().map(Schema::highestFieldId).max(Integer::compareTo).orElse(0));
freshSchema = TypeUtil.assignFreshIds(schema, baseSchema, highestFieldId::incrementAndGet);
}

int newSchemaId = addSchemaInternal(freshSchema);
ViewVersion newVersion =
ImmutableViewVersion.builder().from(version).schemaId(newSchemaId).build();
return setCurrentVersionId(addVersionInternal(newVersion));
Expand Down
114 changes: 114 additions & 0 deletions core/src/test/java/org/apache/iceberg/view/TestViewMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -1181,4 +1181,118 @@ public void droppingDialectAllowedAndThenDisallowed() {
+ "Previous dialects: [trino]\n"
+ "New dialects: [spark]");
}

@Test
public void assignFreshSchemaIdWhereNewSchemasFieldIdIsSmallerThanHighestFieldId() {
  // Three schemas are registered up front, using field IDs 1, 2 and 3.
  Schema firstSchema = new Schema(Types.NestedField.required(1, "x", Types.LongType.get()));
  Schema secondSchema = new Schema(Types.NestedField.required(2, "y", Types.LongType.get()));
  Schema thirdSchema = new Schema(Types.NestedField.required(3, "z", Types.LongType.get()));
  // The incoming schema carries a field ID (1) that is below the highest registered ID (3).
  Schema incomingSchema = new Schema(Types.NestedField.required(1, "a", Types.LongType.get()));

  ViewVersion initialVersion =
      ImmutableViewVersion.builder()
          .versionId(1)
          .schemaId(0)
          .timestampMillis(System.currentTimeMillis())
          .defaultNamespace(Namespace.empty())
          .build();
  ViewMetadata.Builder createBuilder = ViewMetadata.builder().setLocation("custom-location");
  createBuilder.addSchema(firstSchema).addSchema(secondSchema).addSchema(thirdSchema);
  ViewMetadata original = createBuilder.setCurrentVersion(initialVersion, firstSchema).build();

  ViewVersion replacingVersion =
      ImmutableViewVersion.builder()
          .versionId(2)
          .schemaId(0)
          .timestampMillis(System.currentTimeMillis())
          .defaultNamespace(Namespace.empty())
          .build();
  ViewMetadata replaced =
      ViewMetadata.buildFrom(original).setCurrentVersion(replacingVersion, incomingSchema).build();

  // The caller-supplied ID must not survive: the new column is expected to land on ID 4,
  // i.e. one past the highest field ID among the previously registered schemas.
  assertThat(replaced.schema().highestFieldId()).isEqualTo(4);
}

@Test
public void assignFreshSchemaIdWhereNewSchemasFieldIdIsGreaterThanHighestFieldId() {
  // Three schemas are registered up front, using field IDs 1, 2 and 3.
  Schema firstSchema = new Schema(Types.NestedField.required(1, "x", Types.LongType.get()));
  Schema secondSchema = new Schema(Types.NestedField.required(2, "y", Types.LongType.get()));
  Schema thirdSchema = new Schema(Types.NestedField.required(3, "z", Types.LongType.get()));
  // The incoming schema carries a field ID (12) that is above the highest registered ID (3).
  Schema incomingSchema = new Schema(Types.NestedField.required(12, "a", Types.LongType.get()));

  ViewVersion initialVersion =
      ImmutableViewVersion.builder()
          .versionId(1)
          .schemaId(0)
          .timestampMillis(System.currentTimeMillis())
          .defaultNamespace(Namespace.empty())
          .build();
  ViewMetadata.Builder createBuilder = ViewMetadata.builder().setLocation("custom-location");
  createBuilder.addSchema(firstSchema).addSchema(secondSchema).addSchema(thirdSchema);
  ViewMetadata original = createBuilder.setCurrentVersion(initialVersion, firstSchema).build();

  ViewVersion replacingVersion =
      ImmutableViewVersion.builder()
          .versionId(2)
          .schemaId(0)
          .timestampMillis(System.currentTimeMillis())
          .defaultNamespace(Namespace.empty())
          .build();
  ViewMetadata replaced =
      ViewMetadata.buildFrom(original).setCurrentVersion(replacingVersion, incomingSchema).build();

  // The gap up to 12 must not be kept: the new column is expected to land on ID 4,
  // i.e. one past the highest field ID among the previously registered schemas.
  assertThat(replaced.schema().highestFieldId()).isEqualTo(4);
}

@Test
public void assignFreshSchemaIdWhereNewSchemasFieldIdSameAsHighestFieldId() {
  // Three schemas are registered up front, using field IDs 1, 2 and 3.
  Schema firstSchema = new Schema(Types.NestedField.required(1, "x", Types.LongType.get()));
  Schema secondSchema = new Schema(Types.NestedField.required(2, "y", Types.LongType.get()));
  Schema thirdSchema = new Schema(Types.NestedField.required(3, "z", Types.LongType.get()));
  // The incoming schema carries a field ID (3) equal to the highest registered ID.
  Schema incomingSchema = new Schema(Types.NestedField.required(3, "a", Types.LongType.get()));

  ViewVersion initialVersion =
      ImmutableViewVersion.builder()
          .versionId(1)
          .schemaId(0)
          .timestampMillis(System.currentTimeMillis())
          .defaultNamespace(Namespace.empty())
          .build();
  ViewMetadata.Builder createBuilder = ViewMetadata.builder().setLocation("custom-location");
  createBuilder.addSchema(firstSchema).addSchema(secondSchema).addSchema(thirdSchema);
  ViewMetadata original = createBuilder.setCurrentVersion(initialVersion, firstSchema).build();

  ViewVersion replacingVersion =
      ImmutableViewVersion.builder()
          .versionId(2)
          .schemaId(0)
          .timestampMillis(System.currentTimeMillis())
          .defaultNamespace(Namespace.empty())
          .build();
  ViewMetadata replaced =
      ViewMetadata.buildFrom(original).setCurrentVersion(replacingVersion, incomingSchema).build();

  // The colliding ID 3 must not be reused for the new column: it is expected to land on ID 4,
  // i.e. one past the highest field ID among the previously registered schemas.
  assertThat(replaced.schema().highestFieldId()).isEqualTo(4);
}
}
71 changes: 51 additions & 20 deletions core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,27 @@ public abstract class ViewCatalogTests<C extends ViewCatalog & SupportsNamespace
required(3, "id", Types.IntegerType.get(), "unique ID"),
required(4, "data", Types.StringType.get()));

// actual schema for the view, with column IDs reassigned
protected static final Schema VIEW_SCHEMA =
new Schema(
0,
required(1, "id", Types.IntegerType.get(), "unique ID"),
required(2, "data", Types.StringType.get()));

private static final Schema OTHER_SCHEMA =
new Schema(7, required(1, "some_id", Types.IntegerType.get()));
new Schema(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing this to show that the new column gets a re-assigned column ID.

7,
required(3, "id", Types.IntegerType.get(), "unique ID"),
required(4, "zip", Types.LongType.get(), "zip"),
required(5, "data", Types.StringType.get()));

// actual replaced schema for the view, with column IDs reassigned
private static final Schema OTHER_VIEW_SCHEMA =
new Schema(
1,
required(1, "id", Types.IntegerType.get(), "unique ID"),
required(3, "zip", Types.LongType.get(), "zip"),
required(2, "data", Types.StringType.get()));

protected abstract C catalog();

Expand Down Expand Up @@ -102,18 +121,18 @@ public void basicCreateView() {
.first()
.extracting(ViewHistoryEntry::versionId)
.isEqualTo(1);
assertThat(view.schema().schemaId()).isEqualTo(0);
assertThat(view.schema().asStruct()).isEqualTo(SCHEMA.asStruct());
assertThat(view.schema().schemaId()).isEqualTo(VIEW_SCHEMA.schemaId());
assertThat(view.schema().asStruct()).isEqualTo(VIEW_SCHEMA.asStruct());
assertThat(view.currentVersion().operation()).isEqualTo("create");
assertThat(view.schemas()).hasSize(1).containsKey(0);
assertThat(view.schemas()).hasSize(1).containsKey(VIEW_SCHEMA.schemaId());
assertThat(view.versions()).hasSize(1).containsExactly(view.currentVersion());

assertThat(view.currentVersion())
.isEqualTo(
ImmutableViewVersion.builder()
.timestampMillis(view.currentVersion().timestampMillis())
.versionId(1)
.schemaId(0)
.schemaId(VIEW_SCHEMA.schemaId())
.summary(view.currentVersion().summary())
.defaultNamespace(identifier.namespace())
.addRepresentations(
Expand Down Expand Up @@ -173,17 +192,17 @@ public void completeCreateView() {
.extracting(ViewHistoryEntry::versionId)
.isEqualTo(1);
assertThat(view.currentVersion().operation()).isEqualTo("create");
assertThat(view.schema().schemaId()).isEqualTo(0);
assertThat(view.schema().asStruct()).isEqualTo(SCHEMA.asStruct());
assertThat(view.schemas()).hasSize(1).containsKey(0);
assertThat(view.schema().schemaId()).isEqualTo(VIEW_SCHEMA.schemaId());
assertThat(view.schema().asStruct()).isEqualTo(VIEW_SCHEMA.asStruct());
assertThat(view.schemas()).hasSize(1).containsKey(VIEW_SCHEMA.schemaId());
assertThat(view.versions()).hasSize(1).containsExactly(view.currentVersion());

assertThat(view.currentVersion())
.isEqualTo(
ImmutableViewVersion.builder()
.timestampMillis(view.currentVersion().timestampMillis())
.versionId(1)
.schemaId(0)
.schemaId(VIEW_SCHEMA.schemaId())
.summary(view.currentVersion().summary())
.defaultNamespace(identifier.namespace())
.defaultCatalog(catalog().name())
Expand Down Expand Up @@ -885,17 +904,20 @@ public void createOrReplaceView(boolean useCreateOrReplace) {
.extracting(ViewHistoryEntry::versionId)
.isEqualTo(2);

assertThat(replacedView.schema().schemaId()).isEqualTo(1);
assertThat(replacedView.schema().asStruct()).isEqualTo(OTHER_SCHEMA.asStruct());
assertThat(replacedView.schemas()).hasSize(2).containsKey(0).containsKey(1);
assertThat(replacedView.schema().schemaId()).isEqualTo(OTHER_VIEW_SCHEMA.schemaId());
assertThat(replacedView.schema().asStruct()).isEqualTo(OTHER_VIEW_SCHEMA.asStruct());
assertThat(replacedView.schemas())
.hasSize(2)
.containsKey(VIEW_SCHEMA.schemaId())
.containsKey(OTHER_VIEW_SCHEMA.schemaId());

ViewVersion replacedViewVersion = replacedView.currentVersion();
assertThat(replacedView.versions())
.hasSize(2)
.containsExactly(viewVersion, replacedViewVersion);
assertThat(replacedViewVersion).isNotNull();
assertThat(replacedViewVersion.versionId()).isEqualTo(2);
assertThat(replacedViewVersion.schemaId()).isEqualTo(1);
assertThat(replacedViewVersion.schemaId()).isEqualTo(OTHER_VIEW_SCHEMA.schemaId());
assertThat(replacedViewVersion.operation()).isEqualTo("replace");
assertThat(replacedViewVersion.representations())
.containsExactly(
Expand Down Expand Up @@ -1120,7 +1142,12 @@ public void replaceViewVersion() {
.element(1)
.extracting(ViewHistoryEntry::versionId)
.isEqualTo(updatedView.currentVersion().versionId());
assertThat(updatedView.schemas()).hasSize(2).containsKey(0).containsKey(1);
assertThat(updatedView.schemas())
.hasSize(2)
.containsKey(VIEW_SCHEMA.schemaId())
.containsKey(OTHER_VIEW_SCHEMA.schemaId());
assertThat(updatedView.schema().schemaId()).isEqualTo(OTHER_VIEW_SCHEMA.schemaId());
assertThat(updatedView.schema().asStruct()).isEqualTo(OTHER_VIEW_SCHEMA.asStruct());
assertThat(updatedView.versions())
.hasSize(2)
.containsExactly(viewVersion, updatedView.currentVersion());
Expand All @@ -1130,7 +1157,7 @@ public void replaceViewVersion() {
assertThat(updatedViewVersion.versionId()).isEqualTo(viewVersion.versionId() + 1);
assertThat(updatedViewVersion.operation()).isEqualTo("replace");
assertThat(updatedViewVersion.representations()).hasSize(1).containsExactly(trino);
assertThat(updatedViewVersion.schemaId()).isEqualTo(1);
assertThat(updatedViewVersion.schemaId()).isEqualTo(OTHER_VIEW_SCHEMA.schemaId());
assertThat(updatedViewVersion.defaultCatalog()).isEqualTo("default");
assertThat(updatedViewVersion.defaultNamespace()).isEqualTo(identifier.namespace());

Expand Down Expand Up @@ -1585,6 +1612,8 @@ public void concurrentReplaceViewVersion() {
viewOps.commit(current, sparkUpdate);

View updatedView = catalog().loadView(identifier);
assertThat(updatedView.schema().schemaId()).isEqualTo(VIEW_SCHEMA.schemaId());
assertThat(updatedView.schema().asStruct()).isEqualTo(VIEW_SCHEMA.asStruct());
ViewVersion viewVersion = updatedView.currentVersion();
assertThat(viewVersion.versionId()).isEqualTo(3);
assertThat(updatedView.versions()).hasSize(3);
Expand All @@ -1593,7 +1622,7 @@ public void concurrentReplaceViewVersion() {
ImmutableViewVersion.builder()
.timestampMillis(updatedView.version(1).timestampMillis())
.versionId(1)
.schemaId(0)
.schemaId(VIEW_SCHEMA.schemaId())
.summary(updatedView.version(1).summary())
.defaultNamespace(identifier.namespace())
.addRepresentations(
Expand All @@ -1608,7 +1637,7 @@ public void concurrentReplaceViewVersion() {
ImmutableViewVersion.builder()
.timestampMillis(updatedView.version(2).timestampMillis())
.versionId(2)
.schemaId(1)
.schemaId(OTHER_VIEW_SCHEMA.schemaId())
.summary(updatedView.version(2).summary())
.defaultNamespace(identifier.namespace())
.addRepresentations(
Expand All @@ -1623,7 +1652,7 @@ public void concurrentReplaceViewVersion() {
ImmutableViewVersion.builder()
.timestampMillis(updatedView.version(3).timestampMillis())
.versionId(3)
.schemaId(0)
.schemaId(VIEW_SCHEMA.schemaId())
.summary(updatedView.version(3).summary())
.defaultNamespace(identifier.namespace())
.addRepresentations(
Expand All @@ -1638,6 +1667,8 @@ public void concurrentReplaceViewVersion() {
.hasMessageContaining("Cannot commit");

View updatedView = catalog().loadView(identifier);
assertThat(updatedView.schema().schemaId()).isEqualTo(OTHER_VIEW_SCHEMA.schemaId());
assertThat(updatedView.schema().asStruct()).isEqualTo(OTHER_VIEW_SCHEMA.asStruct());
ViewVersion viewVersion = updatedView.currentVersion();
assertThat(viewVersion.versionId()).isEqualTo(2);
assertThat(updatedView.versions()).hasSize(2);
Expand All @@ -1646,7 +1677,7 @@ public void concurrentReplaceViewVersion() {
ImmutableViewVersion.builder()
.timestampMillis(updatedView.version(1).timestampMillis())
.versionId(1)
.schemaId(0)
.schemaId(VIEW_SCHEMA.schemaId())
.summary(updatedView.version(1).summary())
.defaultNamespace(identifier.namespace())
.addRepresentations(
Expand All @@ -1661,7 +1692,7 @@ public void concurrentReplaceViewVersion() {
ImmutableViewVersion.builder()
.timestampMillis(updatedView.version(2).timestampMillis())
.versionId(2)
.schemaId(1)
.schemaId(OTHER_VIEW_SCHEMA.schemaId())
.summary(updatedView.version(2).summary())
.defaultNamespace(identifier.namespace())
.addRepresentations(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1761,8 +1761,9 @@ public void createOrReplaceViewKeepsViewHistory() {
assertThat(view.schema().asStruct())
.isEqualTo(
new Schema(
Types.NestedField.optional(0, "new_id", Types.IntegerType.get(), "some ID"),
Types.NestedField.optional(1, "new_data", Types.StringType.get(), "some data"))
0,
Types.NestedField.optional(1, "new_id", Types.IntegerType.get(), "some ID"),
Types.NestedField.optional(2, "new_data", Types.StringType.get(), "some data"))
.asStruct());

sql("CREATE OR REPLACE VIEW %s (updated_id COMMENT 'updated ID') AS %s", viewName, updatedSql);
Expand All @@ -1776,8 +1777,9 @@ public void createOrReplaceViewKeepsViewHistory() {
assertThat(view.schema().asStruct())
.isEqualTo(
new Schema(
1,
Types.NestedField.optional(
0, "updated_id", Types.IntegerType.get(), "updated ID"))
3, "updated_id", Types.IntegerType.get(), "updated ID"))
.asStruct());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1798,8 +1798,9 @@ public void createOrReplaceViewKeepsViewHistory() {
assertThat(view.schema().asStruct())
.isEqualTo(
new Schema(
Types.NestedField.optional(0, "new_id", Types.IntegerType.get(), "some ID"),
Types.NestedField.optional(1, "new_data", Types.StringType.get(), "some data"))
0,
Types.NestedField.optional(1, "new_id", Types.IntegerType.get(), "some ID"),
Types.NestedField.optional(2, "new_data", Types.StringType.get(), "some data"))
Comment on lines +1802 to +1803
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the first field ID changed to 1 because highestFieldId is set to 0 and then incremented when assigning the new IDs? This should be fine and aligns with what we do for tables. It also does not impact existing views, because on replace we assign the fresh IDs starting above the highest existing field ID.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes correct, that's exactly how it is done

.asStruct());

sql("CREATE OR REPLACE VIEW %s (updated_id COMMENT 'updated ID') AS %s", viewName, updatedSql);
Expand All @@ -1813,8 +1814,9 @@ public void createOrReplaceViewKeepsViewHistory() {
assertThat(view.schema().asStruct())
.isEqualTo(
new Schema(
1,
Types.NestedField.optional(
0, "updated_id", Types.IntegerType.get(), "updated ID"))
3, "updated_id", Types.IntegerType.get(), "updated ID"))
.asStruct());
}

Expand Down
Loading