Skip to content

Commit

Permalink
Core: Assign fresh IDs to view schema
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra committed Apr 30, 2024
1 parent 6f0d9dd commit f378c67
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 26 deletions.
10 changes: 10 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -92,6 +93,7 @@
import org.apache.iceberg.rest.responses.LoadViewResponse;
import org.apache.iceberg.rest.responses.OAuthTokenResponse;
import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.EnvironmentUtil;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
Expand Down Expand Up @@ -1185,6 +1187,8 @@ public View create() {
Preconditions.checkState(
null != defaultNamespace, "Cannot create view without specifying a default namespace");

schema = TypeUtil.assignFreshIds(schema, new AtomicInteger(0)::incrementAndGet);

ViewVersion viewVersion =
ImmutableViewVersion.builder()
.versionId(1)
Expand Down Expand Up @@ -1262,6 +1266,12 @@ private View replace(LoadViewResponse response) {
.max(Integer::compareTo)
.orElseGet(metadata::currentVersionId);

schema =
TypeUtil.assignFreshIds(
schema,
metadata.schema(),
new AtomicInteger(metadata.schema().highestFieldId())::incrementAndGet);

ViewVersion viewVersion =
ImmutableViewVersion.builder()
.versionId(maxVersionId + 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.EnvironmentContext;
import org.apache.iceberg.Schema;
Expand All @@ -33,6 +34,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.TypeUtil;

public abstract class BaseMetastoreViewCatalog extends BaseMetastoreCatalog implements ViewCatalog {
protected abstract ViewOperations newViewOps(TableIdentifier identifier);
Expand Down Expand Up @@ -155,6 +157,8 @@ private View create(ViewOperations ops) {
Preconditions.checkState(
null != defaultNamespace, "Cannot create view without specifying a default namespace");

schema = TypeUtil.assignFreshIds(schema, new AtomicInteger(0)::incrementAndGet);

ViewVersion viewVersion =
ImmutableViewVersion.builder()
.versionId(1)
Expand Down Expand Up @@ -204,6 +208,12 @@ private View replace(ViewOperations ops) {
.max(Integer::compareTo)
.orElseGet(metadata::currentVersionId);

schema =
TypeUtil.assignFreshIds(
schema,
metadata.schema(),
new AtomicInteger(metadata.schema().highestFieldId())::incrementAndGet);

ViewVersion viewVersion =
ImmutableViewVersion.builder()
.versionId(maxVersionId + 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.EnvironmentContext;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;

Expand Down Expand Up @@ -71,6 +73,12 @@ ViewMetadata internalApply() {
.max(Integer::compareTo)
.orElseGet(viewVersion::versionId);

schema =
TypeUtil.assignFreshIds(
schema,
base.schema(),
new AtomicInteger(base.schema().highestFieldId())::incrementAndGet);

ViewVersion newVersion =
ImmutableViewVersion.builder()
.versionId(maxVersionId + 1)
Expand Down
71 changes: 51 additions & 20 deletions core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,27 @@ public abstract class ViewCatalogTests<C extends ViewCatalog & SupportsNamespace
required(3, "id", Types.IntegerType.get(), "unique ID"),
required(4, "data", Types.StringType.get()));

// actual schema for the view, with column IDs reassigned
protected static final Schema VIEW_SCHEMA =
new Schema(
0,
required(1, "id", Types.IntegerType.get(), "unique ID"),
required(2, "data", Types.StringType.get()));

private static final Schema OTHER_SCHEMA =
new Schema(7, required(1, "some_id", Types.IntegerType.get()));
new Schema(
7,
required(3, "id", Types.IntegerType.get(), "unique ID"),
required(4, "zip", Types.LongType.get(), "zip"),
required(5, "data", Types.StringType.get()));

// actual replaced schema for the view, with column IDs reassigned
private static final Schema OTHER_VIEW_SCHEMA =
new Schema(
1,
required(1, "id", Types.IntegerType.get(), "unique ID"),
required(3, "zip", Types.LongType.get(), "zip"),
required(2, "data", Types.StringType.get()));

protected abstract C catalog();

Expand Down Expand Up @@ -102,18 +121,18 @@ public void basicCreateView() {
.first()
.extracting(ViewHistoryEntry::versionId)
.isEqualTo(1);
assertThat(view.schema().schemaId()).isEqualTo(0);
assertThat(view.schema().asStruct()).isEqualTo(SCHEMA.asStruct());
assertThat(view.schema().schemaId()).isEqualTo(VIEW_SCHEMA.schemaId());
assertThat(view.schema().asStruct()).isEqualTo(VIEW_SCHEMA.asStruct());
assertThat(view.currentVersion().operation()).isEqualTo("create");
assertThat(view.schemas()).hasSize(1).containsKey(0);
assertThat(view.schemas()).hasSize(1).containsKey(VIEW_SCHEMA.schemaId());
assertThat(view.versions()).hasSize(1).containsExactly(view.currentVersion());

assertThat(view.currentVersion())
.isEqualTo(
ImmutableViewVersion.builder()
.timestampMillis(view.currentVersion().timestampMillis())
.versionId(1)
.schemaId(0)
.schemaId(VIEW_SCHEMA.schemaId())
.summary(view.currentVersion().summary())
.defaultNamespace(identifier.namespace())
.addRepresentations(
Expand Down Expand Up @@ -173,17 +192,17 @@ public void completeCreateView() {
.extracting(ViewHistoryEntry::versionId)
.isEqualTo(1);
assertThat(view.currentVersion().operation()).isEqualTo("create");
assertThat(view.schema().schemaId()).isEqualTo(0);
assertThat(view.schema().asStruct()).isEqualTo(SCHEMA.asStruct());
assertThat(view.schemas()).hasSize(1).containsKey(0);
assertThat(view.schema().schemaId()).isEqualTo(VIEW_SCHEMA.schemaId());
assertThat(view.schema().asStruct()).isEqualTo(VIEW_SCHEMA.asStruct());
assertThat(view.schemas()).hasSize(1).containsKey(VIEW_SCHEMA.schemaId());
assertThat(view.versions()).hasSize(1).containsExactly(view.currentVersion());

assertThat(view.currentVersion())
.isEqualTo(
ImmutableViewVersion.builder()
.timestampMillis(view.currentVersion().timestampMillis())
.versionId(1)
.schemaId(0)
.schemaId(VIEW_SCHEMA.schemaId())
.summary(view.currentVersion().summary())
.defaultNamespace(identifier.namespace())
.defaultCatalog(catalog().name())
Expand Down Expand Up @@ -885,17 +904,20 @@ public void createOrReplaceView(boolean useCreateOrReplace) {
.extracting(ViewHistoryEntry::versionId)
.isEqualTo(2);

assertThat(replacedView.schema().schemaId()).isEqualTo(1);
assertThat(replacedView.schema().asStruct()).isEqualTo(OTHER_SCHEMA.asStruct());
assertThat(replacedView.schemas()).hasSize(2).containsKey(0).containsKey(1);
assertThat(replacedView.schema().schemaId()).isEqualTo(OTHER_VIEW_SCHEMA.schemaId());
assertThat(replacedView.schema().asStruct()).isEqualTo(OTHER_VIEW_SCHEMA.asStruct());
assertThat(replacedView.schemas())
.hasSize(2)
.containsKey(VIEW_SCHEMA.schemaId())
.containsKey(OTHER_VIEW_SCHEMA.schemaId());

ViewVersion replacedViewVersion = replacedView.currentVersion();
assertThat(replacedView.versions())
.hasSize(2)
.containsExactly(viewVersion, replacedViewVersion);
assertThat(replacedViewVersion).isNotNull();
assertThat(replacedViewVersion.versionId()).isEqualTo(2);
assertThat(replacedViewVersion.schemaId()).isEqualTo(1);
assertThat(replacedViewVersion.schemaId()).isEqualTo(OTHER_VIEW_SCHEMA.schemaId());
assertThat(replacedViewVersion.operation()).isEqualTo("replace");
assertThat(replacedViewVersion.representations())
.containsExactly(
Expand Down Expand Up @@ -1120,7 +1142,12 @@ public void replaceViewVersion() {
.element(1)
.extracting(ViewHistoryEntry::versionId)
.isEqualTo(updatedView.currentVersion().versionId());
assertThat(updatedView.schemas()).hasSize(2).containsKey(0).containsKey(1);
assertThat(updatedView.schemas())
.hasSize(2)
.containsKey(VIEW_SCHEMA.schemaId())
.containsKey(OTHER_VIEW_SCHEMA.schemaId());
assertThat(updatedView.schema().schemaId()).isEqualTo(OTHER_VIEW_SCHEMA.schemaId());
assertThat(updatedView.schema().asStruct()).isEqualTo(OTHER_VIEW_SCHEMA.asStruct());
assertThat(updatedView.versions())
.hasSize(2)
.containsExactly(viewVersion, updatedView.currentVersion());
Expand All @@ -1130,7 +1157,7 @@ public void replaceViewVersion() {
assertThat(updatedViewVersion.versionId()).isEqualTo(viewVersion.versionId() + 1);
assertThat(updatedViewVersion.operation()).isEqualTo("replace");
assertThat(updatedViewVersion.representations()).hasSize(1).containsExactly(trino);
assertThat(updatedViewVersion.schemaId()).isEqualTo(1);
assertThat(updatedViewVersion.schemaId()).isEqualTo(OTHER_VIEW_SCHEMA.schemaId());
assertThat(updatedViewVersion.defaultCatalog()).isEqualTo("default");
assertThat(updatedViewVersion.defaultNamespace()).isEqualTo(identifier.namespace());

Expand Down Expand Up @@ -1585,6 +1612,8 @@ public void concurrentReplaceViewVersion() {
viewOps.commit(current, sparkUpdate);

View updatedView = catalog().loadView(identifier);
assertThat(updatedView.schema().schemaId()).isEqualTo(VIEW_SCHEMA.schemaId());
assertThat(updatedView.schema().asStruct()).isEqualTo(VIEW_SCHEMA.asStruct());
ViewVersion viewVersion = updatedView.currentVersion();
assertThat(viewVersion.versionId()).isEqualTo(3);
assertThat(updatedView.versions()).hasSize(3);
Expand All @@ -1593,7 +1622,7 @@ public void concurrentReplaceViewVersion() {
ImmutableViewVersion.builder()
.timestampMillis(updatedView.version(1).timestampMillis())
.versionId(1)
.schemaId(0)
.schemaId(VIEW_SCHEMA.schemaId())
.summary(updatedView.version(1).summary())
.defaultNamespace(identifier.namespace())
.addRepresentations(
Expand All @@ -1608,7 +1637,7 @@ public void concurrentReplaceViewVersion() {
ImmutableViewVersion.builder()
.timestampMillis(updatedView.version(2).timestampMillis())
.versionId(2)
.schemaId(1)
.schemaId(OTHER_VIEW_SCHEMA.schemaId())
.summary(updatedView.version(2).summary())
.defaultNamespace(identifier.namespace())
.addRepresentations(
Expand All @@ -1623,7 +1652,7 @@ public void concurrentReplaceViewVersion() {
ImmutableViewVersion.builder()
.timestampMillis(updatedView.version(3).timestampMillis())
.versionId(3)
.schemaId(0)
.schemaId(VIEW_SCHEMA.schemaId())
.summary(updatedView.version(3).summary())
.defaultNamespace(identifier.namespace())
.addRepresentations(
Expand All @@ -1638,6 +1667,8 @@ public void concurrentReplaceViewVersion() {
.hasMessageContaining("Cannot commit");

View updatedView = catalog().loadView(identifier);
assertThat(updatedView.schema().schemaId()).isEqualTo(OTHER_VIEW_SCHEMA.schemaId());
assertThat(updatedView.schema().asStruct()).isEqualTo(OTHER_VIEW_SCHEMA.asStruct());
ViewVersion viewVersion = updatedView.currentVersion();
assertThat(viewVersion.versionId()).isEqualTo(2);
assertThat(updatedView.versions()).hasSize(2);
Expand All @@ -1646,7 +1677,7 @@ public void concurrentReplaceViewVersion() {
ImmutableViewVersion.builder()
.timestampMillis(updatedView.version(1).timestampMillis())
.versionId(1)
.schemaId(0)
.schemaId(VIEW_SCHEMA.schemaId())
.summary(updatedView.version(1).summary())
.defaultNamespace(identifier.namespace())
.addRepresentations(
Expand All @@ -1661,7 +1692,7 @@ public void concurrentReplaceViewVersion() {
ImmutableViewVersion.builder()
.timestampMillis(updatedView.version(2).timestampMillis())
.versionId(2)
.schemaId(1)
.schemaId(OTHER_VIEW_SCHEMA.schemaId())
.summary(updatedView.version(2).summary())
.defaultNamespace(identifier.namespace())
.addRepresentations(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1751,8 +1751,9 @@ public void createOrReplaceViewKeepsViewHistory() {
assertThat(view.schema().asStruct())
.isEqualTo(
new Schema(
Types.NestedField.optional(0, "new_id", Types.IntegerType.get(), "some ID"),
Types.NestedField.optional(1, "new_data", Types.StringType.get(), "some data"))
0,
Types.NestedField.optional(1, "new_id", Types.IntegerType.get(), "some ID"),
Types.NestedField.optional(2, "new_data", Types.StringType.get(), "some data"))
.asStruct());

sql("CREATE OR REPLACE VIEW %s (updated_id COMMENT 'updated ID') AS %s", viewName, updatedSql);
Expand All @@ -1766,8 +1767,9 @@ public void createOrReplaceViewKeepsViewHistory() {
assertThat(view.schema().asStruct())
.isEqualTo(
new Schema(
1,
Types.NestedField.optional(
0, "updated_id", Types.IntegerType.get(), "updated ID"))
3, "updated_id", Types.IntegerType.get(), "updated ID"))
.asStruct());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1750,8 +1750,9 @@ public void createOrReplaceViewKeepsViewHistory() {
assertThat(view.schema().asStruct())
.isEqualTo(
new Schema(
Types.NestedField.optional(0, "new_id", Types.IntegerType.get(), "some ID"),
Types.NestedField.optional(1, "new_data", Types.StringType.get(), "some data"))
0,
Types.NestedField.optional(1, "new_id", Types.IntegerType.get(), "some ID"),
Types.NestedField.optional(2, "new_data", Types.StringType.get(), "some data"))
.asStruct());

sql("CREATE OR REPLACE VIEW %s (updated_id COMMENT 'updated ID') AS %s", viewName, updatedSql);
Expand All @@ -1765,8 +1766,9 @@ public void createOrReplaceViewKeepsViewHistory() {
assertThat(view.schema().asStruct())
.isEqualTo(
new Schema(
1,
Types.NestedField.optional(
0, "updated_id", Types.IntegerType.get(), "updated ID"))
3, "updated_id", Types.IntegerType.get(), "updated ID"))
.asStruct());
}

Expand Down

0 comments on commit f378c67

Please sign in to comment.