diff --git a/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java b/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java index dec10d3a4..ab3e29f9f 100644 --- a/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java +++ b/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java @@ -48,6 +48,9 @@ import javax.xml.xpath.XPathExpressionException; import javax.xml.xpath.XPathFactory; import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.catalog.pagination.EntityIdPageToken; +import org.apache.polaris.core.catalog.pagination.PageToken; +import org.apache.polaris.core.catalog.pagination.PolarisPage; import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.entity.PolarisChangeTrackingVersions; @@ -501,20 +504,23 @@ public List lookupEntityActiveBatch( /** {@inheritDoc} */ @Override - public @NotNull List listActiveEntities( + public @NotNull PolarisPage listActiveEntities( @NotNull PolarisCallContext callCtx, long catalogId, long parentId, - @NotNull PolarisEntityType entityType) { - return listActiveEntities(callCtx, catalogId, parentId, entityType, Predicates.alwaysTrue()); + @NotNull PolarisEntityType entityType, + @NotNull PageToken pageToken) { + return listActiveEntities( + callCtx, catalogId, parentId, entityType, pageToken, Predicates.alwaysTrue()); } @Override - public @NotNull List listActiveEntities( + public @NotNull PolarisPage listActiveEntities( @NotNull PolarisCallContext callCtx, long catalogId, long parentId, @NotNull PolarisEntityType entityType, + @NotNull PageToken pageToken, @NotNull Predicate entityFilter) { // full range scan under the parent for that type return listActiveEntities( @@ -522,7 +528,7 @@ public List lookupEntityActiveBatch( catalogId, parentId, entityType, - Integer.MAX_VALUE, + pageToken, entityFilter, entity -> new PolarisEntityActiveRecord( @@ -535,23 +541,53 @@ public List lookupEntityActiveBatch( } @Override - public @NotNull List listActiveEntities( + public @NotNull PolarisPage listActiveEntities( @NotNull PolarisCallContext callCtx, long catalogId, long parentId, @NotNull PolarisEntityType entityType, - int limit, + @NotNull PageToken pageToken, @NotNull Predicate entityFilter, @NotNull Function transformer) { - // full range scan under the parent for that type - return this.store - .lookupFullEntitiesActive(localSession.get(), catalogId, parentId, entityType) - .stream() - .map(ModelEntity::toEntity) - .filter(entityFilter) - .limit(limit) - .map(transformer) - .collect(Collectors.toList()); + + List data; + if (entityFilter.equals(Predicates.alwaysTrue())) { + // In this case, we can push the filter down into the query + data = + this.store + .lookupFullEntitiesActive( + localSession.get(), catalogId, parentId, entityType, pageToken) + .stream() + .map(ModelEntity::toEntity) + .filter(entityFilter) + .map(transformer) + .collect(Collectors.toList()); + } else { + // In this case, we cannot push the filter down into the query. We must therefore remove + // the page size limit from the PageToken and filter on the client side. + // TODO Implement a generic predicate that can be pushed down into different metastores + PageToken unlimitedPageSizeToken = pageToken.withPageSize(Integer.MAX_VALUE); + List rawData = + this.store.lookupFullEntitiesActive( + localSession.get(), catalogId, parentId, entityType, unlimitedPageSizeToken); + if (pageToken.pageSize < Integer.MAX_VALUE && rawData.size() > pageToken.pageSize) { + LOGGER.info( + "A page token could not be respected due to a predicate. " + + "{} records were read but the client was asked to return {}.", + rawData.size(), + pageToken.pageSize); + } + + data = + rawData.stream() + .map(ModelEntity::toEntity) + .filter(entityFilter) + .limit(pageToken.pageSize) + .map(transformer) + .collect(Collectors.toList()); + } + + return pageToken.buildNextPage(data); } /** {@inheritDoc} */ @@ -761,4 +797,9 @@ public void rollback() { session.getTransaction().rollback(); } } + + @Override + public @NotNull PageToken.PageTokenBuilder pageTokenBuilder() { + return EntityIdPageToken.builder(); + } } diff --git a/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java b/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java index 02461fa4f..506dbb70d 100644 --- a/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java +++ b/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java @@ -24,6 +24,9 @@ import java.util.List; import java.util.stream.Collectors; import org.apache.polaris.core.PolarisDiagnostics; +import org.apache.polaris.core.catalog.pagination.EntityIdPageToken; +import org.apache.polaris.core.catalog.pagination.PageToken; +import org.apache.polaris.core.catalog.pagination.ReadEverythingPageToken; import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.entity.PolarisEntitiesActiveKey; import org.apache.polaris.core.entity.PolarisEntityActiveRecord; @@ -274,19 +277,39 @@ long countActiveChildEntities( } List lookupFullEntitiesActive( - EntityManager session, long catalogId, long parentId, @NotNull PolarisEntityType entityType) { + EntityManager session, + long catalogId, + long parentId, + @NotNull PolarisEntityType entityType, + @NotNull PageToken pageToken) { diagnosticServices.check(session != null, "session_is_null"); + diagnosticServices.check( + (pageToken instanceof EntityIdPageToken || pageToken instanceof ReadEverythingPageToken), + "unexpected_page_token"); // Currently check against ENTITIES not joining with ENTITIES_ACTIVE String hql = - "SELECT m from ModelEntity m where m.catalogId=:catalogId and m.parentId=:parentId and m.typeCode=:typeCode"; + "SELECT m from ModelEntity m " + + "where m.catalogId=:catalogId and m.parentId=:parentId and m.typeCode=:typeCode and m.id > :tokenId"; + + if (pageToken instanceof EntityIdPageToken) { + hql += " order by m.id asc"; + } TypedQuery query = session .createQuery(hql, ModelEntity.class) .setParameter("catalogId", catalogId) .setParameter("parentId", parentId) - .setParameter("typeCode", entityType.getCode()); + .setParameter("typeCode", entityType.getCode()) + .setParameter("tokenId", -1L); + + if (pageToken instanceof EntityIdPageToken) { + query = + query + .setParameter("tokenId", ((EntityIdPageToken) pageToken).id) + .setMaxResults(pageToken.pageSize); + } return query.getResultList(); } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/PolarisConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/PolarisConfiguration.java index ee101997d..a6abf559c 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/PolarisConfiguration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/PolarisConfiguration.java @@ -175,4 +175,12 @@ public static Builder builder() { "If set to true, allows tables to be dropped with the purge parameter set to true.") .defaultValue(true) .build(); + + public static final PolarisConfiguration LIST_PAGINATION_ENABLED = + PolarisConfiguration.builder() + .key("LIST_PAGINATION_ENABLED") + .catalogConfig("list-pagination.enabled") + .description("If set to true, pagination for APIs like listTables is enabled") + .defaultValue(false) + .build(); } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/PolarisConfigurationStore.java b/polaris-core/src/main/java/org/apache/polaris/core/PolarisConfigurationStore.java index a249ed907..95c926886 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/PolarisConfigurationStore.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/PolarisConfigurationStore.java @@ -68,6 +68,8 @@ public interface PolarisConfigurationStore { if (config.defaultValue instanceof Boolean) { return config.cast(Boolean.valueOf(String.valueOf(value))); + } else if (config.defaultValue instanceof Integer) { + return config.cast(Integer.valueOf(value.toString())); } else { return config.cast(value); } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/catalog/pagination/EntityIdPageToken.java b/polaris-core/src/main/java/org/apache/polaris/core/catalog/pagination/EntityIdPageToken.java new file mode 100644 index 000000000..4eef4a8d2 --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/catalog/pagination/EntityIdPageToken.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.catalog.pagination; + +import java.util.List; +import org.apache.polaris.core.entity.PolarisBaseEntity; +import org.apache.polaris.core.persistence.models.ModelEntity; + +/** + * A {@link PageToken} implementation that tracks the greatest ID from either {@link + * PolarisBaseEntity} or {@link ModelEntity} objects supplied in updates. Entities are meant to be + * filtered during listing such that only entities with and ID greater than the ID of the token are + * returned. + */ +public class EntityIdPageToken extends PageToken { + public long id; + + private EntityIdPageToken(long id, int pageSize) { + this.id = id; + this.pageSize = pageSize; + validate(); + } + + /** The minimum ID that could be attached to an entity */ + private static final long MINIMUM_ID = 0; + + /** The entity ID to use to start with. */ + private static final long BASE_ID = MINIMUM_ID - 1; + + @Override + protected List getComponents() { + return List.of(String.valueOf(id), String.valueOf(pageSize)); + } + + /** Get a new `EntityIdPageTokenBuilder` instance */ + public static PageTokenBuilder builder() { + return new EntityIdPageToken.EntityIdPageTokenBuilder(); + } + + @Override + protected PageTokenBuilder getBuilder() { + return EntityIdPageToken.builder(); + } + + /** A {@link PageTokenBuilder} implementation for {@link EntityIdPageToken} */ + public static class EntityIdPageTokenBuilder extends PageTokenBuilder { + + @Override + public String tokenPrefix() { + return "polaris-entity-id"; + } + + @Override + public int expectedComponents() { + // id, pageSize + return 2; + } + + @Override + protected EntityIdPageToken fromStringComponents(List components) { + return new EntityIdPageToken( + Integer.parseInt(components.get(0)), Integer.parseInt(components.get(1))); + } + + @Override + protected EntityIdPageToken fromLimitImpl(int limit) { + return new EntityIdPageToken(BASE_ID, limit); + } + } + + @Override + public PageToken updated(List newData) { + if (newData == null || newData.size() < this.pageSize) { + return PageToken.DONE; + } else { + var head = newData.get(0); + if (head instanceof ModelEntity) { + return new EntityIdPageToken( + ((ModelEntity) newData.get(newData.size() - 1)).getId(), this.pageSize); + } else if (head instanceof PolarisBaseEntity) { + return new EntityIdPageToken( + ((PolarisBaseEntity) newData.get(newData.size() - 1)).getId(), this.pageSize); + } else { + throw new IllegalArgumentException( + "Cannot build a page token from: " + newData.get(0).getClass().getSimpleName()); + } + } + } + + @Override + public PageToken withPageSize(Integer pageSize) { + if (pageSize == null) { + return new EntityIdPageToken(BASE_ID, this.pageSize); + } else { + return new EntityIdPageToken(this.id, pageSize); + } + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/catalog/pagination/OffsetPageToken.java b/polaris-core/src/main/java/org/apache/polaris/core/catalog/pagination/OffsetPageToken.java new file mode 100644 index 000000000..151ceb900 --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/catalog/pagination/OffsetPageToken.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.catalog.pagination; + +import java.util.List; + +/** + * A simple {@link PageToken} implementation that tracks the number of records returned. Entities + * are meant to be filtered during listing such that when a token with offset N is supplied, the + * first N records are omitted from the results. + */ +public class OffsetPageToken extends PageToken { + + /** + * The offset of the token. If this is `5` for example, the first 5 entities returned by a list + * operation that uses this token will be skipped. + */ + public final int offset; + + /** The offset to use to start with. */ + private static final int BASE_OFFSET = 0; + + private OffsetPageToken(int offset, int pageSize) { + this.offset = offset; + this.pageSize = pageSize; + validate(); + } + + @Override + protected void validate() { + if (offset < 0) { + throw new IllegalArgumentException("Offset must be greater than zero"); + } + super.validate(); + } + + /** Get a new `EntityIdPageTokenBuilder` instance */ + public static PageTokenBuilder builder() { + return new OffsetPageTokenBuilder(); + } + + @Override + protected PageTokenBuilder getBuilder() { + return OffsetPageToken.builder(); + } + + @Override + protected List getComponents() { + return List.of(String.valueOf(this.offset), String.valueOf(this.pageSize)); + } + + /** A {@link PageTokenBuilder} implementation for {@link OffsetPageToken} */ + public static class OffsetPageTokenBuilder extends PageTokenBuilder { + + private OffsetPageTokenBuilder() {} + + @Override + public String tokenPrefix() { + return "polaris-offset"; + } + + @Override + public int expectedComponents() { + // offset + limit + return 2; + } + + @Override + protected OffsetPageToken fromStringComponents(List components) { + return new OffsetPageToken( + Integer.parseInt(components.get(0)), Integer.parseInt(components.get(1))); + } + + @Override + protected OffsetPageToken fromLimitImpl(int limit) { + return new OffsetPageToken(BASE_OFFSET, limit); + } + } + + @Override + public PageToken updated(List newData) { + if (newData == null || newData.size() < this.pageSize) { + return PageToken.DONE; + } else { + return new OffsetPageToken(this.offset + newData.size(), pageSize); + } + } + + @Override + public OffsetPageToken withPageSize(Integer pageSize) { + if (pageSize == null) { + return new OffsetPageToken(BASE_OFFSET, this.pageSize); + } else { + return new OffsetPageToken(this.offset, pageSize); + } + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/catalog/pagination/PageToken.java b/polaris-core/src/main/java/org/apache/polaris/core/catalog/pagination/PageToken.java new file mode 100644 index 000000000..09a2a51ec --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/catalog/pagination/PageToken.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.catalog.pagination; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Base64; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Represents a page token that can be used by operations like `listTables`. Clients that specify a + * `pageSize` (or a `pageToken`) may receive a `next-page-token` in the response, the content of + * which is a serialized PageToken. + * + *

By providing that in the next query's `pageToken`, the client can resume listing where they + * left off. If the client provides a `pageToken` or `pageSize` but `next-page-token` is null in the + * response, that means there is no more data to read. + */ +public abstract class PageToken { + + public int pageSize; + + public static final PageToken DONE = null; + public static final int DEFAULT_PAGE_SIZE = 1000; + + protected void validate() { + if (pageSize <= 0) { + throw new IllegalArgumentException("Page size must be greater than zero"); + } + } + + /** + * Get a new PageTokenBuilder from a PageToken. The PageTokenBuilder type should match the + * PageToken type. Implementations may also provide a static `builder` method to obtain the same + * PageTokenBuilder. + */ + protected abstract PageTokenBuilder getBuilder(); + + /** Allows `PageToken` implementations to implement methods like `fromLimit` */ + public abstract static class PageTokenBuilder { + + /** + * A prefix that tokens are expected to start with, ideally unique across `PageTokenBuilder` + * implementations. + */ + public abstract String tokenPrefix(); + + /** + * The number of expected components in a token. This should match the number of components + * returned by getComponents and shouldn't account for the prefix or the checksum. + */ + public abstract int expectedComponents(); + + /** Deserialize a string into a {@link PageToken} */ + public final PageToken fromString(String tokenString) { + if (tokenString == null) { + throw new IllegalArgumentException("Cannot build page token from null string"); + } else if (tokenString.isEmpty()) { + if (this instanceof ReadEverythingPageToken.ReadEverythingPageTokenBuilder) { + return ReadEverythingPageToken.get(); + } else { + return fromLimit(DEFAULT_PAGE_SIZE); + } + } else { + try { + String decoded = + new String(Base64.getDecoder().decode(tokenString), StandardCharsets.UTF_8); + String[] parts = decoded.split(":"); + + // +2 to account for the prefix and checksum. + if (parts.length != expectedComponents() + 2 || !parts[0].equals(tokenPrefix())) { + throw new IllegalArgumentException("Invalid token format in token: " + tokenString); + } + + // Cut off prefix and checksum + T result = fromStringComponents(Arrays.asList(parts).subList(1, parts.length - 1)); + result.validate(); + return result; + } catch (Exception e) { + throw new IllegalArgumentException("Failed to decode page token: " + tokenString, e); + } + } + } + + /** Construct a {@link PageToken} from a plain limit */ + public final PageToken fromLimit(Integer limit) { + if (limit == null) { + return ReadEverythingPageToken.get(); + } else { + return fromLimitImpl(limit); + } + } + + /** Construct a {@link PageToken} from a plain limit */ + protected abstract T fromLimitImpl(int limit); + + /** + * {@link PageTokenBuilder} implementations should implement this to build a {@link PageToken} + * from components in a string token. These components should be the same ones returned by + * {@link #getComponents()} and won't include the token prefix or the checksum. + */ + protected abstract T fromStringComponents(List components); + } + + /** Convert this into components that the serialized token string will be built from. */ + protected abstract List getComponents(); + + /** + * Builds a new page token to reflect new data that's been read. If the amount of data read is + * less than the pageSize, this will return {@link PageToken#DONE}(null) + */ + protected abstract PageToken updated(List newData); + + /** + * Builds a {@link PolarisPage} from a {@link List}. The {@link PageToken} attached to the + * new {@link PolarisPage} is the same as the result of calling {@link #updated(List)} on this + * {@link PageToken}. + */ + public final PolarisPage buildNextPage(List data) { + return new PolarisPage(updated(data), data); + } + + /** + * Return a new {@link PageToken} with an updated page size. If the pageSize provided is null, the + * existing page size will be preserved. + */ + public abstract PageToken withPageSize(Integer pageSize); + + /** Serialize a {@link PageToken} into a string */ + @Override + public final String toString() { + List components = getComponents(); + String prefix = getBuilder().tokenPrefix(); + String componentString = String.join(":", components); + String checksum = String.valueOf(componentString.hashCode()); + List allElements = + Stream.of(prefix, componentString, checksum) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()); + String rawString = String.join(":", allElements); + return Base64.getEncoder().encodeToString(rawString.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public final boolean equals(Object o) { + if (o instanceof PageToken) { + return this.toString().equals(o.toString()); + } else { + return false; + } + } + + @Override + public final int hashCode() { + if (toString() == null) { + return 0; + } else { + return toString().hashCode(); + } + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/catalog/pagination/PolarisPage.java b/polaris-core/src/main/java/org/apache/polaris/core/catalog/pagination/PolarisPage.java new file mode 100644 index 000000000..f0f673934 --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/catalog/pagination/PolarisPage.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.catalog.pagination; + +import java.util.List; + +/** + * A wrapper for a {@link List} of data and a {@link PageToken} that can be used to continue the + * listing operation that generated that data. + */ +public class PolarisPage { + public final PageToken pageToken; + public final List data; + + public PolarisPage(PageToken pageToken, List data) { + this.pageToken = pageToken; + this.data = data; + } + + /** + * Used to wrap a {@link List} of data into a {@link PolarisPage} when there is no more data + */ + public static PolarisPage fromData(List data) { + return new PolarisPage<>(PageToken.DONE, data); + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/catalog/pagination/ReadEverythingPageToken.java b/polaris-core/src/main/java/org/apache/polaris/core/catalog/pagination/ReadEverythingPageToken.java new file mode 100644 index 000000000..5002a84c4 --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/catalog/pagination/ReadEverythingPageToken.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.catalog.pagination; + +import java.util.List; + +/** + * A {@link PageToken} implementation for readers who want to read everything. The behavior when + * using this token should be the same as when reading without a token. + */ +public class ReadEverythingPageToken extends PageToken { + + private ReadEverythingPageToken() { + this.pageSize = Integer.MAX_VALUE; + validate(); + } + + /** Get a {@link ReadEverythingPageToken} */ + public static ReadEverythingPageToken get() { + return new ReadEverythingPageToken(); + } + + public static PageTokenBuilder builder() { + return new ReadEverythingPageTokenBuilder(); + } + + @Override + protected PageTokenBuilder getBuilder() { + return builder(); + } + + /** A {@link PageTokenBuilder} implementation for {@link ReadEverythingPageToken} */ + public static class ReadEverythingPageTokenBuilder + extends PageTokenBuilder { + + private ReadEverythingPageTokenBuilder() {} + + @Override + public String tokenPrefix() { + return "polaris-read-everything"; + } + + @Override + public int expectedComponents() { + return 0; + } + + @Override + protected ReadEverythingPageToken fromStringComponents(List components) { + return ReadEverythingPageToken.get(); + } + + @Override + protected ReadEverythingPageToken fromLimitImpl(int limit) { + throw new UnsupportedOperationException(); + } + } + + @Override + protected List getComponents() { + return List.of(); + } + + /** Any time {@link ReadEverythingPageToken} is updated, everything has been read */ + @Override + public PageToken updated(List newData) { + return PageToken.DONE; + } + + /** {@link ReadEverythingPageToken} does not support page size */ + @Override + public PageToken withPageSize(Integer pageSize) { + if (pageSize == null) { + return ReadEverythingPageToken.get(); + } else { + throw new UnsupportedOperationException(); + } + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java index 9033795d8..705253583 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java @@ -24,9 +24,12 @@ import java.util.EnumMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.catalog.pagination.PageToken; +import org.apache.polaris.core.catalog.pagination.PolarisPage; import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.entity.PolarisChangeTrackingVersions; import org.apache.polaris.core.entity.PolarisEntity; @@ -309,6 +312,13 @@ class ListEntitiesResult extends BaseResult { // null if not success. Else the list of entities being returned private final List entities; + private final Optional pageTokenOpt; + + /** Create a {@link ListEntitiesResult} from a {@link PolarisPage} */ + public static ListEntitiesResult fromPolarisPage( + PolarisPage polarisPage) { + return new ListEntitiesResult(polarisPage.data, Optional.ofNullable(polarisPage.pageToken)); + } /** * Constructor for an error @@ -318,9 +328,11 @@ class ListEntitiesResult extends BaseResult { */ public ListEntitiesResult( @NotNull PolarisMetaStoreManager.ReturnStatus errorCode, - @Nullable String extraInformation) { + @Nullable String extraInformation, + @NotNull Optional pageTokenOpt) { super(errorCode, extraInformation); this.entities = null; + this.pageTokenOpt = pageTokenOpt; } /** @@ -328,23 +340,32 @@ public ListEntitiesResult( * * @param entities list of entities being returned, implies success */ - public ListEntitiesResult(@NotNull List entities) { + public ListEntitiesResult( + @NotNull List entities, + @Nullable Optional pageTokenOpt) { super(ReturnStatus.SUCCESS); this.entities = entities; + this.pageTokenOpt = pageTokenOpt; } @JsonCreator private ListEntitiesResult( @JsonProperty("returnStatus") @NotNull ReturnStatus returnStatus, @JsonProperty("extraInformation") String extraInformation, - @JsonProperty("entities") List entities) { + @JsonProperty("entities") List entities, + @JsonProperty("pageToken") Optional pageTokenOpt) { super(returnStatus, extraInformation); this.entities = entities; + this.pageTokenOpt = pageTokenOpt; } public List getEntities() { return entities; } + + public Optional getPageToken() { + return pageTokenOpt; + } } /** @@ -364,7 +385,8 @@ ListEntitiesResult listEntities( @NotNull PolarisCallContext callCtx, @Nullable List catalogPath, @NotNull PolarisEntityType entityType, - @NotNull PolarisEntitySubType entitySubType); + @NotNull PolarisEntitySubType entitySubType, + @NotNull PageToken pageToken); /** the return for a generate new entity id */ class GenerateEntityIdResult extends BaseResult { @@ -663,6 +685,11 @@ class EntitiesResult extends BaseResult { // null if not success. Else the list of entities being returned private final List entities; + private final Optional pageTokenOpt; + + public static EntitiesResult fromPolarisPage(PolarisPage polarisPage) { + return new EntitiesResult(polarisPage.data, Optional.ofNullable(polarisPage.pageToken)); + } /** * Constructor for an error @@ -675,6 +702,7 @@ public EntitiesResult( @Nullable String extraInformation) { super(errorStatus, extraInformation); this.entities = null; + this.pageTokenOpt = Optional.empty(); } /** @@ -682,23 +710,31 @@ public EntitiesResult( * * @param entities list of entities being returned, implies success */ - public EntitiesResult(@NotNull List entities) { + public EntitiesResult( + @NotNull List entities, @NotNull Optional pageTokenOpt) { super(ReturnStatus.SUCCESS); this.entities = entities; + this.pageTokenOpt = pageTokenOpt; } @JsonCreator private EntitiesResult( @JsonProperty("returnStatus") @NotNull ReturnStatus returnStatus, @JsonProperty("extraInformation") String extraInformation, - @JsonProperty("entities") List entities) { + @JsonProperty("entities") List entities, + @JsonProperty("pageToken") Optional pageTokenOpt) { super(returnStatus, extraInformation); this.entities = entities; + this.pageTokenOpt = pageTokenOpt; } public List getEntities() { return entities; } + + public Optional getPageToken() { + return pageTokenOpt; + } } /** @@ -1199,7 +1235,8 @@ ChangeTrackingResult loadEntitiesChangeTracking( * @return list of tasks to be completed */ @NotNull - EntitiesResult loadTasks(@NotNull PolarisCallContext callCtx, String executorId, int limit); + EntitiesResult loadTasks( + @NotNull PolarisCallContext callCtx, String executorId, PageToken pageToken); /** Result of a getSubscopedCredsForEntity() call */ class ScopedCredentialsResult extends BaseResult { diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManagerImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManagerImpl.java index b05129fa3..ed7e64461 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManagerImpl.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManagerImpl.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Predicates; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayList; import java.util.EnumMap; @@ -30,10 +31,13 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.catalog.pagination.PageToken; +import org.apache.polaris.core.catalog.pagination.PolarisPage; import org.apache.polaris.core.entity.AsyncTaskType; import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.entity.PolarisChangeTrackingVersions; @@ -778,37 +782,41 @@ private void bootstrapPolarisService( } /** - * See {@link #listEntities(PolarisCallContext, List, PolarisEntityType, PolarisEntitySubType)} + * See {@link #listEntities(PolarisCallContext, List, PolarisEntityType, PolarisEntitySubType, + * PageToken)} */ private @NotNull ListEntitiesResult listEntities( @NotNull PolarisCallContext callCtx, @NotNull PolarisMetaStoreSession ms, @Nullable List catalogPath, @NotNull PolarisEntityType entityType, - @NotNull PolarisEntitySubType entitySubType) { + @NotNull PolarisEntitySubType entitySubType, + @NotNull PageToken pageToken) { // first resolve again the catalogPath to that entity PolarisEntityResolver resolver = new PolarisEntityResolver(callCtx, ms, catalogPath); // return if we failed to resolve if (resolver.isFailure()) { - return new ListEntitiesResult(ReturnStatus.CATALOG_PATH_CANNOT_BE_RESOLVED, null); + return new ListEntitiesResult( + ReturnStatus.CATALOG_PATH_CANNOT_BE_RESOLVED, null, Optional.empty()); } // return list of active entities - List toreturnList = + PolarisPage resultPage = ms.listActiveEntities( - callCtx, resolver.getCatalogIdOrNull(), resolver.getParentId(), entityType); + callCtx, resolver.getCatalogIdOrNull(), resolver.getParentId(), entityType, pageToken); // prune the returned list with only entities matching the entity subtype if (entitySubType != PolarisEntitySubType.ANY_SUBTYPE) { - toreturnList = - toreturnList.stream() - .filter(rec -> rec.getSubTypeCode() == entitySubType.getCode()) - .collect(Collectors.toList()); + resultPage = + pageToken.buildNextPage( + resultPage.data.stream() + .filter(rec -> rec.getSubTypeCode() == entitySubType.getCode()) + .collect(Collectors.toList())); } // done - return new ListEntitiesResult(toreturnList); + return ListEntitiesResult.fromPolarisPage(resultPage); } /** {@inheritDoc} */ @@ -817,13 +825,15 @@ private void bootstrapPolarisService( @NotNull PolarisCallContext callCtx, @Nullable List catalogPath, @NotNull PolarisEntityType entityType, - @NotNull PolarisEntitySubType entitySubType) { + @NotNull PolarisEntitySubType entitySubType, + @NotNull PageToken pageToken) { // get meta store we should be using PolarisMetaStoreSession ms = callCtx.getMetaStore(); // run operation in a read transaction return ms.runInReadTransaction( - callCtx, () -> listEntities(callCtx, ms, catalogPath, entityType, entitySubType)); + callCtx, + () -> listEntities(callCtx, ms, catalogPath, entityType, entitySubType, pageToken)); } /** {@inheritDoc} */ @@ -1197,7 +1207,7 @@ public Map deserializeProperties(PolarisCallContext callCtx, Str } createdEntities.add(entityCreateResult.getEntity()); } - return new EntitiesResult(createdEntities); + return new EntitiesResult(createdEntities, Optional.empty()); }); } @@ -1286,7 +1296,7 @@ public Map deserializeProperties(PolarisCallContext callCtx, Str } // good, all success - return new EntitiesResult(updatedEntities); + return new EntitiesResult(updatedEntities, Optional.empty()); } /** {@inheritDoc} */ @@ -1477,13 +1487,14 @@ public Map deserializeProperties(PolarisCallContext callCtx, Str // get the list of catalog roles, at most 2 List catalogRoles = ms.listActiveEntities( - callCtx, - catalogId, - catalogId, - PolarisEntityType.CATALOG_ROLE, - 2, - entity -> true, - Function.identity()); + callCtx, + catalogId, + catalogId, + PolarisEntityType.CATALOG_ROLE, + ms.pageTokenBuilder().fromLimit(2), + Predicates.alwaysTrue(), + Function.identity()) + .data; // if we have 2, we cannot drop the catalog. If only one left, better be the admin role if (catalogRoles.size() > 1) { @@ -2008,38 +2019,39 @@ private PolarisEntityResolver resolveSecurableToRoleGrant( callCtx, () -> this.loadEntity(callCtx, ms, entityCatalogId, entityId)); } - /** Refer to {@link #loadTasks(PolarisCallContext, String, int)} */ + /** Refer to {@link #loadTasks(PolarisCallContext, String, PageToken)} */ private @NotNull EntitiesResult loadTasks( @NotNull PolarisCallContext callCtx, @NotNull PolarisMetaStoreSession ms, String executorId, - int limit) { + PageToken pageToken) { + + long taskAgeTimeout = + callCtx + .getConfigurationStore() + .getConfiguration( + callCtx, + PolarisTaskConstants.TASK_TIMEOUT_MILLIS_CONFIG, + PolarisTaskConstants.TASK_TIMEOUT_MILLIS); // find all available tasks - List availableTasks = + PolarisPage availableTasks = ms.listActiveEntities( callCtx, PolarisEntityConstants.getRootEntityId(), PolarisEntityConstants.getRootEntityId(), PolarisEntityType.TASK, - limit, + pageToken, entity -> { PolarisObjectMapperUtil.TaskExecutionState taskState = PolarisObjectMapperUtil.parseTaskState(entity); - long taskAgeTimeout = - callCtx - .getConfigurationStore() - .getConfiguration( - callCtx, - PolarisTaskConstants.TASK_TIMEOUT_MILLIS_CONFIG, - PolarisTaskConstants.TASK_TIMEOUT_MILLIS); return taskState == null || taskState.executor == null || callCtx.getClock().millis() - taskState.lastAttemptStartTime > taskAgeTimeout; }, Function.identity()); - availableTasks.forEach( + availableTasks.data.forEach( task -> { Map properties = PolarisObjectMapperUtil.deserializeProperties(callCtx, task.getProperties()); @@ -2056,14 +2068,14 @@ private PolarisEntityResolver resolveSecurableToRoleGrant( task.setProperties(PolarisObjectMapperUtil.serializeProperties(callCtx, properties)); writeEntity(callCtx, ms, task, false); }); - return new EntitiesResult(availableTasks); + return EntitiesResult.fromPolarisPage(availableTasks); } @Override public @NotNull EntitiesResult loadTasks( - @NotNull PolarisCallContext callCtx, String executorId, int limit) { + @NotNull PolarisCallContext callCtx, String executorId, PageToken pageToken) { PolarisMetaStoreSession ms = callCtx.getMetaStore(); - return ms.runInTransaction(callCtx, () -> this.loadTasks(callCtx, ms, executorId, limit)); + return ms.runInTransaction(callCtx, () -> this.loadTasks(callCtx, ms, executorId, pageToken)); } /** {@inheritDoc} */ diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreSession.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreSession.java index ff5bcbf3e..c2645dd69 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreSession.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreSession.java @@ -23,6 +23,8 @@ import java.util.function.Predicate; import java.util.function.Supplier; import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.catalog.pagination.PageToken; +import org.apache.polaris.core.catalog.pagination.PolarisPage; import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.entity.PolarisChangeTrackingVersions; import org.apache.polaris.core.entity.PolarisEntitiesActiveKey; @@ -306,11 +308,12 @@ List lookupEntityActiveBatch( * @return the list of entities_active records for the specified list operation */ @NotNull - List listActiveEntities( + PolarisPage listActiveEntities( @NotNull PolarisCallContext callCtx, long catalogId, long parentId, - @NotNull PolarisEntityType entityType); + @NotNull PolarisEntityType entityType, + @NotNull PageToken pageToken); /** * List active entities where some predicate returns true @@ -324,11 +327,12 @@ List listActiveEntities( * @return the list of entities for which the predicate returns true */ @NotNull - List listActiveEntities( + PolarisPage listActiveEntities( @NotNull PolarisCallContext callCtx, long catalogId, long parentId, @NotNull PolarisEntityType entityType, + @NotNull PageToken pageToken, @NotNull Predicate entityFilter); /** @@ -339,7 +343,7 @@ List listActiveEntities( * @param catalogId catalog id for that entity, NULL_ID if the entity is top-level * @param parentId id of the parent, can be the special 0 value representing the root entity * @param entityType type of entities to list - * @param limit the max number of items to return + * @param pageToken the pagination token to use * @param entityFilter the filter to be applied to each entity. Only entities where the predicate * returns true are returned in the list * @param transformer the transformation function applied to the {@link PolarisBaseEntity} before @@ -347,12 +351,12 @@ List listActiveEntities( * @return the list of entities for which the predicate returns true */ @NotNull - List listActiveEntities( + PolarisPage listActiveEntities( @NotNull PolarisCallContext callCtx, long catalogId, long parentId, @NotNull PolarisEntityType entityType, - int limit, + @NotNull PageToken pageToken, @NotNull Predicate entityFilter, @NotNull Function transformer); @@ -524,4 +528,14 @@ boolean hasChildren( /** Rollback the current transaction */ void rollback(); + + /** + * This method is used to construct page tokens when the metastore may need them. Different + * metastore implementations may bring their own PageToken implementations or share them. + * + * @return A {@link PageToken.PageTokenBuilder} implementation compatible with this + * `PolarisMetaStoreSession` implementation + */ + @NotNull + PageToken.PageTokenBuilder pageTokenBuilder(); } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisTreeMapMetaStoreSessionImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisTreeMapMetaStoreSessionImpl.java index 096bce49e..328a4777a 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisTreeMapMetaStoreSessionImpl.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisTreeMapMetaStoreSessionImpl.java @@ -19,12 +19,18 @@ package org.apache.polaris.core.persistence; import com.google.common.base.Predicates; +import java.util.Comparator; import java.util.List; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.catalog.pagination.OffsetPageToken; +import org.apache.polaris.core.catalog.pagination.PageToken; +import org.apache.polaris.core.catalog.pagination.PolarisPage; +import org.apache.polaris.core.catalog.pagination.ReadEverythingPageToken; import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.entity.PolarisChangeTrackingVersions; import org.apache.polaris.core.entity.PolarisEntitiesActiveKey; @@ -319,20 +325,23 @@ public List lookupEntityActiveBatch( /** {@inheritDoc} */ @Override - public @NotNull List listActiveEntities( + public @NotNull PolarisPage listActiveEntities( @NotNull PolarisCallContext callCtx, long catalogId, long parentId, - @NotNull PolarisEntityType entityType) { - return listActiveEntities(callCtx, catalogId, parentId, entityType, Predicates.alwaysTrue()); + @NotNull PolarisEntityType entityType, + @NotNull PageToken pageToken) { + return listActiveEntities( + callCtx, catalogId, parentId, entityType, pageToken, Predicates.alwaysTrue()); } @Override - public @NotNull List listActiveEntities( + public @NotNull PolarisPage listActiveEntities( @NotNull PolarisCallContext callCtx, long catalogId, long parentId, @NotNull PolarisEntityType entityType, + @NotNull PageToken pageToken, @NotNull Predicate entityFilter) { // full range scan under the parent for that type return listActiveEntities( @@ -340,7 +349,7 @@ public List lookupEntityActiveBatch( catalogId, parentId, entityType, - Integer.MAX_VALUE, + pageToken, entityFilter, entity -> new PolarisEntityActiveRecord( @@ -353,23 +362,38 @@ public List lookupEntityActiveBatch( } @Override - public @NotNull List listActiveEntities( + public @NotNull PolarisPage listActiveEntities( @NotNull PolarisCallContext callCtx, long catalogId, long parentId, @NotNull PolarisEntityType entityType, - int limit, + @NotNull PageToken pageToken, @NotNull Predicate entityFilter, @NotNull Function transformer) { + if (!(pageToken instanceof ReadEverythingPageToken) + && !(pageToken instanceof OffsetPageToken)) { + throw new IllegalArgumentException("Unexpected pageToken: " + pageToken); + } + // full range scan under the parent for that type - return this.store - .getSliceEntitiesActive() - .readRange(this.store.buildPrefixKeyComposite(catalogId, parentId, entityType.getCode())) - .stream() - .filter(entityFilter) - .limit(limit) - .map(transformer) - .collect(Collectors.toList()); + Stream partialResults = + this.store + .getSliceEntitiesActive() + .readRange( + this.store.buildPrefixKeyComposite(catalogId, parentId, entityType.getCode())) + .stream() + .filter(entityFilter); + + if (pageToken instanceof OffsetPageToken) { + partialResults = + partialResults + .sorted(Comparator.comparingLong(PolarisEntityCore::getId)) + .skip(((OffsetPageToken) pageToken).offset) + .limit(pageToken.pageSize); + } + + List entities = partialResults.map(transformer).collect(Collectors.toList()); + return pageToken.buildNextPage(entities); } /** {@inheritDoc} */ @@ -570,4 +594,9 @@ PolarisStorageIntegration loadPolarisStorageIntegration( public void rollback() { this.store.rollback(); } + + @Override + public @NotNull PageToken.PageTokenBuilder pageTokenBuilder() { + return OffsetPageToken.builder(); + } } diff --git a/polaris-core/src/test/java/org/apache/polaris/core/catalog/pagination/PageTokenTest.java b/polaris-core/src/test/java/org/apache/polaris/core/catalog/pagination/PageTokenTest.java new file mode 100644 index 000000000..648b74114 --- /dev/null +++ b/polaris-core/src/test/java/org/apache/polaris/core/catalog/pagination/PageTokenTest.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.catalog.pagination; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.polaris.core.persistence.models.ModelEntity; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PageTokenTest { + private static final Logger LOGGER = LoggerFactory.getLogger(PageTokenTest.class); + + static Stream> getPageTokenBuilders() { + return Stream.of( + OffsetPageToken.builder(), EntityIdPageToken.builder(), ReadEverythingPageToken.builder()); + } + + @Test + void testDoneToken() { + Assertions.assertThat(PageToken.DONE).isNull(); + } + + @ParameterizedTest + @MethodSource("getPageTokenBuilders") + void testRoundTrips(PageToken.PageTokenBuilder builder) { + if (builder instanceof ReadEverythingPageToken.ReadEverythingPageTokenBuilder) { + // Skip ReadEverythingPageToken + return; + } + + for (int limit : List.of(1, 10, 100, Integer.MAX_VALUE)) { + PageToken token = builder.fromLimit(limit); + Assertions.assertThat(token.pageSize).isEqualTo(limit); + Assertions.assertThat(builder.fromString(token.toString())).isEqualTo(token); + } + } + + @ParameterizedTest + @MethodSource("getPageTokenBuilders") + void testInvalidLimits(PageToken.PageTokenBuilder builder) { + if (builder instanceof ReadEverythingPageToken.ReadEverythingPageTokenBuilder) { + // Skip ReadEverythingPageToken + return; + } + + for (int limit : List.of(-1, 0)) { + Assertions.assertThatThrownBy(() -> builder.fromLimit(limit)) + .isInstanceOf(IllegalArgumentException.class); + } + + Assertions.assertThat(builder.fromLimit(null)).isInstanceOf(ReadEverythingPageToken.class); + } + + @ParameterizedTest + @MethodSource("getPageTokenBuilders") + void testStartingTokens(PageToken.PageTokenBuilder builder) { + Assertions.assertThat(builder.fromString("")).isNotNull(); + if (!(builder instanceof ReadEverythingPageToken.ReadEverythingPageTokenBuilder)) { + Assertions.assertThat(builder.fromString("")).isNotEqualTo(ReadEverythingPageToken.get()); + } + + Assertions.assertThatThrownBy(() -> builder.fromString(null)) + .isInstanceOf(IllegalArgumentException.class); + } + + @ParameterizedTest + @MethodSource("getPageTokenBuilders") + void testPageBuilding(PageToken.PageTokenBuilder builder) { + if (builder instanceof ReadEverythingPageToken.ReadEverythingPageTokenBuilder) { + // Skip ReadEverythingPageToken + return; + } + + List data = + List.of(ModelEntity.builder().id(1).build(), ModelEntity.builder().id(2).build()); + + PageToken token = builder.fromLimit(1000); + Assertions.assertThat(token.buildNextPage(data).data).isEqualTo(data); + Assertions.assertThat(token.buildNextPage(data).pageToken).isNull(); + } + + @Test + void testUniquePrefixes() { + Stream> builders = getPageTokenBuilders(); + List prefixes = + builders.map(PageToken.PageTokenBuilder::tokenPrefix).collect(Collectors.toList()); + Assertions.assertThat(prefixes.size()).isEqualTo(prefixes.stream().distinct().count()); + } + + @ParameterizedTest + @MethodSource("getPageTokenBuilders") + void testCrossTokenParsing(PageToken.PageTokenBuilder builder) { + var otherBuilders = getPageTokenBuilders().collect(Collectors.toList()); + for (var otherBuilder : otherBuilders) { + LOGGER.info( + "Testing {} being parsed by {}", + builder.getClass().getSimpleName(), + otherBuilder.getClass().getSimpleName()); + + final PageToken token; + if (builder instanceof ReadEverythingPageToken.ReadEverythingPageTokenBuilder) { + token = ReadEverythingPageToken.get(); + } else { + token = builder.fromLimit(1234); + } + if (otherBuilder.getClass().equals(builder.getClass())) { + Assertions.assertThat(otherBuilder.fromString(token.toString())).isEqualTo(token); + } else { + Assertions.assertThatThrownBy(() -> otherBuilder.fromString(token.toString())) + .isInstanceOf(IllegalArgumentException.class); + } + } + } + + @ParameterizedTest + @MethodSource("getPageTokenBuilders") + void testDefaultTokens(PageToken.PageTokenBuilder builder) { + if (builder instanceof ReadEverythingPageToken.ReadEverythingPageTokenBuilder) { + // Skip ReadEverythingPageToken + return; + } + + PageToken token = builder.fromString(""); + Assertions.assertThat(token.toString()).isNotNull(); + Assertions.assertThat(token.pageSize).isEqualTo(PageToken.DEFAULT_PAGE_SIZE); + } + + @Test + void testReadEverythingPageToken() { + PageToken token = ReadEverythingPageToken.get(); + + Assertions.assertThat(token.toString()).isNotNull(); + Assertions.assertThat(token.updated(List.of("anything"))).isEqualTo(PageToken.DONE); + Assertions.assertThat(token.pageSize).isEqualTo(Integer.MAX_VALUE); + } + + @Test + void testOffsetPageToken() { + OffsetPageToken token = (OffsetPageToken) OffsetPageToken.builder().fromLimit(2); + + Assertions.assertThat(token).isInstanceOf(OffsetPageToken.class); + Assertions.assertThat(token.offset).isEqualTo(0); + + List data = List.of("some", "data"); + var page = token.buildNextPage(data); + Assertions.assertThat(page.pageToken).isNotNull(); + Assertions.assertThat(page.pageToken).isInstanceOf(OffsetPageToken.class); + Assertions.assertThat(page.pageToken.pageSize).isEqualTo(2); + Assertions.assertThat(((OffsetPageToken) page.pageToken).offset).isEqualTo(2); + Assertions.assertThat(page.data).isEqualTo(data); + + Assertions.assertThat(OffsetPageToken.builder().fromString(page.pageToken.toString())) + .isEqualTo(page.pageToken); + } + + @Test + void testEntityIdPageToken() { + EntityIdPageToken token = (EntityIdPageToken) EntityIdPageToken.builder().fromLimit(2); + + Assertions.assertThat(token).isInstanceOf(EntityIdPageToken.class); + Assertions.assertThat(token.id).isEqualTo(-1L); + + List badData = List.of("some", "data"); + Assertions.assertThatThrownBy(() -> token.buildNextPage(badData)) + .isInstanceOf(IllegalArgumentException.class); + + List data = + List.of(ModelEntity.builder().id(101).build(), ModelEntity.builder().id(102).build()); + var page = token.buildNextPage(data); + + Assertions.assertThat(page.pageToken).isNotNull(); + Assertions.assertThat(page.pageToken).isInstanceOf(EntityIdPageToken.class); + Assertions.assertThat(page.pageToken.pageSize).isEqualTo(2); + Assertions.assertThat(((EntityIdPageToken) page.pageToken).id).isEqualTo(102); + Assertions.assertThat(page.data).isEqualTo(data); + + Assertions.assertThat(EntityIdPageToken.builder().fromString(page.pageToken.toString())) + .isEqualTo(page.pageToken); + } +} diff --git a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java index bf97e4128..772776ccf 100644 --- a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java +++ b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java @@ -35,6 +35,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.catalog.pagination.ReadEverythingPageToken; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.entity.AsyncTaskType; import org.apache.polaris.core.entity.PolarisBaseEntity; @@ -128,7 +129,8 @@ void testCreateEntities() { polarisTestMetaStoreManager.polarisCallContext, null, PolarisEntityType.TASK, - PolarisEntitySubType.NULL_SUBTYPE) + PolarisEntitySubType.NULL_SUBTYPE, + ReadEverythingPageToken.get()) .getEntities(); Assertions.assertThat(listedEntities) .isNotNull() @@ -290,7 +292,9 @@ void testLoadTasks() { PolarisMetaStoreManager metaStoreManager = polarisTestMetaStoreManager.polarisMetaStoreManager; PolarisCallContext callCtx = polarisTestMetaStoreManager.polarisCallContext; List taskList = - metaStoreManager.loadTasks(callCtx, executorId, 5).getEntities(); + metaStoreManager + .loadTasks(callCtx, executorId, callCtx.getMetaStore().pageTokenBuilder().fromLimit(5)) + .getEntities(); Assertions.assertThat(taskList) .isNotNull() .isNotEmpty() @@ -310,7 +314,9 @@ void testLoadTasks() { // grab a second round of tasks. Assert that none of the original 5 are in the list List newTaskList = - metaStoreManager.loadTasks(callCtx, executorId, 5).getEntities(); + metaStoreManager + .loadTasks(callCtx, executorId, callCtx.getMetaStore().pageTokenBuilder().fromLimit(5)) + .getEntities(); Assertions.assertThat(newTaskList) .isNotNull() .isNotEmpty() @@ -324,7 +330,9 @@ void testLoadTasks() { // only 10 tasks are unassigned. Requesting 20, we should only receive those 10 List lastTen = - metaStoreManager.loadTasks(callCtx, executorId, 20).getEntities(); + metaStoreManager + .loadTasks(callCtx, executorId, callCtx.getMetaStore().pageTokenBuilder().fromLimit(20)) + .getEntities(); Assertions.assertThat(lastTen) .isNotNull() @@ -338,7 +346,9 @@ void testLoadTasks() { .collect(Collectors.toSet()); List emtpyList = - metaStoreManager.loadTasks(callCtx, executorId, 20).getEntities(); + metaStoreManager + .loadTasks(callCtx, executorId, callCtx.getMetaStore().pageTokenBuilder().fromLimit(20)) + .getEntities(); Assertions.assertThat(emtpyList).isNotNull().isEmpty(); @@ -346,7 +356,9 @@ void testLoadTasks() { // all the tasks are unassigned. Fetch them all List allTasks = - metaStoreManager.loadTasks(callCtx, executorId, 20).getEntities(); + metaStoreManager + .loadTasks(callCtx, executorId, callCtx.getMetaStore().pageTokenBuilder().fromLimit(20)) + .getEntities(); Assertions.assertThat(allTasks) .isNotNull() @@ -361,7 +373,9 @@ void testLoadTasks() { timeSource.add(Duration.ofMinutes(10)); List finalList = - metaStoreManager.loadTasks(callCtx, executorId, 20).getEntities(); + metaStoreManager + .loadTasks(callCtx, executorId, callCtx.getMetaStore().pageTokenBuilder().fromLimit(20)) + .getEntities(); Assertions.assertThat(finalList).isNotNull().isEmpty(); } @@ -390,7 +404,13 @@ void testLoadTasksInParallel() throws Exception { do { retry = false; try { - taskList = metaStoreManager.loadTasks(callCtx, executorId, 5).getEntities(); + taskList = + metaStoreManager + .loadTasks( + callCtx, + executorId, + callCtx.getMetaStore().pageTokenBuilder().fromLimit(5)) + .getEntities(); taskList.stream().map(PolarisBaseEntity::getName).forEach(taskNames::add); } catch (RetryOnConcurrencyException e) { retry = true; diff --git a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java index 893d22927..401e4d40e 100644 --- a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java +++ b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java @@ -28,6 +28,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.catalog.pagination.ReadEverythingPageToken; import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.entity.PolarisChangeTrackingVersions; import org.apache.polaris.core.entity.PolarisEntity; @@ -653,7 +654,8 @@ void dropEntity(List catalogPath, PolarisEntityCore entityToD this.polarisCallContext, path, PolarisEntityType.NAMESPACE, - PolarisEntitySubType.NULL_SUBTYPE) + PolarisEntitySubType.NULL_SUBTYPE, + ReadEverythingPageToken.get()) .getEntities(); Assertions.assertThat(children).isNotNull(); if (children.isEmpty() && entity.getType() == PolarisEntityType.NAMESPACE) { @@ -663,7 +665,8 @@ void dropEntity(List catalogPath, PolarisEntityCore entityToD this.polarisCallContext, path, PolarisEntityType.TABLE_LIKE, - PolarisEntitySubType.ANY_SUBTYPE) + PolarisEntitySubType.ANY_SUBTYPE, + ReadEverythingPageToken.get()) .getEntities(); Assertions.assertThat(children).isNotNull(); } else if (children.isEmpty()) { @@ -673,7 +676,8 @@ void dropEntity(List catalogPath, PolarisEntityCore entityToD this.polarisCallContext, path, PolarisEntityType.CATALOG_ROLE, - PolarisEntitySubType.ANY_SUBTYPE) + PolarisEntitySubType.ANY_SUBTYPE, + ReadEverythingPageToken.get()) .getEntities(); Assertions.assertThat(children).isNotNull(); // if only one left, it can be dropped. @@ -1182,7 +1186,12 @@ private void validateListReturn( // list the entities under the specified path List result = polarisMetaStoreManager - .listEntities(this.polarisCallContext, path, entityType, entitySubType) + .listEntities( + this.polarisCallContext, + path, + entityType, + entitySubType, + ReadEverythingPageToken.get()) .getEntities(); Assertions.assertThat(result).isNotNull(); @@ -1495,7 +1504,8 @@ void validateBootstrap() { this.polarisCallContext, null, PolarisEntityType.PRINCIPAL, - PolarisEntitySubType.NULL_SUBTYPE) + PolarisEntitySubType.NULL_SUBTYPE, + ReadEverythingPageToken.get()) .getEntities(); // ensure not null, one element only @@ -1521,7 +1531,8 @@ void validateBootstrap() { this.polarisCallContext, null, PolarisEntityType.PRINCIPAL_ROLE, - PolarisEntitySubType.NULL_SUBTYPE) + PolarisEntitySubType.NULL_SUBTYPE, + ReadEverythingPageToken.get()) .getEntities(); // ensure not null, one element only diff --git a/polaris-service/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java b/polaris-service/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java index 023c96f13..5f4a14adc 100644 --- a/polaris-service/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java +++ b/polaris-service/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java @@ -57,6 +57,7 @@ import org.apache.polaris.core.auth.PolarisAuthorizableOperation; import org.apache.polaris.core.auth.PolarisAuthorizer; import org.apache.polaris.core.catalog.PolarisCatalogHelpers; +import org.apache.polaris.core.catalog.pagination.ReadEverythingPageToken; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.entity.CatalogEntity; import org.apache.polaris.core.entity.CatalogRoleEntity; @@ -720,7 +721,8 @@ private List listCatalogsUnsafe() { getCurrentPolarisContext(), null, PolarisEntityType.CATALOG, - PolarisEntitySubType.ANY_SUBTYPE) + PolarisEntitySubType.ANY_SUBTYPE, + ReadEverythingPageToken.get()) .getEntities() .stream() .map( @@ -898,7 +900,8 @@ public List listPrincipals() { getCurrentPolarisContext(), null, PolarisEntityType.PRINCIPAL, - PolarisEntitySubType.NULL_SUBTYPE) + PolarisEntitySubType.NULL_SUBTYPE, + ReadEverythingPageToken.get()) .getEntities() .stream() .map( @@ -1020,7 +1023,8 @@ public List listPrincipalRoles() { getCurrentPolarisContext(), null, PolarisEntityType.PRINCIPAL_ROLE, - PolarisEntitySubType.NULL_SUBTYPE) + PolarisEntitySubType.NULL_SUBTYPE, + ReadEverythingPageToken.get()) .getEntities() .stream() .map( @@ -1161,7 +1165,8 @@ public List listCatalogRoles(String catalogName) { getCurrentPolarisContext(), PolarisEntity.toCoreList(List.of(catalogEntity)), PolarisEntityType.CATALOG_ROLE, - PolarisEntitySubType.NULL_SUBTYPE) + PolarisEntitySubType.NULL_SUBTYPE, + ReadEverythingPageToken.get()) .getEntities() .stream() .map( diff --git a/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java b/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java index d27cc82c4..271d59222 100644 --- a/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java +++ b/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java @@ -77,6 +77,9 @@ import org.apache.polaris.core.PolarisConfiguration; import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal; import org.apache.polaris.core.catalog.PolarisCatalogHelpers; +import org.apache.polaris.core.catalog.pagination.PageToken; +import org.apache.polaris.core.catalog.pagination.PolarisPage; +import org.apache.polaris.core.catalog.pagination.ReadEverythingPageToken; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.entity.CatalogEntity; import org.apache.polaris.core.entity.NamespaceEntity; @@ -171,6 +174,7 @@ public class BasePolarisCatalog extends BaseMetastoreViewCatalog private Map catalogProperties; private Map tableDefaultProperties; private final FileIOFactory fileIOFactory; + private final PageToken.PageTokenBuilder pageTokenBuilder; /** * @param entityManager provides handle to underlying PolarisMetaStoreManager with which to @@ -197,6 +201,7 @@ public BasePolarisCatalog( this.catalogId = catalogEntity.getId(); this.catalogName = catalogEntity.getName(); this.fileIOFactory = fileIOFactory; + this.pageTokenBuilder = entityManager.newMetaStoreSession().pageTokenBuilder(); } @Override @@ -438,14 +443,32 @@ public boolean dropTable(TableIdentifier tableIdentifier, boolean purge) { return true; } + /** Check whether pagination is enabled for list operations */ + private boolean paginationEnabled() { + return callContext + .getPolarisCallContext() + .getConfigurationStore() + .getConfiguration( + callContext.getPolarisCallContext(), + catalogEntity, + PolarisConfiguration.LIST_PAGINATION_ENABLED); + } + @Override public List listTables(Namespace namespace) { + return listTables(namespace, ReadEverythingPageToken.get()).data; + } + + public PolarisPage listTables(Namespace namespace, PageToken pageToken) { if (!namespaceExists(namespace) && !namespace.isEmpty()) { throw new NoSuchNamespaceException( "Cannot list tables for namespace. Namespace does not exist: %s", namespace); } + if (!paginationEnabled()) { + pageToken = ReadEverythingPageToken.get(); + } - return listTableLike(PolarisEntitySubType.TABLE, namespace); + return listTableLike(PolarisEntitySubType.TABLE, namespace, pageToken); } @Override @@ -735,23 +758,42 @@ public List listNamespaces() { @Override public List listNamespaces(Namespace namespace) throws NoSuchNamespaceException { + return listNamespaces(namespace, ReadEverythingPageToken.get()).data; + } + + public PolarisPage listNamespaces(PageToken pageToken) + throws NoSuchNamespaceException { + return listNamespaces(Namespace.empty(), pageToken); + } + + public PolarisPage listNamespaces(Namespace namespace, PageToken pageToken) + throws NoSuchNamespaceException { PolarisResolvedPathWrapper resolvedEntities = resolvedEntityView.getResolvedPath(namespace); if (resolvedEntities == null) { throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); } + if (!paginationEnabled()) { + pageToken = ReadEverythingPageToken.get(); + } List catalogPath = resolvedEntities.getRawFullPath(); + PolarisMetaStoreManager.ListEntitiesResult listResult = + entityManager + .getMetaStoreManager() + .listEntities( + getCurrentPolarisContext(), + PolarisEntity.toCoreList(catalogPath), + PolarisEntityType.NAMESPACE, + PolarisEntitySubType.NULL_SUBTYPE, + pageToken); List entities = - PolarisEntity.toNameAndIdList( - entityManager - .getMetaStoreManager() - .listEntities( - getCurrentPolarisContext(), - PolarisEntity.toCoreList(catalogPath), - PolarisEntityType.NAMESPACE, - PolarisEntitySubType.NULL_SUBTYPE) - .getEntities()); - return PolarisCatalogHelpers.nameAndIdToNamespaces(catalogPath, entities); + PolarisEntity.toNameAndIdList(listResult.getEntities()); + List namespaces = PolarisCatalogHelpers.nameAndIdToNamespaces(catalogPath, entities); + + return listResult + .getPageToken() + .map(token -> new PolarisPage<>(token, namespaces)) + .orElseGet(() -> PolarisPage.fromData(namespaces)); } @Override @@ -759,12 +801,19 @@ public void close() throws IOException {} @Override public List listViews(Namespace namespace) { + return listViews(namespace, ReadEverythingPageToken.get()).data; + } + + public PolarisPage listViews(Namespace namespace, PageToken pageToken) { if (!namespaceExists(namespace) && !namespace.isEmpty()) { throw new NoSuchNamespaceException( "Cannot list views for namespace. Namespace does not exist: %s", namespace); } + if (!paginationEnabled()) { + pageToken = ReadEverythingPageToken.get(); + } - return listTableLike(PolarisEntitySubType.VIEW, namespace); + return listTableLike(PolarisEntitySubType.VIEW, namespace, pageToken); } @Override @@ -1038,7 +1087,8 @@ private void validateNoLocationOverlap( callContext.getPolarisCallContext(), parentPath.stream().map(PolarisEntity::toCore).collect(Collectors.toList()), PolarisEntityType.NAMESPACE, - PolarisEntitySubType.ANY_SUBTYPE); + PolarisEntitySubType.ANY_SUBTYPE, + ReadEverythingPageToken.get()); if (!siblingNamespacesResult.isSuccess()) { throw new IllegalStateException( "Unable to resolve siblings entities to validate location - could not list namespaces"); @@ -1064,7 +1114,8 @@ private void validateNoLocationOverlap( .map(PolarisEntity::toCore) .collect(Collectors.toList()), PolarisEntityType.TABLE_LIKE, - PolarisEntitySubType.ANY_SUBTYPE); + PolarisEntitySubType.ANY_SUBTYPE, + ReadEverythingPageToken.get()); if (!siblingTablesResult.isSuccess()) { throw new IllegalStateException( "Unable to resolve siblings entities to validate location - could not list tables"); @@ -1963,7 +2014,8 @@ private void createNonExistingNamespaces(Namespace namespace) { } } - private List listTableLike(PolarisEntitySubType subType, Namespace namespace) { + private PolarisPage listTableLike( + PolarisEntitySubType subType, Namespace namespace, PageToken pageToken) { PolarisResolvedPathWrapper resolvedEntities = resolvedEntityView.getResolvedPath(namespace); if (resolvedEntities == null) { // Illegal state because the namespace should've already been in the static resolution set. @@ -1972,17 +2024,24 @@ private List listTableLike(PolarisEntitySubType subType, Namesp } List catalogPath = resolvedEntities.getRawFullPath(); + PolarisMetaStoreManager.ListEntitiesResult listResult = + entityManager + .getMetaStoreManager() + .listEntities( + getCurrentPolarisContext(), + PolarisEntity.toCoreList(catalogPath), + PolarisEntityType.TABLE_LIKE, + subType, + pageToken); List entities = - PolarisEntity.toNameAndIdList( - entityManager - .getMetaStoreManager() - .listEntities( - getCurrentPolarisContext(), - PolarisEntity.toCoreList(catalogPath), - PolarisEntityType.TABLE_LIKE, - subType) - .getEntities()); - return PolarisCatalogHelpers.nameAndIdToTableIdentifiers(catalogPath, entities); + PolarisEntity.toNameAndIdList(listResult.getEntities()); + List identifiers = + PolarisCatalogHelpers.nameAndIdToTableIdentifiers(catalogPath, entities); + + return listResult + .getPageToken() + .map(token -> new PolarisPage<>(token, identifiers)) + .orElseGet(() -> PolarisPage.fromData(identifiers)); } /** diff --git a/polaris-service/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java b/polaris-service/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java index 53cc053b8..5ba46afa8 100644 --- a/polaris-service/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java +++ b/polaris-service/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; +import edu.umd.cs.findbugs.annotations.Nullable; import jakarta.ws.rs.core.Response; import jakarta.ws.rs.core.SecurityContext; import java.net.URLEncoder; @@ -51,6 +52,7 @@ import org.apache.iceberg.rest.responses.ConfigResponse; import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal; import org.apache.polaris.core.auth.PolarisAuthorizer; +import org.apache.polaris.core.catalog.pagination.PageToken; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.core.entity.PolarisEntity; @@ -108,6 +110,22 @@ private PolarisCatalogHandlerWrapper newHandlerWrapper( polarisAuthorizer); } + /** Build a {@link PageToken} from a string and page size. */ + private PageToken buildPageToken( + PolarisEntityManager entityManager, + @Nullable String tokenString, + @Nullable Integer pageSize) { + if (tokenString != null) { + return entityManager + .newMetaStoreSession() + .pageTokenBuilder() + .fromString(tokenString) + .withPageSize(pageSize); + } else { + return entityManager.newMetaStoreSession().pageTokenBuilder().fromLimit(pageSize); + } + } + @Override public Response createNamespace( String prefix, @@ -127,10 +145,16 @@ public Response listNamespaces( SecurityContext securityContext) { Optional namespaceOptional = Optional.ofNullable(parent).map(IcebergCatalogAdapter::decodeNamespace); - return Response.ok( - newHandlerWrapper(securityContext, prefix) - .listNamespaces(namespaceOptional.orElse(Namespace.of()))) - .build(); + + PolarisEntityManager entityManager = + entityManagerFactory.getOrCreateEntityManager( + CallContext.getCurrentContext().getRealmContext()); + PageToken token = buildPageToken(entityManager, pageToken, pageSize); + + var response = + newHandlerWrapper(securityContext, prefix) + .listNamespaces(namespaceOptional.orElse(Namespace.of()), token); + return Response.ok(response).build(); } @Override @@ -225,7 +249,12 @@ public Response listTables( Integer pageSize, SecurityContext securityContext) { Namespace ns = decodeNamespace(namespace); - return Response.ok(newHandlerWrapper(securityContext, prefix).listTables(ns)).build(); + + PolarisEntityManager entityManager = + entityManagerFactory.getOrCreateEntityManager( + CallContext.getCurrentContext().getRealmContext()); + PageToken token = buildPageToken(entityManager, pageToken, pageSize); + return Response.ok(newHandlerWrapper(securityContext, prefix).listTables(ns, token)).build(); } @Override @@ -361,7 +390,11 @@ public Response listViews( Integer pageSize, SecurityContext securityContext) { Namespace ns = decodeNamespace(namespace); - return Response.ok(newHandlerWrapper(securityContext, prefix).listViews(ns)).build(); + PolarisEntityManager entityManager = + entityManagerFactory.getOrCreateEntityManager( + CallContext.getCurrentContext().getRealmContext()); + PageToken token = buildPageToken(entityManager, pageToken, pageSize); + return Response.ok(newHandlerWrapper(securityContext, prefix).listViews(ns, token)).build(); } @Override diff --git a/polaris-service/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java b/polaris-service/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java index 78e3935d8..f3281806f 100644 --- a/polaris-service/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java +++ b/polaris-service/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java @@ -72,6 +72,8 @@ import org.apache.polaris.core.auth.PolarisAuthorizableOperation; import org.apache.polaris.core.auth.PolarisAuthorizer; import org.apache.polaris.core.catalog.PolarisCatalogHelpers; +import org.apache.polaris.core.catalog.pagination.PageToken; +import org.apache.polaris.core.catalog.pagination.PolarisPage; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.entity.CatalogEntity; import org.apache.polaris.core.entity.PolarisEntitySubType; @@ -83,6 +85,8 @@ import org.apache.polaris.core.persistence.resolver.ResolverStatus; import org.apache.polaris.core.storage.PolarisStorageActions; import org.apache.polaris.service.context.CallContextCatalogFactory; +import org.apache.polaris.service.types.ListNamespacesResponseWithPageToken; +import org.apache.polaris.service.types.ListTablesResponseWithPageToken; import org.apache.polaris.service.types.NotificationRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -409,6 +413,21 @@ private void authorizeRenameTableLikeOperationOrThrow( initializeCatalog(); } + public ListNamespacesResponseWithPageToken listNamespaces(Namespace parent, PageToken pageToken) { + PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LIST_NAMESPACES; + authorizeBasicNamespaceOperationOrThrow(op, parent); + + if (baseCatalog instanceof BasePolarisCatalog bpc) { + return ListNamespacesResponseWithPageToken.fromPolarisPage( + doCatalogOperation(() -> bpc.listNamespaces(parent, pageToken))); + } else { + return ListNamespacesResponseWithPageToken.fromPolarisPage( + PolarisPage.fromData( + doCatalogOperation(() -> CatalogHandlers.listNamespaces(namespaceCatalog, parent)) + .namespaces())); + } + } + public ListNamespacesResponse listNamespaces(Namespace parent) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LIST_NAMESPACES; authorizeBasicNamespaceOperationOrThrow(op, parent); @@ -522,6 +541,21 @@ public UpdateNamespacePropertiesResponse updateNamespaceProperties( () -> CatalogHandlers.updateNamespaceProperties(namespaceCatalog, namespace, request)); } + public ListTablesResponseWithPageToken listTables(Namespace namespace, PageToken pageToken) { + PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LIST_TABLES; + authorizeBasicNamespaceOperationOrThrow(op, namespace); + + if (baseCatalog instanceof BasePolarisCatalog bpc) { + return ListTablesResponseWithPageToken.fromPolarisPage( + doCatalogOperation(() -> bpc.listTables(namespace, pageToken))); + } else { + return ListTablesResponseWithPageToken.fromPolarisPage( + PolarisPage.fromData( + doCatalogOperation(() -> CatalogHandlers.listTables(baseCatalog, namespace)) + .identifiers())); + } + } + public ListTablesResponse listTables(Namespace namespace) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LIST_TABLES; authorizeBasicNamespaceOperationOrThrow(op, namespace); @@ -982,6 +1016,21 @@ public ListTablesResponse listViews(Namespace namespace) { return doCatalogOperation(() -> CatalogHandlers.listViews(viewCatalog, namespace)); } + public ListTablesResponseWithPageToken listViews(Namespace namespace, PageToken pageToken) { + PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LIST_VIEWS; + authorizeBasicNamespaceOperationOrThrow(op, namespace); + + if (baseCatalog instanceof BasePolarisCatalog bpc) { + return ListTablesResponseWithPageToken.fromPolarisPage( + doCatalogOperation(() -> bpc.listViews(namespace, pageToken))); + } else { + return ListTablesResponseWithPageToken.fromPolarisPage( + PolarisPage.fromData( + doCatalogOperation(() -> CatalogHandlers.listTables(baseCatalog, namespace)) + .identifiers())); + } + } + public LoadViewResponse createView(Namespace namespace, CreateViewRequest request) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.CREATE_VIEW; authorizeCreateTableLikeUnderNamespaceOperationOrThrow( diff --git a/polaris-service/src/main/java/org/apache/polaris/service/types/ListNamespacesResponseWithPageToken.java b/polaris-service/src/main/java/org/apache/polaris/service/types/ListNamespacesResponseWithPageToken.java new file mode 100644 index 000000000..5e9e9d031 --- /dev/null +++ b/polaris-service/src/main/java/org/apache/polaris/service/types/ListNamespacesResponseWithPageToken.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.types; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; +import java.util.List; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.rest.responses.ListNamespacesResponse; +import org.apache.polaris.core.catalog.pagination.PageToken; +import org.apache.polaris.core.catalog.pagination.PolarisPage; + +/** + * Used in lieu of {@link ListNamespacesResponse} when there may be a {@link PageToken} associated + * with the response. Callers can use this {@link PageToken} to continue the listing operation and + * obtain more results. + */ +public class ListNamespacesResponseWithPageToken extends ListNamespacesResponse { + private final PageToken pageToken; + + private final List namespaces; + + public ListNamespacesResponseWithPageToken(PageToken pageToken, List namespaces) { + this.pageToken = pageToken; + this.namespaces = namespaces; + Preconditions.checkArgument(this.namespaces != null, "Invalid namespace: null"); + } + + public static ListNamespacesResponseWithPageToken fromPolarisPage( + PolarisPage polarisPage) { + return new ListNamespacesResponseWithPageToken(polarisPage.pageToken, polarisPage.data); + } + + @JsonProperty("next-page-token") + public String getPageToken() { + if (pageToken == null) { + return null; + } else { + return pageToken.toString(); + } + } + + @Override + public List namespaces() { + return this.namespaces != null ? this.namespaces : List.of(); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("namespaces", this.namespaces) + .add("pageToken", this.pageToken.toString()) + .toString(); + } +} diff --git a/polaris-service/src/main/java/org/apache/polaris/service/types/ListTablesResponseWithPageToken.java b/polaris-service/src/main/java/org/apache/polaris/service/types/ListTablesResponseWithPageToken.java new file mode 100644 index 000000000..7689c2576 --- /dev/null +++ b/polaris-service/src/main/java/org/apache/polaris/service/types/ListTablesResponseWithPageToken.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.types; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; +import java.util.List; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.responses.ListTablesResponse; +import org.apache.polaris.core.catalog.pagination.PageToken; +import org.apache.polaris.core.catalog.pagination.PolarisPage; + +/** + * Used in lieu of {@link ListTablesResponse} when there may be a {@link PageToken} associated with + * the response. Callers can use this {@link PageToken} to continue the listing operation and obtain + * more results. + */ +public class ListTablesResponseWithPageToken extends ListTablesResponse { + private final PageToken pageToken; + + private final List identifiers; + + public ListTablesResponseWithPageToken(PageToken pageToken, List identifiers) { + this.pageToken = pageToken; + this.identifiers = identifiers; + Preconditions.checkArgument(this.identifiers != null, "Invalid identifier list: null"); + } + + public static ListTablesResponseWithPageToken fromPolarisPage( + PolarisPage polarisPage) { + return new ListTablesResponseWithPageToken(polarisPage.pageToken, polarisPage.data); + } + + @JsonProperty("next-page-token") + public String getPageToken() { + if (pageToken == null) { + return null; + } else { + return pageToken.toString(); + } + } + + @Override + public List identifiers() { + return this.identifiers != null ? this.identifiers : List.of(); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("identifiers", this.identifiers) + .add("pageToken", this.pageToken) + .toString(); + } +} diff --git a/polaris-service/src/test/java/org/apache/polaris/service/catalog/BasePolarisCatalogTest.java b/polaris-service/src/test/java/org/apache/polaris/service/catalog/BasePolarisCatalogTest.java index c16b93cf9..a33128205 100644 --- a/polaris-service/src/test/java/org/apache/polaris/service/catalog/BasePolarisCatalogTest.java +++ b/polaris-service/src/test/java/org/apache/polaris/service/catalog/BasePolarisCatalogTest.java @@ -63,6 +63,7 @@ import org.apache.polaris.core.admin.model.StorageConfigInfo; import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal; import org.apache.polaris.core.auth.PolarisAuthorizer; +import org.apache.polaris.core.catalog.pagination.PolarisPage; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.core.entity.CatalogEntity; @@ -197,6 +198,7 @@ public void before() { PolarisConfiguration.ALLOW_EXTERNAL_TABLE_LOCATION.catalogConfig(), "true") .addProperty( PolarisConfiguration.ALLOW_UNSTRUCTURED_TABLE_LOCATION.catalogConfig(), "true") + .addProperty(PolarisConfiguration.LIST_PAGINATION_ENABLED.catalogConfig(), "true") .setStorageConfigurationInfo(storageConfigModel, storageLocation) .build()); @@ -1187,7 +1189,12 @@ public void testDropTableWithPurge() { .as("Table should not exist after drop") .rejects(TABLE); List tasks = - metaStoreManager.loadTasks(polarisContext, "testExecutor", 1).getEntities(); + metaStoreManager + .loadTasks( + polarisContext, + "testExecutor", + polarisContext.getMetaStore().pageTokenBuilder().fromLimit(1)) + .getEntities(); Assertions.assertThat(tasks).hasSize(1); TaskEntity taskEntity = TaskEntity.of(tasks.get(0)); EnumMap credentials = @@ -1362,9 +1369,138 @@ public void testFileIOWrapper() { handler.handleTask( TaskEntity.of( metaStoreManager - .loadTasks(polarisContext, "testExecutor", 1) + .loadTasks( + polarisContext, + "testExecutor", + polarisContext.getMetaStore().pageTokenBuilder().fromLimit(1)) .getEntities() .getFirst())); Assertions.assertThat(measured.getNumDeletedFiles()).as("A table was deleted").isGreaterThan(0); } + + @Test + public void testPaginatedListTables() { + if (this.requiresNamespaceCreate()) { + ((SupportsNamespaces) catalog).createNamespace(NS); + } + + for (int i = 0; i < 5; i++) { + catalog.buildTable(TableIdentifier.of(NS, "pagination_table_" + i), SCHEMA).create(); + } + + try { + // List without pagination + Assertions.assertThat(catalog.listTables(NS)).isNotNull().hasSize(5); + + // List with a limit: + PolarisPage firstListResult = + catalog.listTables(NS, polarisContext.getMetaStore().pageTokenBuilder().fromLimit(2)); + Assertions.assertThat(firstListResult.data.size()).isEqualTo(2); + Assertions.assertThat(firstListResult.pageToken.toString()).isNotNull().isNotEmpty(); + + // List using the previously obtained token: + PolarisPage secondListResult = catalog.listTables(NS, firstListResult.pageToken); + Assertions.assertThat(secondListResult.data.size()).isEqualTo(2); + Assertions.assertThat(secondListResult.pageToken.toString()).isNotNull().isNotEmpty(); + + // List using the final token: + PolarisPage finalListResult = catalog.listTables(NS, secondListResult.pageToken); + Assertions.assertThat(finalListResult.data.size()).isEqualTo(1); + Assertions.assertThat(finalListResult.pageToken).isNull(); + } finally { + for (int i = 0; i < 5; i++) { + catalog.dropTable(TableIdentifier.of(NS, "pagination_table_" + i)); + } + } + } + + @Test + public void testPaginatedListViews() { + if (this.requiresNamespaceCreate()) { + ((SupportsNamespaces) catalog).createNamespace(NS); + } + + for (int i = 0; i < 5; i++) { + catalog + .buildView(TableIdentifier.of(NS, "pagination_view_" + i)) + .withQuery("a_" + i, "SELECT 1 id") + .withSchema(SCHEMA) + .withDefaultNamespace(NS) + .create(); + } + + try { + // List without pagination + Assertions.assertThat(catalog.listViews(NS)).isNotNull().hasSize(5); + + // List with a limit: + PolarisPage firstListResult = + catalog.listViews(NS, polarisContext.getMetaStore().pageTokenBuilder().fromLimit(2)); + Assertions.assertThat(firstListResult.data.size()).isEqualTo(2); + Assertions.assertThat(firstListResult.pageToken.toString()).isNotNull().isNotEmpty(); + + // List using the previously obtained token: + PolarisPage secondListResult = catalog.listViews(NS, firstListResult.pageToken); + Assertions.assertThat(secondListResult.data.size()).isEqualTo(2); + Assertions.assertThat(secondListResult.pageToken.toString()).isNotNull().isNotEmpty(); + + // List using the final token: + PolarisPage finalListResult = catalog.listViews(NS, secondListResult.pageToken); + Assertions.assertThat(finalListResult.data.size()).isEqualTo(1); + Assertions.assertThat(finalListResult.pageToken).isNull(); + } finally { + for (int i = 0; i < 5; i++) { + catalog.dropTable(TableIdentifier.of(NS, "pagination_view_" + i)); + } + } + } + + @Test + public void testPaginatedListNamespaces() { + for (int i = 0; i < 5; i++) { + catalog.createNamespace(Namespace.of("pagination_namespace_" + i)); + } + + try { + // List without pagination + Assertions.assertThat(catalog.listNamespaces()).isNotNull().hasSize(5); + + // List with a limit: + PolarisPage firstListResult = + catalog.listNamespaces(polarisContext.getMetaStore().pageTokenBuilder().fromLimit(2)); + Assertions.assertThat(firstListResult.data.size()).isEqualTo(2); + Assertions.assertThat(firstListResult.pageToken.toString()).isNotNull().isNotEmpty(); + + // List using the previously obtained token: + PolarisPage secondListResult = catalog.listNamespaces(firstListResult.pageToken); + Assertions.assertThat(secondListResult.data.size()).isEqualTo(2); + Assertions.assertThat(secondListResult.pageToken.toString()).isNotNull().isNotEmpty(); + + // List using the final token: + PolarisPage finalListResult = catalog.listNamespaces(secondListResult.pageToken); + Assertions.assertThat(finalListResult.data.size()).isEqualTo(1); + Assertions.assertThat(finalListResult.pageToken).isNull(); + + // List with page size matching the amount of data + PolarisPage firstExactListResult = + catalog.listNamespaces(polarisContext.getMetaStore().pageTokenBuilder().fromLimit(5)); + Assertions.assertThat(firstExactListResult.data.size()).isEqualTo(5); + Assertions.assertThat(firstExactListResult.pageToken.toString()).isNotNull().isNotEmpty(); + + // Again list with matching page size + PolarisPage secondExactListResult = catalog.listNamespaces(firstExactListResult.pageToken); + Assertions.assertThat(secondExactListResult.data).isEmpty(); + Assertions.assertThat(secondExactListResult.pageToken).isNull(); + + // List with huge page size: + PolarisPage bigListResult = + catalog.listNamespaces(polarisContext.getMetaStore().pageTokenBuilder().fromLimit(9999)); + Assertions.assertThat(bigListResult.data.size()).isEqualTo(5); + Assertions.assertThat(bigListResult.pageToken).isNull(); + } finally { + for (int i = 0; i < 5; i++) { + catalog.dropNamespace(Namespace.of("pagination_namespace_" + i)); + } + } + } } diff --git a/polaris-service/src/test/java/org/apache/polaris/service/task/TableCleanupTaskHandlerTest.java b/polaris-service/src/test/java/org/apache/polaris/service/task/TableCleanupTaskHandlerTest.java index d106a26e0..b032feb77 100644 --- a/polaris-service/src/test/java/org/apache/polaris/service/task/TableCleanupTaskHandlerTest.java +++ b/polaris-service/src/test/java/org/apache/polaris/service/task/TableCleanupTaskHandlerTest.java @@ -32,6 +32,7 @@ import org.apache.iceberg.io.FileIO; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.PolarisDefaultDiagServiceImpl; +import org.apache.polaris.core.catalog.pagination.OffsetPageToken; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.core.entity.AsyncTaskType; @@ -91,7 +92,7 @@ public void testTableCleanup() throws IOException { assertThat( metaStoreManagerFactory .getOrCreateMetaStoreManager(realmContext) - .loadTasks(polarisCallContext, "test", 1) + .loadTasks(polarisCallContext, "test", OffsetPageToken.builder().fromLimit(1)) .getEntities()) .hasSize(1) .satisfiesExactly( @@ -166,7 +167,7 @@ public void close() { assertThat( metaStoreManagerFactory .getOrCreateMetaStoreManager(realmContext) - .loadTasks(polarisCallContext, "test", 5) + .loadTasks(polarisCallContext, "test", OffsetPageToken.builder().fromLimit(5)) .getEntities()) .hasSize(1); } @@ -236,7 +237,7 @@ public void close() { assertThat( metaStoreManagerFactory .getOrCreateMetaStoreManager(realmContext) - .loadTasks(polarisCallContext, "test", 5) + .loadTasks(polarisCallContext, "test", OffsetPageToken.builder().fromLimit(5)) .getEntities()) .hasSize(2) .satisfiesExactly( @@ -326,7 +327,7 @@ public void testTableCleanupMultipleSnapshots() throws IOException { assertThat( metaStoreManagerFactory .getOrCreateMetaStoreManager(realmContext) - .loadTasks(polarisCallContext, "test", 5) + .loadTasks(polarisCallContext, "test", OffsetPageToken.builder().fromLimit(5)) .getEntities()) // all three manifests should be present, even though one is excluded from the latest // snapshot