Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement pagination for list APIs #273

Open
wants to merge 34 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
ce1782b
check in
eric-maynard Sep 5, 2024
63ef916
refactor
eric-maynard Sep 5, 2024
609e7bf
make tests stable
eric-maynard Sep 5, 2024
964bd53
still chasing compile errors
eric-maynard Sep 6, 2024
0ce73af
tests stabler
eric-maynard Sep 6, 2024
cb4a8e2
wip
eric-maynard Sep 6, 2024
fb21e9b
finishing integration
eric-maynard Sep 6, 2024
a725497
many fixes
eric-maynard Sep 6, 2024
2a99045
more signature fixes
eric-maynard Sep 6, 2024
14b0ef3
tests maybe working
eric-maynard Sep 6, 2024
8339e52
tested
eric-maynard Sep 6, 2024
dcaf798
add config, fixes
eric-maynard Sep 6, 2024
6b1c362
improvements
eric-maynard Sep 6, 2024
ccd0713
implement checksum
eric-maynard Sep 6, 2024
feb8c29
more tests
eric-maynard Sep 6, 2024
d649725
one tweak
eric-maynard Sep 9, 2024
c14797f
add another test case
eric-maynard Sep 9, 2024
8403076
add another test case
eric-maynard Sep 9, 2024
724948f
resolve conflicts
eric-maynard Sep 9, 2024
fe0acc8
check in
eric-maynard Sep 9, 2024
1100f71
check in after major refactor
eric-maynard Sep 10, 2024
3fc5939
close, issue with loadtasks
eric-maynard Sep 10, 2024
549c5ab
revert
eric-maynard Sep 10, 2024
d5f127b
re introduce client side filtering
eric-maynard Sep 10, 2024
342eb89
stable tests
eric-maynard Sep 10, 2024
af1b085
lint
eric-maynard Sep 10, 2024
0b313ae
one fix
eric-maynard Sep 10, 2024
b8f4342
fix conflicts
eric-maynard Sep 10, 2024
cd74b58
fix tests
eric-maynard Sep 10, 2024
778c60e
autolint
eric-maynard Sep 11, 2024
e82402b
doc fixes
eric-maynard Sep 11, 2024
eedf8c7
autolint
eric-maynard Sep 11, 2024
54c478f
lots of doc changes
eric-maynard Sep 11, 2024
2e08f0d
autolint
eric-maynard Sep 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.catalog.PageToken;
import org.apache.polaris.core.catalog.PolarisPage;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.entity.PolarisBaseEntity;
import org.apache.polaris.core.entity.PolarisChangeTrackingVersions;
Expand Down Expand Up @@ -501,28 +503,31 @@ public List<PolarisEntityActiveRecord> lookupEntityActiveBatch(

/** {@inheritDoc} */
@Override
public @NotNull List<PolarisEntityActiveRecord> listActiveEntities(
public @NotNull PolarisPage<PolarisEntityActiveRecord> listActiveEntities(
@NotNull PolarisCallContext callCtx,
long catalogId,
long parentId,
@NotNull PolarisEntityType entityType) {
return listActiveEntities(callCtx, catalogId, parentId, entityType, Predicates.alwaysTrue());
@NotNull PolarisEntityType entityType,
@NotNull PageToken pageToken) {
return listActiveEntities(
callCtx, catalogId, parentId, entityType, pageToken, Predicates.alwaysTrue());
}

@Override
public @NotNull List<PolarisEntityActiveRecord> listActiveEntities(
public @NotNull PolarisPage<PolarisEntityActiveRecord> listActiveEntities(
@NotNull PolarisCallContext callCtx,
long catalogId,
long parentId,
@NotNull PolarisEntityType entityType,
@NotNull PageToken pageToken,
@NotNull Predicate<PolarisBaseEntity> entityFilter) {
// full range scan under the parent for that type
return listActiveEntities(
callCtx,
catalogId,
parentId,
entityType,
Integer.MAX_VALUE,
pageToken,
entityFilter,
entity ->
new PolarisEntityActiveRecord(
Expand All @@ -535,23 +540,25 @@ public List<PolarisEntityActiveRecord> lookupEntityActiveBatch(
}

@Override
public @NotNull <T> List<T> listActiveEntities(
public @NotNull <T> PolarisPage<T> listActiveEntities(
@NotNull PolarisCallContext callCtx,
long catalogId,
long parentId,
@NotNull PolarisEntityType entityType,
int limit,
@NotNull PageToken pageToken,
@NotNull Predicate<PolarisBaseEntity> entityFilter,
@NotNull Function<PolarisBaseEntity, T> transformer) {
// full range scan under the parent for that type
return this.store
.lookupFullEntitiesActive(localSession.get(), catalogId, parentId, entityType)
.stream()
.map(ModelEntity::toEntity)
.filter(entityFilter)
.limit(limit)
.map(transformer)
.collect(Collectors.toList());
List<T> data =
this.store
.lookupFullEntitiesActive(localSession.get(), catalogId, parentId, entityType)
.stream()
.map(ModelEntity::toEntity)
.filter(entityFilter)
.skip(pageToken.offset)
.limit(pageToken.pageSize)
.map(transformer)
.collect(Collectors.toList());
return pageToken.buildNextPage(data);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,11 @@ public static <T> Builder<T> builder() {
"If set to true, allows tables to have external locations outside the default structure.")
.defaultValue(false)
.build();

public static final PolarisConfiguration<Boolean> PAGINATION_ENABLED =
PolarisConfiguration.<Boolean>builder()
.key("PAGINATION_ENABLED")
.description("If set to true, pagination for APIs like listTables is enabled")
.defaultValue(true)
.build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public interface PolarisConfigurationStore {

if (config.defaultValue instanceof Boolean) {
return config.cast(Boolean.valueOf(String.valueOf(value)));
} else if (config.defaultValue instanceof Integer) {
return config.cast(Integer.valueOf(value.toString()));
} else {
return config.cast(value);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.core.catalog;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;

/**
* Represents a page token that can be used by operations like `listTables`. Clients that specify a
* `pageSize` (or a `pageToken`) may receive a `next-page-token` in the response, the content of
* which is a serialized PageToken.
*
* <p>By providing that in the next query's `pageToken`, the client can resume listing where they
* left off. If the client provides a `pageToken` or `pageSize` but `next-page-token` is null in the
* response, that means there is no more data to read.
*/
public class PageToken {

public final int offset;
public final int pageSize;

private static final String TOKEN_PREFIX = "polaris";
private static final int TOKEN_START = 0;
private static final int DEFAULT_PAGE_SIZE = 1000;

public static PageToken DONE = null;

public PageToken(int offset, int pageSize) {
if (offset < 0 || pageSize <= 0) {
throw new IllegalArgumentException(
"Invalid token content (offset / pageSize): " + offset + " / " + pageSize);
}

this.offset = offset;
this.pageSize = pageSize;
eric-maynard marked this conversation as resolved.
Show resolved Hide resolved
}

/** Construct a PageToken from a plain limit */
public static PageToken fromLimit(int limit) {
return new PageToken(TOKEN_START, limit);
}

/** Construct a PageToken to read everything */
public static PageToken readEverything() {
return new PageToken(TOKEN_START, Integer.MAX_VALUE);
}

/** Deserialize a token string into a PageToken object */
public static PageToken fromString(String tokenString) {
if (tokenString == null) {
return PageToken.readEverything();
eric-maynard marked this conversation as resolved.
Show resolved Hide resolved
} else if (tokenString.isEmpty()) {
return PageToken.fromLimit(DEFAULT_PAGE_SIZE);
} else {
try {
String decoded =
new String(Base64.getDecoder().decode(tokenString), StandardCharsets.UTF_8);
String[] parts = decoded.split(":");

if (parts.length != 4 || !parts[0].equals(TOKEN_PREFIX)) {
throw new IllegalArgumentException("Invalid token format in token: " + tokenString);
}

int offset = Integer.parseInt(parts[1]);
int pageSize = Integer.parseInt(parts[2]);
int checksum = Integer.parseInt(parts[3]);
PageToken token = new PageToken(offset, pageSize);

if (token.hashCode() != checksum) {
throw new IllegalArgumentException("Invalid checksum for token: " + tokenString);
eric-maynard marked this conversation as resolved.
Show resolved Hide resolved
} else {
return token;
}
} catch (Exception e) {
throw new IllegalArgumentException("Failed to decode page token: " + tokenString, e);
}
}
}

/**
* Builds a new page token to reflect new data that's been read. If the amount of data read is
* less than the pageSize, this will return `PageToken.DONE` (done)
*/
public PageToken updated(List<?> newData) {
if (newData == null || newData.isEmpty() || newData.size() < pageSize) {
eric-maynard marked this conversation as resolved.
Show resolved Hide resolved
return PageToken.DONE;
} else {
return new PageToken(offset + newData.size(), pageSize);
}
}

/**
* Builds a `PolarisPage<T>` from a `List<T>`. The `PageToken` attached to the new
* `PolarisPage<T>` is the same as the result of calling `updated(data)` on this `PageToken`.
*/
public <T> PolarisPage<T> buildNextPage(List<T> data) {
return new PolarisPage<T>(this.updated(data), data);
}

/**
* Return a new PageToken with an updated pageSize. If the pageSize provided is null, the existing
* pageSize will be preserved.
*/
public PageToken withPageSize(Integer pageSize) {
if (pageSize == null) {
eric-maynard marked this conversation as resolved.
Show resolved Hide resolved
return new PageToken(this.offset, this.pageSize);
} else {
return new PageToken(this.offset, pageSize);
}
}

/** Serialize a PageToken into a string */
@Override
public String toString() {
String tokenContent = TOKEN_PREFIX + ":" + offset + ":" + pageSize + ":" + hashCode();
return Base64.getEncoder().encodeToString(tokenContent.getBytes(StandardCharsets.UTF_8));
}

@Override
public boolean equals(Object o) {
if (o instanceof PageToken) {
PageToken other = (PageToken) o;
return offset == other.offset && pageSize == other.pageSize;
} else {
return false;
}
}

@Override
public int hashCode() {
return offset + pageSize;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.core.catalog;

import java.util.List;

/**
* A wrapper for a List of data and a PageToken that can be used to continue the listing operation
* that generated that data.
*/
public class PolarisPage<T> {
public final PageToken pageToken;
public final List<T> data;

public PolarisPage(PageToken pageToken, List<T> data) {
this.pageToken = pageToken;
this.data = data;
}

/** Used to wrap a List of data into a PolarisPage when there is no more data */
public static <T> PolarisPage<T> fromData(List<T> data) {
return new PolarisPage<>(PageToken.DONE, data);
}
}
Loading