From 0c2ba4b1330cd258cecd63e632e8a5643b9a3140 Mon Sep 17 00:00:00 2001 From: gargharsh3134 <51459091+gargharsh3134@users.noreply.github.com> Date: Mon, 7 Oct 2024 12:55:52 +0530 Subject: [PATCH] Implementing pagination for _cat/shards (#14641) * Adding _list/shards API Signed-off-by: Harsh Garg --- CHANGELOG.md | 1 + .../shards/TransportCatShardsActionIT.java | 5 +- .../org/opensearch/action/ActionModule.java | 2 + .../cluster/shards/CatShardsRequest.java | 39 +- .../cluster/shards/CatShardsResponse.java | 55 +- .../shards/TransportCatShardsAction.java | 37 +- .../indices/stats/IndicesStatsResponse.java | 2 +- .../pagination/IndexPaginationStrategy.java | 67 ++- .../action/pagination/PageParams.java | 82 +++ .../action/pagination/PageToken.java | 74 +++ .../pagination/PaginationStrategy.java | 10 +- .../pagination/ShardPaginationStrategy.java | 275 +++++++++ .../pagination/package-info.java | 2 +- .../java/org/opensearch/common/Table.java | 2 +- .../java/org/opensearch/rest/RestRequest.java | 8 +- .../rest/action/cat/RestIndicesAction.java | 4 +- .../rest/action/cat/RestShardsAction.java | 53 +- .../opensearch/rest/action/cat/RestTable.java | 2 +- .../rest/action/list/AbstractListAction.java | 6 +- .../action/list/RestIndicesListAction.java | 5 +- .../action/list/RestShardsListAction.java | 74 +++ .../rest/pagination/PageParams.java | 48 -- .../opensearch/rest/pagination/PageToken.java | 42 -- .../cluster/shards/CatShardsRequestTests.java | 112 ++++ .../shards/CatShardsResponseTests.java | 125 ++++- .../IndexPaginationStrategyTests.java | 6 +- .../action/pagination/PageParamsTests.java | 37 ++ .../action/pagination/PageTokenTests.java | 38 ++ .../ShardPaginationStrategyTests.java | 527 ++++++++++++++++++ .../action/cat/RestIndicesActionTests.java | 2 +- .../action/cat/RestShardsActionTests.java | 48 +- .../rest/action/cat/RestTableTests.java | 2 +- .../list/RestShardsListActionTests.java | 76 +++ .../opensearch/test/rest/FakeRestRequest.java | 9 + 34 files changed, 1691 insertions(+), 186 deletions(-) rename server/src/main/java/org/opensearch/{rest => action}/pagination/IndexPaginationStrategy.java (75%) create mode 100644 server/src/main/java/org/opensearch/action/pagination/PageParams.java create mode 100644 server/src/main/java/org/opensearch/action/pagination/PageToken.java rename server/src/main/java/org/opensearch/{rest => action}/pagination/PaginationStrategy.java (85%) create mode 100644 server/src/main/java/org/opensearch/action/pagination/ShardPaginationStrategy.java rename server/src/main/java/org/opensearch/{rest => action}/pagination/package-info.java (86%) create mode 100644 server/src/main/java/org/opensearch/rest/action/list/RestShardsListAction.java delete mode 100644 server/src/main/java/org/opensearch/rest/pagination/PageParams.java delete mode 100644 server/src/main/java/org/opensearch/rest/pagination/PageToken.java create mode 100644 server/src/test/java/org/opensearch/action/admin/cluster/shards/CatShardsRequestTests.java rename server/src/test/java/org/opensearch/{rest => action}/pagination/IndexPaginationStrategyTests.java (99%) create mode 100644 server/src/test/java/org/opensearch/action/pagination/PageParamsTests.java create mode 100644 server/src/test/java/org/opensearch/action/pagination/PageTokenTests.java create mode 100644 server/src/test/java/org/opensearch/action/pagination/ShardPaginationStrategyTests.java create mode 100644 server/src/test/java/org/opensearch/rest/action/list/RestShardsListActionTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a289a8c62000..cbf19fb0d35be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [S3 Repository] Change default retry mechanism of s3 clients to Standard Mode ([#15978](https://github.com/opensearch-project/OpenSearch/pull/15978)) - Add changes to block calls in cat shards, indices and segments based on dynamic limit settings ([#15986](https://github.com/opensearch-project/OpenSearch/pull/15986)) - New `phone` & `phone-search` analyzer + tokenizer ([#15915](https://github.com/opensearch-project/OpenSearch/pull/15915)) +- Add _list/shards API as paginated alternate to _cat/shards ([#14641](https://github.com/opensearch-project/OpenSearch/pull/14641)) ### Dependencies - Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578)) diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsActionIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsActionIT.java index b86521dedf739..32d5b3db85629 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsActionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsActionIT.java @@ -8,7 +8,6 @@ package org.opensearch.action.admin.cluster.shards; -import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.ShardRouting; @@ -51,9 +50,9 @@ public void testCatShardsWithSuccessResponse() throws InterruptedException { client().execute(CatShardsAction.INSTANCE, shardsRequest, new ActionListener() { @Override public void onResponse(CatShardsResponse catShardsResponse) { - ClusterStateResponse clusterStateResponse = catShardsResponse.getClusterStateResponse(); + List shardRoutings = catShardsResponse.getResponseShards(); IndicesStatsResponse indicesStatsResponse = catShardsResponse.getIndicesStatsResponse(); - for (ShardRouting shard : clusterStateResponse.getState().routingTable().allShards()) { + for (ShardRouting shard : shardRoutings) { assertEquals("test", shard.getIndexName()); assertNotNull(indicesStatsResponse.asMap().get(shard)); } diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 98bcd6ba9c3f8..d6b4a93b34217 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -465,6 +465,7 @@ import org.opensearch.rest.action.list.AbstractListAction; import org.opensearch.rest.action.list.RestIndicesListAction; import org.opensearch.rest.action.list.RestListAction; +import org.opensearch.rest.action.list.RestShardsListAction; import org.opensearch.rest.action.search.RestClearScrollAction; import org.opensearch.rest.action.search.RestCountAction; import org.opensearch.rest.action.search.RestCreatePitAction; @@ -993,6 +994,7 @@ public void initRestHandlers(Supplier nodesInCluster) { // LIST API registerHandler.accept(new RestIndicesListAction(responseLimitSettings)); + registerHandler.accept(new RestShardsListAction()); // Point in time API registerHandler.accept(new RestCreatePitAction()); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsRequest.java index 76aa25b9a96b5..0319fa103138f 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsRequest.java @@ -8,10 +8,13 @@ package org.opensearch.action.admin.cluster.shards; +import org.opensearch.Version; import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.pagination.PageParams; import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadRequest; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.tasks.TaskId; import org.opensearch.rest.action.admin.cluster.ClusterAdminTask; @@ -27,13 +30,39 @@ public class CatShardsRequest extends ClusterManagerNodeReadRequest responseShards = new ArrayList<>(); + private PageToken pageToken; public CatShardsResponse() {} public CatShardsResponse(StreamInput in) throws IOException { super(in); + indicesStatsResponse = new IndicesStatsResponse(in); + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + nodes = DiscoveryNodes.readFrom(in, null); + responseShards = in.readList(ShardRouting::new); + if (in.readBoolean()) { + pageToken = new PageToken(in); + } + } } @Override public void writeTo(StreamOutput out) throws IOException { - clusterStateResponse.writeTo(out); indicesStatsResponse.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + nodes.writeToWithAttribute(out); + out.writeList(responseShards); + out.writeBoolean(pageToken != null); + if (pageToken != null) { + pageToken.writeTo(out); + } + } } - public void setClusterStateResponse(ClusterStateResponse clusterStateResponse) { - this.clusterStateResponse = clusterStateResponse; + public void setNodes(DiscoveryNodes nodes) { + this.nodes = nodes; } - public ClusterStateResponse getClusterStateResponse() { - return this.clusterStateResponse; + public DiscoveryNodes getNodes() { + return this.nodes; } public void setIndicesStatsResponse(IndicesStatsResponse indicesStatsResponse) { @@ -54,4 +75,20 @@ public void setIndicesStatsResponse(IndicesStatsResponse indicesStatsResponse) { public IndicesStatsResponse getIndicesStatsResponse() { return this.indicesStatsResponse; } + + public void setResponseShards(List responseShards) { + this.responseShards = responseShards; + } + + public List getResponseShards() { + return this.responseShards; + } + + public void setPageToken(PageToken pageToken) { + this.pageToken = pageToken; + } + + public PageToken getPageToken() { + return this.pageToken; + } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java index 6b073a16f7d28..3dc8c38152a16 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java @@ -12,6 +12,8 @@ import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.pagination.PageParams; +import org.opensearch.action.pagination.ShardPaginationStrategy; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; import org.opensearch.action.support.TimeoutTaskCancellationUtility; @@ -57,7 +59,11 @@ public void doExecute(Task parentTask, CatShardsRequest shardsRequest, ActionLis clusterStateRequest.setShouldCancelOnTimeout(true); clusterStateRequest.local(shardsRequest.local()); clusterStateRequest.clusterManagerNodeTimeout(shardsRequest.clusterManagerNodeTimeout()); - clusterStateRequest.clear().nodes(true).routingTable(true).indices(shardsRequest.getIndices()); + if (Objects.isNull(shardsRequest.getPageParams())) { + clusterStateRequest.clear().nodes(true).routingTable(true).indices(shardsRequest.getIndices()); + } else { + clusterStateRequest.clear().nodes(true).routingTable(true).indices(shardsRequest.getIndices()).metadata(true); + } assert parentTask instanceof CancellableTask; clusterStateRequest.setParentTask(client.getLocalNodeId(), parentTask.getId()); @@ -87,13 +93,26 @@ protected void innerOnFailure(Exception e) { @Override public void onResponse(ClusterStateResponse clusterStateResponse) { validateRequestLimit(shardsRequest, clusterStateResponse, cancellableListener); - catShardsResponse.setClusterStateResponse(clusterStateResponse); - IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); - indicesStatsRequest.setShouldCancelOnTimeout(true); - indicesStatsRequest.all(); - indicesStatsRequest.indices(shardsRequest.getIndices()); - indicesStatsRequest.setParentTask(client.getLocalNodeId(), parentTask.getId()); try { + ShardPaginationStrategy paginationStrategy = getPaginationStrategy( + shardsRequest.getPageParams(), + clusterStateResponse + ); + String[] indices = Objects.isNull(paginationStrategy) + ? shardsRequest.getIndices() + : paginationStrategy.getRequestedIndices().toArray(new String[0]); + catShardsResponse.setNodes(clusterStateResponse.getState().getNodes()); + catShardsResponse.setResponseShards( + Objects.isNull(paginationStrategy) + ? clusterStateResponse.getState().routingTable().allShards() + : paginationStrategy.getRequestedEntities() + ); + catShardsResponse.setPageToken(Objects.isNull(paginationStrategy) ? null : paginationStrategy.getResponseToken()); + IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); + indicesStatsRequest.setShouldCancelOnTimeout(true); + indicesStatsRequest.all(); + indicesStatsRequest.indices(indices); + indicesStatsRequest.setParentTask(client.getLocalNodeId(), parentTask.getId()); client.admin().indices().stats(indicesStatsRequest, new ActionListener() { @Override public void onResponse(IndicesStatsResponse indicesStatsResponse) { @@ -122,6 +141,10 @@ public void onFailure(Exception e) { } + private ShardPaginationStrategy getPaginationStrategy(PageParams pageParams, ClusterStateResponse clusterStateResponse) { + return Objects.isNull(pageParams) ? null : new ShardPaginationStrategy(pageParams, clusterStateResponse.getState()); + } + private void validateRequestLimit( final CatShardsRequest shardsRequest, final ClusterStateResponse clusterStateResponse, diff --git a/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsResponse.java b/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsResponse.java index 900a886481fe6..ae989573b39ea 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsResponse.java @@ -64,7 +64,7 @@ public class IndicesStatsResponse extends BroadcastResponse { private Map shardStatsMap; - IndicesStatsResponse(StreamInput in) throws IOException { + public IndicesStatsResponse(StreamInput in) throws IOException { super(in); shards = in.readArray(ShardStats::new, (size) -> new ShardStats[size]); } diff --git a/server/src/main/java/org/opensearch/rest/pagination/IndexPaginationStrategy.java b/server/src/main/java/org/opensearch/action/pagination/IndexPaginationStrategy.java similarity index 75% rename from server/src/main/java/org/opensearch/rest/pagination/IndexPaginationStrategy.java rename to server/src/main/java/org/opensearch/action/pagination/IndexPaginationStrategy.java index f89ab14e4b24d..6180d098a851d 100644 --- a/server/src/main/java/org/opensearch/rest/pagination/IndexPaginationStrategy.java +++ b/server/src/main/java/org/opensearch/action/pagination/IndexPaginationStrategy.java @@ -6,12 +6,11 @@ * compatible open source license. */ -package org.opensearch.rest.pagination; +package org.opensearch.action.pagination; import org.opensearch.OpenSearchParseException; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.common.Nullable; import java.util.ArrayList; import java.util.Comparator; @@ -20,8 +19,6 @@ import java.util.function.Predicate; import java.util.stream.Collectors; -import static org.opensearch.rest.pagination.PageParams.PARAM_ASC_SORT_VALUE; - /** * This strategy can be used by the Rest APIs wanting to paginate the responses based on Indices. * The strategy considers create timestamps of indices as the keys to iterate over pages. @@ -31,13 +28,13 @@ public class IndexPaginationStrategy implements PaginationStrategy { private static final String DEFAULT_INDICES_PAGINATED_ENTITY = "indices"; - private static final Comparator ASC_COMPARATOR = (metadata1, metadata2) -> { + protected static final Comparator ASC_COMPARATOR = (metadata1, metadata2) -> { if (metadata1.getCreationDate() == metadata2.getCreationDate()) { return metadata1.getIndex().getName().compareTo(metadata2.getIndex().getName()); } return Long.compare(metadata1.getCreationDate(), metadata2.getCreationDate()); }; - private static final Comparator DESC_COMPARATOR = (metadata1, metadata2) -> { + protected static final Comparator DESC_COMPARATOR = (metadata1, metadata2) -> { if (metadata1.getCreationDate() == metadata2.getCreationDate()) { return metadata2.getIndex().getName().compareTo(metadata1.getIndex().getName()); } @@ -48,12 +45,18 @@ public class IndexPaginationStrategy implements PaginationStrategy { private final List requestedIndices; public IndexPaginationStrategy(PageParams pageParams, ClusterState clusterState) { + + IndexStrategyToken requestedToken = Objects.isNull(pageParams.getRequestedToken()) || pageParams.getRequestedToken().isEmpty() + ? null + : new IndexStrategyToken(pageParams.getRequestedToken()); // Get list of indices metadata sorted by their creation time and filtered by the last sent index - List sortedIndices = PaginationStrategy.getSortedIndexMetadata( + List sortedIndices = getEligibleIndices( clusterState, - getMetadataFilter(pageParams.getRequestedToken(), pageParams.getSort()), - PARAM_ASC_SORT_VALUE.equals(pageParams.getSort()) ? ASC_COMPARATOR : DESC_COMPARATOR + pageParams.getSort(), + Objects.isNull(requestedToken) ? null : requestedToken.lastIndexName, + Objects.isNull(requestedToken) ? null : requestedToken.lastIndexCreationTime ); + // Trim sortedIndicesList to get the list of indices metadata to be sent as response List metadataSublist = getMetadataSubList(sortedIndices, pageParams.getSize()); // Get list of index names from the trimmed metadataSublist @@ -65,25 +68,44 @@ public IndexPaginationStrategy(PageParams pageParams, ClusterState clusterState) ); } - private static Predicate getMetadataFilter(String requestedTokenStr, String sortOrder) { - boolean isAscendingSort = sortOrder.equals(PARAM_ASC_SORT_VALUE); - IndexStrategyToken requestedToken = Objects.isNull(requestedTokenStr) || requestedTokenStr.isEmpty() - ? null - : new IndexStrategyToken(requestedTokenStr); - if (Objects.isNull(requestedToken)) { + private static List getEligibleIndices( + ClusterState clusterState, + String sortOrder, + String lastIndexName, + Long lastIndexCreationTime + ) { + if (Objects.isNull(lastIndexName) || Objects.isNull(lastIndexCreationTime)) { + return PaginationStrategy.getSortedIndexMetadata( + clusterState, + PageParams.PARAM_ASC_SORT_VALUE.equals(sortOrder) ? ASC_COMPARATOR : DESC_COMPARATOR + ); + } else { + return PaginationStrategy.getSortedIndexMetadata( + clusterState, + getMetadataFilter(sortOrder, lastIndexName, lastIndexCreationTime), + PageParams.PARAM_ASC_SORT_VALUE.equals(sortOrder) ? ASC_COMPARATOR : DESC_COMPARATOR + ); + } + } + + private static Predicate getMetadataFilter(String sortOrder, String lastIndexName, Long lastIndexCreationTime) { + if (Objects.isNull(lastIndexName) || Objects.isNull(lastIndexCreationTime)) { return indexMetadata -> true; } + return getIndexCreateTimeFilter(sortOrder, lastIndexName, lastIndexCreationTime); + } + + protected static Predicate getIndexCreateTimeFilter(String sortOrder, String lastIndexName, Long lastIndexCreationTime) { + boolean isAscendingSort = sortOrder.equals(PageParams.PARAM_ASC_SORT_VALUE); return metadata -> { - if (metadata.getIndex().getName().equals(requestedToken.lastIndexName)) { - return false; - } else if (metadata.getCreationDate() == requestedToken.lastIndexCreationTime) { + if (metadata.getCreationDate() == lastIndexCreationTime) { return isAscendingSort - ? metadata.getIndex().getName().compareTo(requestedToken.lastIndexName) > 0 - : metadata.getIndex().getName().compareTo(requestedToken.lastIndexName) < 0; + ? metadata.getIndex().getName().compareTo(lastIndexName) > 0 + : metadata.getIndex().getName().compareTo(lastIndexName) < 0; } return isAscendingSort - ? metadata.getCreationDate() > requestedToken.lastIndexCreationTime - : metadata.getCreationDate() < requestedToken.lastIndexCreationTime; + ? metadata.getCreationDate() > lastIndexCreationTime + : metadata.getCreationDate() < lastIndexCreationTime; }; } @@ -105,7 +127,6 @@ private PageToken getResponseToken(final int pageSize, final int totalIndices, I } @Override - @Nullable public PageToken getResponseToken() { return pageToken; } diff --git a/server/src/main/java/org/opensearch/action/pagination/PageParams.java b/server/src/main/java/org/opensearch/action/pagination/PageParams.java new file mode 100644 index 0000000000000..03de1aa465a15 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/pagination/PageParams.java @@ -0,0 +1,82 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.pagination; + +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; + +import java.io.IOException; +import java.util.Objects; + +/** + * + * Class specific to paginated queries, which will contain common query params required by a paginated API. + */ +@PublicApi(since = "3.0.0") +public class PageParams implements Writeable { + + public static final String PARAM_SORT = "sort"; + public static final String PARAM_NEXT_TOKEN = "next_token"; + public static final String PARAM_SIZE = "size"; + public static final String PARAM_ASC_SORT_VALUE = "asc"; + public static final String PARAM_DESC_SORT_VALUE = "desc"; + + private final String requestedTokenStr; + private final String sort; + private final int size; + + public PageParams(StreamInput in) throws IOException { + this.requestedTokenStr = in.readOptionalString(); + this.sort = in.readOptionalString(); + this.size = in.readInt(); + } + + public PageParams(String requestedToken, String sort, int size) { + this.requestedTokenStr = requestedToken; + this.sort = sort; + this.size = size; + } + + public String getSort() { + return sort; + } + + public String getRequestedToken() { + return requestedTokenStr; + } + + public int getSize() { + return size; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalString(requestedTokenStr); + out.writeOptionalString(sort); + out.writeInt(size); + } + + // Overriding equals and hashcode for tests + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PageParams that = (PageParams) o; + return this.size == that.size + && Objects.equals(this.requestedTokenStr, that.requestedTokenStr) + && Objects.equals(this.sort, that.sort); + } + + @Override + public int hashCode() { + return Objects.hash(requestedTokenStr, sort, size); + } +} diff --git a/server/src/main/java/org/opensearch/action/pagination/PageToken.java b/server/src/main/java/org/opensearch/action/pagination/PageToken.java new file mode 100644 index 0000000000000..44d4079681501 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/pagination/PageToken.java @@ -0,0 +1,74 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.pagination; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; + +import java.io.IOException; +import java.util.Objects; + +/** + * Pagination response metadata for a paginated query. + * @opensearch.internal + */ +public class PageToken implements Writeable { + + public static final String PAGINATED_RESPONSE_NEXT_TOKEN_KEY = "next_token"; + + /** + * String denoting the next_token of paginated response, which will be used to fetch next page (if any). + */ + private final String nextToken; + + /** + * String denoting the element which is being paginated (for e.g. shards, indices..). + */ + private final String paginatedEntity; + + public PageToken(String nextToken, String paginatedElement) { + assert paginatedElement != null : "paginatedElement must be specified for a paginated response"; + this.nextToken = nextToken; + this.paginatedEntity = paginatedElement; + } + + public PageToken(StreamInput in) throws IOException { + this.nextToken = in.readOptionalString(); + this.paginatedEntity = in.readString(); + } + + public String getNextToken() { + return nextToken; + } + + public String getPaginatedEntity() { + return paginatedEntity; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalString(nextToken); + out.writeString(paginatedEntity); + } + + // Overriding equals and hashcode for testing + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PageToken that = (PageToken) o; + return Objects.equals(this.nextToken, that.nextToken) && Objects.equals(this.paginatedEntity, that.paginatedEntity); + } + + @Override + public int hashCode() { + return Objects.hash(nextToken, paginatedEntity); + } +} diff --git a/server/src/main/java/org/opensearch/rest/pagination/PaginationStrategy.java b/server/src/main/java/org/opensearch/action/pagination/PaginationStrategy.java similarity index 85% rename from server/src/main/java/org/opensearch/rest/pagination/PaginationStrategy.java rename to server/src/main/java/org/opensearch/action/pagination/PaginationStrategy.java index 7f9825a7cc09b..edb2489b1e4f0 100644 --- a/server/src/main/java/org/opensearch/rest/pagination/PaginationStrategy.java +++ b/server/src/main/java/org/opensearch/action/pagination/PaginationStrategy.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.rest.pagination; +package org.opensearch.action.pagination; import org.opensearch.OpenSearchParseException; import org.opensearch.cluster.ClusterState; @@ -55,6 +55,14 @@ static List getSortedIndexMetadata( return clusterState.metadata().indices().values().stream().filter(filterPredicate).sorted(comparator).collect(Collectors.toList()); } + /** + * + * Utility method to get list of indices sorted as per {@param comparator}. + */ + static List getSortedIndexMetadata(final ClusterState clusterState, Comparator comparator) { + return clusterState.metadata().indices().values().stream().sorted(comparator).collect(Collectors.toList()); + } + static String encryptStringToken(String tokenString) { if (Objects.isNull(tokenString)) { return null; diff --git a/server/src/main/java/org/opensearch/action/pagination/ShardPaginationStrategy.java b/server/src/main/java/org/opensearch/action/pagination/ShardPaginationStrategy.java new file mode 100644 index 0000000000000..1eb364c883e60 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/pagination/ShardPaginationStrategy.java @@ -0,0 +1,275 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.pagination; + +import org.opensearch.OpenSearchParseException; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.ShardRouting; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Predicate; + +import static org.opensearch.action.pagination.IndexPaginationStrategy.ASC_COMPARATOR; +import static org.opensearch.action.pagination.IndexPaginationStrategy.DESC_COMPARATOR; + +/** + * This strategy can be used by the Rest APIs wanting to paginate the responses based on Shards. + * The strategy considers create timestamps of indices and shardID as the keys to iterate over pages. + * + * @opensearch.internal + */ +public class ShardPaginationStrategy implements PaginationStrategy { + + private static final String DEFAULT_SHARDS_PAGINATED_ENTITY = "shards"; + + private PageData pageData; + + public ShardPaginationStrategy(PageParams pageParams, ClusterState clusterState) { + ShardStrategyToken shardStrategyToken = getShardStrategyToken(pageParams.getRequestedToken()); + // Get list of indices metadata sorted by their creation time and filtered by the last sent index + List filteredIndices = getEligibleIndices( + clusterState, + pageParams.getSort(), + Objects.isNull(shardStrategyToken) ? null : shardStrategyToken.lastIndexName, + Objects.isNull(shardStrategyToken) ? null : shardStrategyToken.lastIndexCreationTime + ); + // Get the list of shards and indices belonging to current page. + this.pageData = getPageData( + filteredIndices, + clusterState.getRoutingTable().getIndicesRouting(), + shardStrategyToken, + pageParams.getSize() + ); + } + + private static List getEligibleIndices( + ClusterState clusterState, + String sortOrder, + String lastIndexName, + Long lastIndexCreationTime + ) { + if (Objects.isNull(lastIndexName) || Objects.isNull(lastIndexCreationTime)) { + return PaginationStrategy.getSortedIndexMetadata( + clusterState, + PageParams.PARAM_ASC_SORT_VALUE.equals(sortOrder) ? ASC_COMPARATOR : DESC_COMPARATOR + ); + } else { + return PaginationStrategy.getSortedIndexMetadata( + clusterState, + getMetadataFilter(sortOrder, lastIndexName, lastIndexCreationTime), + PageParams.PARAM_ASC_SORT_VALUE.equals(sortOrder) ? ASC_COMPARATOR : DESC_COMPARATOR + ); + } + } + + private static Predicate getMetadataFilter(String sortOrder, String lastIndexName, Long lastIndexCreationTime) { + if (Objects.isNull(lastIndexName) || Objects.isNull(lastIndexCreationTime)) { + return indexMetadata -> true; + } + return indexNameFilter(lastIndexName).or( + IndexPaginationStrategy.getIndexCreateTimeFilter(sortOrder, lastIndexName, lastIndexCreationTime) + ); + } + + private static Predicate indexNameFilter(String lastIndexName) { + return metadata -> metadata.getIndex().getName().equals(lastIndexName); + } + + /** + * Will be used to get the list of shards and respective indices to which they belong, + * which are to be displayed in a page. + * Note: All shards for a shardID will always be present in the same page. + */ + private PageData getPageData( + List filteredIndices, + Map indicesRouting, + final ShardStrategyToken token, + final int numShardsRequired + ) { + List shardRoutings = new ArrayList<>(); + List indices = new ArrayList<>(); + int shardCount = 0; + IndexMetadata lastAddedIndex = null; + + // iterate over indices until shardCount is less than numShardsRequired + for (IndexMetadata indexMetadata : filteredIndices) { + String indexName = indexMetadata.getIndex().getName(); + boolean indexShardsAdded = false; + // Always start from shardID 0 for all indices except for the first one which might be same as the last sent + // index. To identify if an index is same as last sent index, verify both the index name and creaton time. + int startShardId = shardCount == 0 ? getStartShardIdForPageIndex(token, indexName, indexMetadata.getCreationDate()) : 0; + Map indexShardRoutingTable = indicesRouting.get(indexName).getShards(); + for (; startShardId < indexShardRoutingTable.size(); startShardId++) { + if (indexShardRoutingTable.get(startShardId).size() > numShardsRequired) { + throw new IllegalArgumentException( + "size value should be greater than the replica count of all indices, so that all primary and replicas of a shard show up in single page" + ); + } + shardCount += indexShardRoutingTable.get(startShardId).size(); + if (shardCount > numShardsRequired) { + break; + } + shardRoutings.addAll(indexShardRoutingTable.get(startShardId).shards()); + indexShardsAdded = true; + } + + if (indexShardsAdded) { + lastAddedIndex = indexMetadata; + indices.add(indexName); + } + + if (shardCount > numShardsRequired) { + return new PageData( + shardRoutings, + indices, + new PageToken( + new ShardStrategyToken( + lastAddedIndex.getIndex().getName(), + shardRoutings.get(shardRoutings.size() - 1).id(), + lastAddedIndex.getCreationDate() + ).generateEncryptedToken(), + DEFAULT_SHARDS_PAGINATED_ENTITY + ) + ); + } + } + + // If all the shards of filtered indices list have been included in pageShardRoutings, then no more + // shards are remaining to be fetched, and the next_token should thus be null. + return new PageData(shardRoutings, indices, new PageToken(null, DEFAULT_SHARDS_PAGINATED_ENTITY)); + } + + private ShardStrategyToken getShardStrategyToken(String requestedToken) { + return Objects.isNull(requestedToken) || requestedToken.isEmpty() ? null : new ShardStrategyToken(requestedToken); + } + + /** + * Provides the shardId to start an index which is to be included in the page. If the index is same as + * lastIndex, start from the shardId next to lastShardId, else always start from 0. + */ + private int getStartShardIdForPageIndex(ShardStrategyToken token, final String pageIndexName, final long pageIndexCreationTime) { + return Objects.isNull(token) ? 0 + : token.lastIndexName.equals(pageIndexName) && token.lastIndexCreationTime == pageIndexCreationTime ? token.lastShardId + 1 + : 0; + } + + @Override + public PageToken getResponseToken() { + return pageData.pageToken; + } + + @Override + public List getRequestedEntities() { + return pageData.shardRoutings; + } + + public List getRequestedIndices() { + return pageData.indices; + } + + /** + * TokenParser to be used by {@link ShardPaginationStrategy}. + * TToken would look like: LastShardIdOfPage + | + CreationTimeOfLastRespondedIndex + | + NameOfLastRespondedIndex + */ + public static class ShardStrategyToken { + + private static final String JOIN_DELIMITER = "|"; + private static final String SPLIT_REGEX = "\\|"; + private static final int SHARD_ID_POS_IN_TOKEN = 0; + private static final int CREATE_TIME_POS_IN_TOKEN = 1; + private static final int INDEX_NAME_POS_IN_TOKEN = 2; + + /** + * Denotes the shardId of the last shard in the response. + * Will be used to identify the next shard to start the page from, in case the shards of an index + * get split across pages. + */ + private final int lastShardId; + /** + * Represents creation times of last index which was displayed in the page. + * Used to identify the new start point in case the indices get created/deleted while queries are executed. + */ + private final long lastIndexCreationTime; + + /** + * Represents name of the last index which was displayed in the page. + * Used to identify whether the sorted list of indices has changed or not. + */ + private final String lastIndexName; + + public ShardStrategyToken(String requestedTokenString) { + validateShardStrategyToken(requestedTokenString); + String decryptedToken = PaginationStrategy.decryptStringToken(requestedTokenString); + final String[] decryptedTokenElements = decryptedToken.split(SPLIT_REGEX); + this.lastShardId = Integer.parseInt(decryptedTokenElements[SHARD_ID_POS_IN_TOKEN]); + this.lastIndexCreationTime = Long.parseLong(decryptedTokenElements[CREATE_TIME_POS_IN_TOKEN]); + this.lastIndexName = decryptedTokenElements[INDEX_NAME_POS_IN_TOKEN]; + } + + public ShardStrategyToken(String lastIndexName, int lastShardId, long lastIndexCreationTime) { + Objects.requireNonNull(lastIndexName, "index name should be provided"); + this.lastIndexName = lastIndexName; + this.lastShardId = lastShardId; + this.lastIndexCreationTime = lastIndexCreationTime; + } + + public String generateEncryptedToken() { + return PaginationStrategy.encryptStringToken( + String.join(JOIN_DELIMITER, String.valueOf(lastShardId), String.valueOf(lastIndexCreationTime), lastIndexName) + ); + } + + /** + * Will perform simple validations on token received in the request. + * Token should be base64 encoded, and should contain the expected number of elements separated by "|". + * Timestamps should also be a valid long. + * + * @param requestedTokenStr string denoting the encoded token requested by the user. + */ + public static void validateShardStrategyToken(String requestedTokenStr) { + Objects.requireNonNull(requestedTokenStr, "requestedTokenString can not be null"); + String decryptedToken = PaginationStrategy.decryptStringToken(requestedTokenStr); + final String[] decryptedTokenElements = decryptedToken.split(SPLIT_REGEX); + if (decryptedTokenElements.length != 3) { + throw new OpenSearchParseException(INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE); + } + try { + int shardId = Integer.parseInt(decryptedTokenElements[SHARD_ID_POS_IN_TOKEN]); + long creationTimeOfLastRespondedIndex = Long.parseLong(decryptedTokenElements[CREATE_TIME_POS_IN_TOKEN]); + if (shardId < 0 || creationTimeOfLastRespondedIndex <= 0) { + throw new OpenSearchParseException(INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE); + } + } catch (NumberFormatException exception) { + throw new OpenSearchParseException(INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE); + } + } + } + + /** + * Private utility class to consolidate the page details of shard strategy. Will be used to set all the details at once, + * to avoid parsing collections multiple times. + */ + private static class PageData { + private final List shardRoutings; + private final List indices; + private final PageToken pageToken; + + PageData(List shardRoutings, List indices, PageToken pageToken) { + this.shardRoutings = shardRoutings; + this.indices = indices; + this.pageToken = pageToken; + } + } +} diff --git a/server/src/main/java/org/opensearch/rest/pagination/package-info.java b/server/src/main/java/org/opensearch/action/pagination/package-info.java similarity index 86% rename from server/src/main/java/org/opensearch/rest/pagination/package-info.java rename to server/src/main/java/org/opensearch/action/pagination/package-info.java index 324b8a6c46f88..4cf4c7d05adaf 100644 --- a/server/src/main/java/org/opensearch/rest/pagination/package-info.java +++ b/server/src/main/java/org/opensearch/action/pagination/package-info.java @@ -9,4 +9,4 @@ /** * Exposes utilities for Rest actions to paginate responses. */ -package org.opensearch.rest.pagination; +package org.opensearch.action.pagination; diff --git a/server/src/main/java/org/opensearch/common/Table.java b/server/src/main/java/org/opensearch/common/Table.java index 133ec3052e6c9..c2dd770da845b 100644 --- a/server/src/main/java/org/opensearch/common/Table.java +++ b/server/src/main/java/org/opensearch/common/Table.java @@ -32,9 +32,9 @@ package org.opensearch.common; +import org.opensearch.action.pagination.PageToken; import org.opensearch.common.time.DateFormatter; import org.opensearch.core.common.Strings; -import org.opensearch.rest.pagination.PageToken; import java.time.Instant; import java.time.ZoneOffset; diff --git a/server/src/main/java/org/opensearch/rest/RestRequest.java b/server/src/main/java/org/opensearch/rest/RestRequest.java index f241b567c3204..4d5bad2e7fd49 100644 --- a/server/src/main/java/org/opensearch/rest/RestRequest.java +++ b/server/src/main/java/org/opensearch/rest/RestRequest.java @@ -33,6 +33,7 @@ package org.opensearch.rest; import org.opensearch.OpenSearchParseException; +import org.opensearch.action.pagination.PageParams; import org.opensearch.common.Booleans; import org.opensearch.common.CheckedConsumer; import org.opensearch.common.Nullable; @@ -51,7 +52,6 @@ import org.opensearch.core.xcontent.XContentParser; import org.opensearch.http.HttpChannel; import org.opensearch.http.HttpRequest; -import org.opensearch.rest.pagination.PageParams; import java.io.IOException; import java.io.InputStream; @@ -66,11 +66,11 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; +import static org.opensearch.action.pagination.PageParams.PARAM_NEXT_TOKEN; +import static org.opensearch.action.pagination.PageParams.PARAM_SIZE; +import static org.opensearch.action.pagination.PageParams.PARAM_SORT; import static org.opensearch.common.unit.TimeValue.parseTimeValue; import static org.opensearch.core.common.unit.ByteSizeValue.parseBytesSizeValue; -import static org.opensearch.rest.pagination.PageParams.PARAM_NEXT_TOKEN; -import static org.opensearch.rest.pagination.PageParams.PARAM_SIZE; -import static org.opensearch.rest.pagination.PageParams.PARAM_SORT; /** * REST Request diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java index 7562e09ddd704..b566ba9bbb8e4 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java @@ -43,6 +43,8 @@ import org.opensearch.action.admin.indices.stats.IndexStats; import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.pagination.IndexPaginationStrategy; +import org.opensearch.action.pagination.PageToken; import org.opensearch.action.support.GroupedActionListener; import org.opensearch.action.support.IndicesOptions; import org.opensearch.client.node.NodeClient; @@ -65,8 +67,6 @@ import org.opensearch.rest.RestResponse; import org.opensearch.rest.action.RestResponseListener; import org.opensearch.rest.action.list.AbstractListAction; -import org.opensearch.rest.pagination.IndexPaginationStrategy; -import org.opensearch.rest.pagination.PageToken; import java.time.Instant; import java.time.ZoneOffset; diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java index c8b4e7472927e..04212b6b057fe 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java @@ -35,11 +35,12 @@ import org.opensearch.action.admin.cluster.shards.CatShardsAction; import org.opensearch.action.admin.cluster.shards.CatShardsRequest; import org.opensearch.action.admin.cluster.shards.CatShardsResponse; -import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; import org.opensearch.action.admin.indices.stats.ShardStats; +import org.opensearch.action.pagination.PageToken; import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.common.Table; @@ -63,6 +64,7 @@ import org.opensearch.rest.RestRequest; import org.opensearch.rest.RestResponse; import org.opensearch.rest.action.RestResponseListener; +import org.opensearch.rest.action.list.AbstractListAction; import org.opensearch.search.suggest.completion.CompletionStats; import java.time.Instant; @@ -80,7 +82,7 @@ * * @opensearch.api */ -public class RestShardsAction extends AbstractCatAction { +public class RestShardsAction extends AbstractListAction { private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(RestShardsAction.class); @@ -119,20 +121,32 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli shardsRequest.setCancelAfterTimeInterval(request.paramAsTime("cancel_after_time_interval", NO_TIMEOUT)); shardsRequest.setIndices(indices); shardsRequest.setRequestLimitCheckSupported(isRequestLimitCheckSupported()); + shardsRequest.setPageParams(pageParams); parseDeprecatedMasterTimeoutParameter(shardsRequest, request, deprecationLogger, getName()); return channel -> client.execute(CatShardsAction.INSTANCE, shardsRequest, new RestResponseListener(channel) { @Override public RestResponse buildResponse(CatShardsResponse catShardsResponse) throws Exception { - ClusterStateResponse clusterStateResponse = catShardsResponse.getClusterStateResponse(); - IndicesStatsResponse indicesStatsResponse = catShardsResponse.getIndicesStatsResponse(); - return RestTable.buildResponse(buildTable(request, clusterStateResponse, indicesStatsResponse), channel); + return RestTable.buildResponse( + buildTable( + request, + catShardsResponse.getNodes(), + catShardsResponse.getIndicesStatsResponse(), + catShardsResponse.getResponseShards(), + catShardsResponse.getPageToken() + ), + channel + ); } }); } @Override protected Table getTableWithHeader(final RestRequest request) { - Table table = new Table(); + return getTableWithHeader(request, null); + } + + protected Table getTableWithHeader(final RestRequest request, final PageToken pageToken) { + Table table = new Table(pageToken); table.startHeaders() .addCell("index", "default:true;alias:i,idx;desc:index name") .addCell("shard", "default:true;alias:s,sh;desc:shard name") @@ -301,10 +315,16 @@ private static Object getOrNull(S stats, Function accessor, Functio } // package private for testing - Table buildTable(RestRequest request, ClusterStateResponse state, IndicesStatsResponse stats) { - Table table = getTableWithHeader(request); - - for (ShardRouting shard : state.getState().routingTable().allShards()) { + Table buildTable( + RestRequest request, + DiscoveryNodes nodes, + IndicesStatsResponse stats, + List responseShards, + PageToken pageToken + ) { + Table table = getTableWithHeader(request, pageToken); + + for (ShardRouting shard : responseShards) { ShardStats shardStats = stats.asMap().get(shard); CommonStats commonStats = null; CommitStats commitStats = null; @@ -331,13 +351,13 @@ Table buildTable(RestRequest request, ClusterStateResponse state, IndicesStatsRe table.addCell(getOrNull(commonStats, CommonStats::getDocs, DocsStats::getCount)); table.addCell(getOrNull(commonStats, CommonStats::getStore, StoreStats::getSize)); if (shard.assignedToNode()) { - String ip = state.getState().nodes().get(shard.currentNodeId()).getHostAddress(); + String ip = nodes.get(shard.currentNodeId()).getHostAddress(); String nodeId = shard.currentNodeId(); StringBuilder name = new StringBuilder(); - name.append(state.getState().nodes().get(shard.currentNodeId()).getName()); + name.append(nodes.get(shard.currentNodeId()).getName()); if (shard.relocating()) { - String reloIp = state.getState().nodes().get(shard.relocatingNodeId()).getHostAddress(); - String reloNme = state.getState().nodes().get(shard.relocatingNodeId()).getName(); + String reloIp = nodes.get(shard.relocatingNodeId()).getHostAddress(); + String reloNme = nodes.get(shard.relocatingNodeId()).getName(); String reloNodeId = shard.relocatingNodeId(); name.append(" -> "); name.append(reloIp); @@ -460,4 +480,9 @@ Table buildTable(RestRequest request, ClusterStateResponse state, IndicesStatsRe return table; } + + @Override + public boolean isActionPaginated() { + return false; + } } diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestTable.java b/server/src/main/java/org/opensearch/rest/action/cat/RestTable.java index d622dd7a956f4..e27c8212e16a0 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestTable.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestTable.java @@ -61,7 +61,7 @@ import java.util.Objects; import java.util.Set; -import static org.opensearch.rest.pagination.PageToken.PAGINATED_RESPONSE_NEXT_TOKEN_KEY; +import static org.opensearch.action.pagination.PageToken.PAGINATED_RESPONSE_NEXT_TOKEN_KEY; /** * a REST table diff --git a/server/src/main/java/org/opensearch/rest/action/list/AbstractListAction.java b/server/src/main/java/org/opensearch/rest/action/list/AbstractListAction.java index f3d6d6653a550..5bdee750fd96f 100644 --- a/server/src/main/java/org/opensearch/rest/action/list/AbstractListAction.java +++ b/server/src/main/java/org/opensearch/rest/action/list/AbstractListAction.java @@ -8,16 +8,16 @@ package org.opensearch.rest.action.list; +import org.opensearch.action.pagination.PageParams; import org.opensearch.client.node.NodeClient; import org.opensearch.rest.RestRequest; import org.opensearch.rest.action.cat.AbstractCatAction; -import org.opensearch.rest.pagination.PageParams; import java.io.IOException; import java.util.Objects; -import static org.opensearch.rest.pagination.PageParams.PARAM_ASC_SORT_VALUE; -import static org.opensearch.rest.pagination.PageParams.PARAM_DESC_SORT_VALUE; +import static org.opensearch.action.pagination.PageParams.PARAM_ASC_SORT_VALUE; +import static org.opensearch.action.pagination.PageParams.PARAM_DESC_SORT_VALUE; /** * Base Transport action class for _list API. diff --git a/server/src/main/java/org/opensearch/rest/action/list/RestIndicesListAction.java b/server/src/main/java/org/opensearch/rest/action/list/RestIndicesListAction.java index 971cab0535be9..bd6a9fb09aafe 100644 --- a/server/src/main/java/org/opensearch/rest/action/list/RestIndicesListAction.java +++ b/server/src/main/java/org/opensearch/rest/action/list/RestIndicesListAction.java @@ -9,13 +9,13 @@ package org.opensearch.rest.action.list; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.action.pagination.IndexPaginationStrategy; +import org.opensearch.action.pagination.PageParams; import org.opensearch.common.breaker.ResponseLimitSettings; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.Settings; import org.opensearch.rest.RestRequest; import org.opensearch.rest.action.cat.RestIndicesAction; -import org.opensearch.rest.pagination.IndexPaginationStrategy; -import org.opensearch.rest.pagination.PageParams; import java.util.Iterator; import java.util.List; @@ -50,7 +50,6 @@ public String getName() { return "list_indices_action"; } - @Override protected void documentation(StringBuilder sb) { sb.append("/_list/indices\n"); sb.append("/_list/indices/{index}\n"); diff --git a/server/src/main/java/org/opensearch/rest/action/list/RestShardsListAction.java b/server/src/main/java/org/opensearch/rest/action/list/RestShardsListAction.java new file mode 100644 index 0000000000000..ba11592398174 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/list/RestShardsListAction.java @@ -0,0 +1,74 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.list; + +import org.opensearch.action.pagination.PageParams; +import org.opensearch.action.pagination.ShardPaginationStrategy; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.cat.RestShardsAction; + +import java.util.List; +import java.util.Objects; + +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; +import static org.opensearch.rest.RestRequest.Method.GET; + +/** + * _list API action to output shards in pages. + * + * @opensearch.api + */ +public class RestShardsListAction extends RestShardsAction { + + protected static final int MAX_SUPPORTED_LIST_SHARDS_PAGE_SIZE = 20000; + protected static final int MIN_SUPPORTED_LIST_SHARDS_PAGE_SIZE = 2000; + + @Override + public List routes() { + return unmodifiableList(asList(new Route(GET, "/_list/shards"), new Route(GET, "/_list/shards/{index}"))); + } + + @Override + public String getName() { + return "list_shards_action"; + } + + @Override + protected void documentation(StringBuilder sb) { + sb.append("/_list/shards\n"); + sb.append("/_list/shards/{index}\n"); + } + + @Override + public boolean isActionPaginated() { + return true; + } + + @Override + protected PageParams validateAndGetPageParams(RestRequest restRequest) { + PageParams pageParams = super.validateAndGetPageParams(restRequest); + // validate max supported pageSize + if (pageParams.getSize() < MIN_SUPPORTED_LIST_SHARDS_PAGE_SIZE) { + throw new IllegalArgumentException("size should at least be [" + MIN_SUPPORTED_LIST_SHARDS_PAGE_SIZE + "]"); + } else if (pageParams.getSize() > MAX_SUPPORTED_LIST_SHARDS_PAGE_SIZE) { + throw new IllegalArgumentException("size should be less than [" + MAX_SUPPORTED_LIST_SHARDS_PAGE_SIZE + "]"); + } + // Next Token in the request will be validated by the ShardStrategyToken itself. + if (Objects.nonNull(pageParams.getRequestedToken())) { + ShardPaginationStrategy.ShardStrategyToken.validateShardStrategyToken(pageParams.getRequestedToken()); + } + return pageParams; + } + + @Override + protected int defaultPageSize() { + return MIN_SUPPORTED_LIST_SHARDS_PAGE_SIZE; + } +} diff --git a/server/src/main/java/org/opensearch/rest/pagination/PageParams.java b/server/src/main/java/org/opensearch/rest/pagination/PageParams.java deleted file mode 100644 index 9b2074bc3fed0..0000000000000 --- a/server/src/main/java/org/opensearch/rest/pagination/PageParams.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.rest.pagination; - -import org.opensearch.common.annotation.PublicApi; - -/** - * - * Class specific to paginated queries, which will contain common query params required by a paginated API. - */ -@PublicApi(since = "3.0.0") -public class PageParams { - - public static final String PARAM_SORT = "sort"; - public static final String PARAM_NEXT_TOKEN = "next_token"; - public static final String PARAM_SIZE = "size"; - public static final String PARAM_ASC_SORT_VALUE = "asc"; - public static final String PARAM_DESC_SORT_VALUE = "desc"; - - private final String requestedTokenStr; - private final String sort; - private final int size; - - public PageParams(String requestedToken, String sort, int size) { - this.requestedTokenStr = requestedToken; - this.sort = sort; - this.size = size; - } - - public String getSort() { - return sort; - } - - public String getRequestedToken() { - return requestedTokenStr; - } - - public int getSize() { - return size; - } - -} diff --git a/server/src/main/java/org/opensearch/rest/pagination/PageToken.java b/server/src/main/java/org/opensearch/rest/pagination/PageToken.java deleted file mode 100644 index d62e1be695715..0000000000000 --- a/server/src/main/java/org/opensearch/rest/pagination/PageToken.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.rest.pagination; - -/** - * Pagination response metadata for a paginated query. - * @opensearch.internal - */ -public class PageToken { - - public static final String PAGINATED_RESPONSE_NEXT_TOKEN_KEY = "next_token"; - - /** - * String denoting the next_token of paginated response, which will be used to fetch next page (if any). - */ - private final String nextToken; - - /** - * String denoting the element which is being paginated (for e.g. shards, indices..). - */ - private final String paginatedEntity; - - public PageToken(String nextToken, String paginatedElement) { - assert paginatedElement != null : "paginatedElement must be specified for a paginated response"; - this.nextToken = nextToken; - this.paginatedEntity = paginatedElement; - } - - public String getNextToken() { - return nextToken; - } - - public String getPaginatedEntity() { - return paginatedEntity; - } -} diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/shards/CatShardsRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/shards/CatShardsRequestTests.java new file mode 100644 index 0000000000000..f4215f54c1e21 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/cluster/shards/CatShardsRequestTests.java @@ -0,0 +1,112 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.shards; + +import org.opensearch.Version; +import org.opensearch.action.pagination.PageParams; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.VersionUtils; + +public class CatShardsRequestTests extends OpenSearchTestCase { + + public void testSerializationWithDefaultParameters() throws Exception { + CatShardsRequest request = new CatShardsRequest(); + Version version = Version.CURRENT; + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.setVersion(version); + request.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + in.setVersion(version); + CatShardsRequest deserialized = new CatShardsRequest(in); + assertNull(deserialized.getPageParams()); + assertNull(deserialized.getCancelAfterTimeInterval()); + assertEquals(0, deserialized.getIndices().length); + assertFalse(deserialized.isRequestLimitCheckSupported()); + } + } + } + + public void testSerializationWithStringPageParamsNull() throws Exception { + CatShardsRequest catShardsRequest = new CatShardsRequest(); + catShardsRequest.setPageParams(new PageParams(null, null, randomIntBetween(1, 5))); + int numIndices = randomIntBetween(1, 5); + String[] indices = new String[numIndices]; + for (int i = 0; i < numIndices; i++) { + indices[i] = randomAlphaOfLengthBetween(3, 10); + } + catShardsRequest.setIndices(indices); + catShardsRequest.setCancelAfterTimeInterval(TimeValue.timeValueMillis(randomIntBetween(1, 5))); + catShardsRequest.setRequestLimitCheckSupported(true); + + Version version = Version.CURRENT; + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.setVersion(version); + catShardsRequest.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + in.setVersion(version); + CatShardsRequest deserialized = new CatShardsRequest(in); + // asserting pageParams of deserialized request + assertEquals(catShardsRequest.getPageParams(), deserialized.getPageParams()); + // assert indices + assertArrayEquals(catShardsRequest.getIndices(), deserialized.getIndices()); + // assert timeout + assertEquals(catShardsRequest.getCancelAfterTimeInterval(), deserialized.getCancelAfterTimeInterval()); + assertTrue(deserialized.isRequestLimitCheckSupported()); + } + } + } + + public void testSerializationWithPageParamsSet() throws Exception { + CatShardsRequest catShardsRequest = new CatShardsRequest(); + catShardsRequest.setPageParams( + new PageParams(randomAlphaOfLengthBetween(3, 10), randomAlphaOfLengthBetween(3, 10), randomIntBetween(1, 5)) + ); + Version version = Version.CURRENT; + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.setVersion(version); + catShardsRequest.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + in.setVersion(version); + CatShardsRequest deserialized = new CatShardsRequest(in); + + // asserting pageParams of deserialized request + assertEquals(catShardsRequest.getPageParams(), deserialized.getPageParams()); + assertEquals(0, deserialized.getIndices().length); + assertNull(deserialized.getCancelAfterTimeInterval()); + assertFalse(deserialized.isRequestLimitCheckSupported()); + } + } + } + + public void testSerializationWithOlderVersionsParametersNotSerialized() throws Exception { + CatShardsRequest catShardsRequest = new CatShardsRequest(); + catShardsRequest.setPageParams( + new PageParams(randomAlphaOfLengthBetween(3, 10), randomAlphaOfLengthBetween(3, 10), randomIntBetween(1, 5)) + ); + catShardsRequest.setCancelAfterTimeInterval(TimeValue.timeValueMillis(randomIntBetween(1, 5))); + catShardsRequest.setIndices(new String[2]); + + Version version = VersionUtils.getPreviousVersion(Version.CURRENT); + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.setVersion(version); + catShardsRequest.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + in.setVersion(version); + CatShardsRequest deserialized = new CatShardsRequest(in); + assertNull(deserialized.getPageParams()); + assertNull(deserialized.getIndices()); + assertNull(deserialized.getCancelAfterTimeInterval()); + assertFalse(deserialized.isRequestLimitCheckSupported()); + } + } + } +} diff --git a/server/src/test/java/org/opensearch/action/admin/indices/shards/CatShardsResponseTests.java b/server/src/test/java/org/opensearch/action/admin/indices/shards/CatShardsResponseTests.java index d0b98f4d286ae..11b1d5567d9fb 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/shards/CatShardsResponseTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/shards/CatShardsResponseTests.java @@ -32,24 +32,46 @@ package org.opensearch.action.admin.indices.shards; +import org.opensearch.Version; import org.opensearch.action.admin.cluster.shards.CatShardsResponse; -import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; -import org.opensearch.cluster.ClusterName; +import org.opensearch.action.admin.indices.stats.ShardStats; +import org.opensearch.action.pagination.PageToken; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.TestShardRouting; +import org.opensearch.common.UUIDs; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.shard.ShardPath; import org.opensearch.test.OpenSearchTestCase; -import static org.junit.Assert.assertEquals; +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.mockito.Mockito.mock; public class CatShardsResponseTests extends OpenSearchTestCase { + private static final String TEST_NODE_ID = "test_node_id"; private final CatShardsResponse catShardsResponse = new CatShardsResponse(); - public void testGetAndSetClusterStateResponse() { - ClusterName clusterName = new ClusterName("1"); - ClusterStateResponse clusterStateResponse = new ClusterStateResponse(clusterName, null, false); - catShardsResponse.setClusterStateResponse(clusterStateResponse); - - assertEquals(clusterStateResponse, catShardsResponse.getClusterStateResponse()); + public void testGetAndSetNodes() { + DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class); + catShardsResponse.setNodes(discoveryNodes); + assertEquals(discoveryNodes, catShardsResponse.getNodes()); } public void testGetAndSetIndicesStatsResponse() { @@ -58,4 +80,89 @@ public void testGetAndSetIndicesStatsResponse() { assertEquals(indicesStatsResponse, catShardsResponse.getIndicesStatsResponse()); } + + public void testSerialization() throws IOException { + CatShardsResponse response = new CatShardsResponse(); + response.setIndicesStatsResponse(getIndicesStatsResponse()); + response.setNodes(getDiscoveryNodes()); + response.setPageToken(new PageToken("token", "test")); + Version version = Version.CURRENT; + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.setVersion(version); + response.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + in.setVersion(version); + CatShardsResponse deserialized = new CatShardsResponse(in); + assertEquals(response.getPageToken(), deserialized.getPageToken()); + assertNotNull(deserialized.getNodes()); + assertEquals(1, deserialized.getNodes().getNodes().size()); + assertNotNull(deserialized.getNodes().getNodes().get(TEST_NODE_ID)); + assertNotNull(deserialized.getIndicesStatsResponse()); + assertEquals( + response.getIndicesStatsResponse().getShards().length, + deserialized.getIndicesStatsResponse().getShards().length + ); + assertTrue(deserialized.getResponseShards().isEmpty()); + } + } + } + + public void testSerializationWithOnlyStatsAndNodesInResponse() throws IOException { + CatShardsResponse response = new CatShardsResponse(); + response.setIndicesStatsResponse(getIndicesStatsResponse()); + response.setNodes(getDiscoveryNodes()); + Version version = Version.CURRENT; + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.setVersion(version); + response.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + in.setVersion(version); + CatShardsResponse deserialized = new CatShardsResponse(in); + assertEquals(response.getPageToken(), deserialized.getPageToken()); + assertNotNull(deserialized.getNodes()); + assertEquals(1, deserialized.getNodes().getNodes().size()); + assertNotNull(deserialized.getNodes().getNodes().get(TEST_NODE_ID)); + assertNotNull(deserialized.getIndicesStatsResponse()); + assertEquals( + response.getIndicesStatsResponse().getShards().length, + deserialized.getIndicesStatsResponse().getShards().length + ); + assertTrue(deserialized.getResponseShards().isEmpty()); + } + } + } + + private DiscoveryNodes getDiscoveryNodes() throws UnknownHostException { + DiscoveryNodes.Builder dnBuilder = new DiscoveryNodes.Builder(); + InetAddress inetAddress = InetAddress.getByAddress(new byte[] { (byte) 192, (byte) 168, (byte) 0, (byte) 1 }); + TransportAddress transportAddress = new TransportAddress(inetAddress, randomIntBetween(0, 65535)); + dnBuilder.add(new DiscoveryNode(TEST_NODE_ID, TEST_NODE_ID, transportAddress, emptyMap(), emptySet(), Version.CURRENT)); + return dnBuilder.build(); + } + + private IndicesStatsResponse getIndicesStatsResponse() { + List shards = new ArrayList<>(); + int noOfIndexes = randomIntBetween(2, 5); + + for (int indCnt = 0; indCnt < noOfIndexes; indCnt++) { + Index index = createIndex(randomAlphaOfLength(9)); + int numShards = randomIntBetween(1, 5); + for (int shardId = 0; shardId < numShards; shardId++) { + ShardId shId = new ShardId(index, shardId); + Path path = createTempDir().resolve("indices").resolve(index.getUUID()).resolve(String.valueOf(shardId)); + ShardPath shardPath = new ShardPath(false, path, path, shId); + ShardRouting routing = createShardRouting(shId, (shardId == 0)); + shards.add(new ShardStats(routing, shardPath, new CommonStats(), null, null, null)); + } + } + return new IndicesStatsResponse(shards.toArray(new ShardStats[0]), 0, 0, 0, null); + } + + private ShardRouting createShardRouting(ShardId shardId, boolean isPrimary) { + return TestShardRouting.newShardRouting(shardId, randomAlphaOfLength(4), isPrimary, ShardRoutingState.STARTED); + } + + private Index createIndex(String indexName) { + return new Index(indexName, UUIDs.base64UUID()); + } } diff --git a/server/src/test/java/org/opensearch/rest/pagination/IndexPaginationStrategyTests.java b/server/src/test/java/org/opensearch/action/pagination/IndexPaginationStrategyTests.java similarity index 99% rename from server/src/test/java/org/opensearch/rest/pagination/IndexPaginationStrategyTests.java rename to server/src/test/java/org/opensearch/action/pagination/IndexPaginationStrategyTests.java index 01464b489e26e..9900810bf3eb9 100644 --- a/server/src/test/java/org/opensearch/rest/pagination/IndexPaginationStrategyTests.java +++ b/server/src/test/java/org/opensearch/action/pagination/IndexPaginationStrategyTests.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.rest.pagination; +package org.opensearch.action.pagination; import org.opensearch.OpenSearchParseException; import org.opensearch.Version; @@ -27,9 +27,9 @@ import java.util.Objects; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.opensearch.action.pagination.PageParams.PARAM_ASC_SORT_VALUE; +import static org.opensearch.action.pagination.PageParams.PARAM_DESC_SORT_VALUE; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; -import static org.opensearch.rest.pagination.PageParams.PARAM_ASC_SORT_VALUE; -import static org.opensearch.rest.pagination.PageParams.PARAM_DESC_SORT_VALUE; import static com.carrotsearch.randomizedtesting.RandomizedTest.getRandom; public class IndexPaginationStrategyTests extends OpenSearchTestCase { diff --git a/server/src/test/java/org/opensearch/action/pagination/PageParamsTests.java b/server/src/test/java/org/opensearch/action/pagination/PageParamsTests.java new file mode 100644 index 0000000000000..66459ad473a49 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/pagination/PageParamsTests.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.pagination; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.test.OpenSearchTestCase; + +public class PageParamsTests extends OpenSearchTestCase { + + public void testSerialization() throws Exception { + PageParams pageParams = new PageParams("foo", "foo", 1); + try (BytesStreamOutput out = new BytesStreamOutput()) { + pageParams.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + assertEquals(pageParams, new PageParams(in)); + } + } + } + + public void testSerializationWithRequestedTokenAndSortAbsent() throws Exception { + PageParams pageParams = new PageParams(null, null, 1); + try (BytesStreamOutput out = new BytesStreamOutput()) { + pageParams.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + assertEquals(pageParams, new PageParams(in)); + } + } + } + +} diff --git a/server/src/test/java/org/opensearch/action/pagination/PageTokenTests.java b/server/src/test/java/org/opensearch/action/pagination/PageTokenTests.java new file mode 100644 index 0000000000000..7c24092dea64c --- /dev/null +++ b/server/src/test/java/org/opensearch/action/pagination/PageTokenTests.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.pagination; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.test.OpenSearchTestCase; + +public class PageTokenTests extends OpenSearchTestCase { + + public void testSerialization() throws Exception { + PageToken pageToken = new PageToken("foo", "test"); + try (BytesStreamOutput out = new BytesStreamOutput()) { + pageToken.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + PageToken deserialized = new PageToken(in); + assertEquals(pageToken, deserialized); + } + } + } + + public void testSerializationWithNextTokenAbsent() throws Exception { + PageToken pageToken = new PageToken(null, "test"); + try (BytesStreamOutput out = new BytesStreamOutput()) { + pageToken.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + PageToken deserialized = new PageToken(in); + assertEquals(pageToken, deserialized); + } + } + } +} diff --git a/server/src/test/java/org/opensearch/action/pagination/ShardPaginationStrategyTests.java b/server/src/test/java/org/opensearch/action/pagination/ShardPaginationStrategyTests.java new file mode 100644 index 0000000000000..aed7315660378 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/pagination/ShardPaginationStrategyTests.java @@ -0,0 +1,527 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.pagination; + +import org.opensearch.OpenSearchParseException; +import org.opensearch.Version; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.test.OpenSearchTestCase; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.opensearch.action.pagination.PageParams.PARAM_ASC_SORT_VALUE; +import static org.opensearch.action.pagination.PageParams.PARAM_DESC_SORT_VALUE; +import static org.opensearch.action.pagination.PaginationStrategy.INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; +import static com.carrotsearch.randomizedtesting.RandomizedTest.getRandom; + +public class ShardPaginationStrategyTests extends OpenSearchTestCase { + + private static final String TEST_INDEX_PREFIX = "test-index-"; + private static final int DEFAULT_NUMBER_OF_SHARDS = 5; + private static final int DEFAULT_NUMBER_OF_REPLICAS = 2; + + /** + * Test validates fetching all the shards for 100 indices in a paginated fashion using {@link ShardPaginationStrategy}, + * while no indices are created or deleted during the fetch. + */ + public void testRetrieveAllShardsWithVaryingPageSize() { + List indexNumberList = new ArrayList<>(); + final int totalIndices = 100; + final int totalShards = totalIndices * DEFAULT_NUMBER_OF_SHARDS * (DEFAULT_NUMBER_OF_REPLICAS + 1); + for (int indexNumber = 1; indexNumber <= 100; indexNumber++) { + indexNumberList.add(indexNumber); + } + // creating a cluster state with 100 indices + Collections.shuffle(indexNumberList, getRandom()); + ClusterState clusterState = getRandomClusterState(indexNumberList); + + // Checking pagination response for different pageSizes, which has a mix of even and odd numbers + // to ensure number of shards in last page is not always equal to pageSize. + List pageSizeList = List.of(3, 7, 10, 13); + + List sortOrderList = List.of(PARAM_ASC_SORT_VALUE, PARAM_DESC_SORT_VALUE); + for (String sortOrder : sortOrderList) { + for (int pageSize : pageSizeList) { + List shardRoutings = new ArrayList<>(); + Set indices = new HashSet<>(); + String requestedToken = null; + // Since each shardID will have number of (replicas + 1) shards, thus number + // of shards in a page should always be a multiple of (replicas + 1). + int totalPagesToFetch = (int) Math.ceil(totalShards * 1.0 / (pageSize - pageSize % (DEFAULT_NUMBER_OF_REPLICAS + 1))); + + long lastPageIndexTime = 0; + for (int pageNum = 1; pageNum <= totalPagesToFetch; pageNum++) { + PageParams pageParams = new PageParams(requestedToken, sortOrder, pageSize); + ShardPaginationStrategy strategy = new ShardPaginationStrategy(pageParams, clusterState); + List pageShards = strategy.getRequestedEntities(); + List pageIndices = strategy.getRequestedIndices(); + if (pageNum < totalPagesToFetch) { + assertNotNull(strategy.getResponseToken().getNextToken()); + } else { + assertNull(strategy.getResponseToken().getNextToken()); + } + shardRoutings.addAll(pageShards); + indices.addAll(pageIndices); + long currentLastIndexTime = clusterState.metadata() + .indices() + .get(pageShards.get(pageShards.size() - 1).getIndexName()) + .getCreationDate(); + if (lastPageIndexTime > 0) { + if (sortOrder.equals(PARAM_ASC_SORT_VALUE)) { + assertTrue(lastPageIndexTime <= currentLastIndexTime); + } else { + assertTrue(lastPageIndexTime >= currentLastIndexTime); + } + } + lastPageIndexTime = currentLastIndexTime; + requestedToken = strategy.getResponseToken().getNextToken(); + } + assertEquals(totalShards, shardRoutings.size()); + assertEquals(totalIndices, indices.size()); + + // verifying the order of shards in the response. + verifyOrderForAllShards(clusterState, shardRoutings, sortOrder); + } + } + } + + /** + * Test validates fetching all the shards (in asc order) for 100 indices in a paginated fashion using {@link ShardPaginationStrategy}. + * While executing queries, it tries to delete an index for which some of the shards have already been fetched along + * with some other indices which are yet to be fetched. It also creates new indices in between the queries. + * Shards corresponding to indices deleted, should not be fetched while for the indices which were newly created, + * should appear in the response. + */ + public void testRetrieveAllShardsInAscOrderWhileIndicesGetCreatedAndDeleted() { + List indexNumberList = new ArrayList<>(); + List deletedIndices = new ArrayList<>(); + List newIndices = new ArrayList<>(); + final int totalIndices = 100; + final int numIndicesToDelete = 10; + final int numIndicesToCreate = 5; + final int fixedIndexNumToDelete = 7; + final int numShardsForNewIndices = 4; + final int numReplicasForNewIndices = 5; + final int totalInitialShards = totalIndices * DEFAULT_NUMBER_OF_SHARDS * (DEFAULT_NUMBER_OF_REPLICAS + 1); + for (int indexNumber = 1; indexNumber <= 100; indexNumber++) { + indexNumberList.add(indexNumber); + } + ClusterState clusterState = getRandomClusterState(indexNumberList); + + int pageSize = 10; + String requestedToken = null; + int numPages = 0; + List shardRoutings = new ArrayList<>(); + Set indicesFetched = new HashSet<>(); + do { + numPages++; + PageParams pageParams = new PageParams(requestedToken, PARAM_ASC_SORT_VALUE, pageSize); + ShardPaginationStrategy paginationStrategy = new ShardPaginationStrategy(pageParams, clusterState); + assertNotNull(paginationStrategy); + assertNotNull(paginationStrategy.getResponseToken()); + requestedToken = paginationStrategy.getResponseToken().getNextToken(); + // deleting test-index-7 & 10 more random indices after 11th call. By that time, shards for first 6 + // indices would have been completely fetched. 11th call, would fetch first 3 shardsIDs for test-index-7 and + // if it gets deleted, rest 2 shardIDs for it should not appear. + if (numPages == 11) { + // asserting last elements of current response list, which should be the last shard of test-index-6 + assertEquals(TEST_INDEX_PREFIX + 6, shardRoutings.get(shardRoutings.size() - 1).getIndexName()); + assertEquals(DEFAULT_NUMBER_OF_SHARDS - 1, shardRoutings.get(shardRoutings.size() - 1).getId()); + + // asserting the result of 11th query, which should only contain 3 shardIDs belonging to test-index-7 + assertEquals(3 * (DEFAULT_NUMBER_OF_REPLICAS + 1), paginationStrategy.getRequestedEntities().size()); + assertEquals(1, paginationStrategy.getRequestedIndices().size()); + assertEquals(TEST_INDEX_PREFIX + 7, paginationStrategy.getRequestedIndices().get(0)); + assertEquals(2, paginationStrategy.getRequestedEntities().get(8).id()); + + clusterState = deleteIndexFromClusterState(clusterState, fixedIndexNumToDelete); + deletedIndices = indexNumberList.subList(20, indexNumberList.size()); + Collections.shuffle(deletedIndices, getRandom()); + deletedIndices = deletedIndices.subList(0, numIndicesToDelete); + for (Integer index : deletedIndices) { + clusterState = deleteIndexFromClusterState(clusterState, index); + } + } + // creating 5 indices after 5th call + if (numPages == 5) { + for (int indexNumber = totalIndices + 1; indexNumber <= totalIndices + numIndicesToCreate; indexNumber++) { + newIndices.add(TEST_INDEX_PREFIX + indexNumber); + clusterState = addIndexToClusterState(clusterState, indexNumber, numShardsForNewIndices, numReplicasForNewIndices); + } + } + assertTrue(paginationStrategy.getRequestedEntities().size() <= pageSize); + shardRoutings.addAll(paginationStrategy.getRequestedEntities()); + indicesFetched.addAll(paginationStrategy.getRequestedIndices()); + } while (Objects.nonNull(requestedToken)); + + assertEquals(totalIndices + numIndicesToCreate - numIndicesToDelete, indicesFetched.size()); + // finalTotalShards = InitialTotal - 2ShardIdsForIndex7 - ShardsFor10RandomlyDeletedIndices + ShardsForNewlyCreatedIndices + final int totalShards = totalInitialShards - 2 * (DEFAULT_NUMBER_OF_REPLICAS + 1) - numIndicesToDelete * DEFAULT_NUMBER_OF_SHARDS + * (DEFAULT_NUMBER_OF_REPLICAS + 1) + numIndicesToCreate * numShardsForNewIndices * (numReplicasForNewIndices + 1); + assertEquals(totalShards, shardRoutings.size()); + + // deleted test-index-7, should appear in the response shards and indices + assertTrue(indicesFetched.contains(TEST_INDEX_PREFIX + 7)); + assertEquals( + shardRoutings.stream().filter(shard -> shard.getIndexName().equals(TEST_INDEX_PREFIX + 7)).count(), + 3 * (DEFAULT_NUMBER_OF_REPLICAS + 1) + ); + + // none of the randomly deleted index should appear in the list of fetched indices + for (Integer index : deletedIndices) { + String indexName = TEST_INDEX_PREFIX + index; + assertFalse(indicesFetched.contains(indexName)); + assertEquals(shardRoutings.stream().filter(shard -> shard.getIndexName().equals(indexName)).count(), 0); + } + + // all the newly created indices should be present in the list of fetched indices + verifyOrderForAllShards( + clusterState, + shardRoutings.stream().filter(shard -> newIndices.contains(shard.getIndexName())).collect(Collectors.toList()), + PARAM_ASC_SORT_VALUE + ); + for (int indexNumber = totalIndices + 1; indexNumber <= totalIndices + numIndicesToCreate; indexNumber++) { + String indexName = TEST_INDEX_PREFIX + indexNumber; + assertTrue(indicesFetched.contains(indexName)); + assertEquals( + numShardsForNewIndices * (numReplicasForNewIndices + 1), + shardRoutings.stream().filter(shard -> shard.getIndexName().equals(indexName)).count() + ); + } + } + + /** + * Test validates fetching all the shards (in desc order) for 100 indices in a paginated fashion using {@link ShardPaginationStrategy}. + * While executing queries, it tries to delete an index for which some of the shards have already been fetched along + * with some other indices which are yet to be fetched. It also creates new indices in between the queries. + * Shards corresponding to indices deleted, should not be fetched while for the indices which were newly created, + * should appear in the response. + */ + public void testRetrieveAllShardsInDescOrderWhileIndicesGetCreatedAndDeleted() { + List indexNumberList = new ArrayList<>(); + List deletedIndices = new ArrayList<>(); + final int totalIndices = 100; + final int numIndicesToDelete = 10; + final int numIndicesToCreate = 5; + final int fixedIndexNumToDelete = 94; + final int numShardsForNewIndices = 4; + final int numReplicasForNewIndices = 5; + final int totalInitialShards = totalIndices * DEFAULT_NUMBER_OF_SHARDS * (DEFAULT_NUMBER_OF_REPLICAS + 1); + for (int indexNumber = 1; indexNumber <= 100; indexNumber++) { + indexNumberList.add(indexNumber); + } + ClusterState clusterState = getRandomClusterState(indexNumberList); + + int pageSize = 10; + String requestedToken = null; + int numPages = 0; + List shardRoutings = new ArrayList<>(); + Set indicesFetched = new HashSet<>(); + do { + PageParams pageParams = new PageParams(requestedToken, PARAM_DESC_SORT_VALUE, pageSize); + ShardPaginationStrategy paginationStrategy = new ShardPaginationStrategy(pageParams, clusterState); + numPages++; + assertNotNull(paginationStrategy); + assertNotNull(paginationStrategy.getResponseToken()); + requestedToken = paginationStrategy.getResponseToken().getNextToken(); + // deleting test-index-94 & 10 more random indices after 11th call. By that time, shards for last 6 + // indices would have been completely fetched. 11th call, would fetch first 3 shardsIDs for test-index-94 and + // if it gets deleted, rest 2 shardIDs for it should not appear. + if (numPages == 11) { + // asserting last elements of current response list, which should be the last shard of test-index-95 + assertEquals(TEST_INDEX_PREFIX + 95, shardRoutings.get(shardRoutings.size() - 1).getIndexName()); + assertEquals(DEFAULT_NUMBER_OF_SHARDS - 1, shardRoutings.get(shardRoutings.size() - 1).getId()); + + // asserting the result of 11th query, which should only contain 3 shardIDs belonging to test-index-7 + assertEquals(3 * (DEFAULT_NUMBER_OF_REPLICAS + 1), paginationStrategy.getRequestedEntities().size()); + assertEquals(1, paginationStrategy.getRequestedIndices().size()); + assertEquals(TEST_INDEX_PREFIX + 94, paginationStrategy.getRequestedIndices().get(0)); + assertEquals(2, paginationStrategy.getRequestedEntities().get(8).id()); + + clusterState = deleteIndexFromClusterState(clusterState, fixedIndexNumToDelete); + deletedIndices = indexNumberList.subList(0, 80); + Collections.shuffle(deletedIndices, getRandom()); + deletedIndices = deletedIndices.subList(0, numIndicesToDelete); + for (Integer index : deletedIndices) { + clusterState = deleteIndexFromClusterState(clusterState, index); + } + } + // creating 5 indices after 5th call + if (numPages == 5) { + for (int indexNumber = totalIndices + 1; indexNumber <= totalIndices + numIndicesToCreate; indexNumber++) { + clusterState = addIndexToClusterState(clusterState, indexNumber, numShardsForNewIndices, numReplicasForNewIndices); + } + } + assertTrue(paginationStrategy.getRequestedEntities().size() <= pageSize); + shardRoutings.addAll(paginationStrategy.getRequestedEntities()); + indicesFetched.addAll(paginationStrategy.getRequestedIndices()); + } while (Objects.nonNull(requestedToken)); + assertEquals(totalIndices - numIndicesToDelete, indicesFetched.size()); + // finalTotalShards = InitialTotal - 2ShardIdsForIndex7 - ShardsFor10RandomlyDeletedIndices + final int totalShards = totalInitialShards - 2 * (DEFAULT_NUMBER_OF_REPLICAS + 1) - numIndicesToDelete * DEFAULT_NUMBER_OF_SHARDS + * (DEFAULT_NUMBER_OF_REPLICAS + 1); + assertEquals(totalShards, shardRoutings.size()); + + // deleted test-index-94, should appear in the response shards and indices + assertTrue(indicesFetched.contains(TEST_INDEX_PREFIX + fixedIndexNumToDelete)); + assertEquals( + shardRoutings.stream().filter(shard -> shard.getIndexName().equals(TEST_INDEX_PREFIX + fixedIndexNumToDelete)).count(), + 3 * (DEFAULT_NUMBER_OF_REPLICAS + 1) + ); + + // none of the randomly deleted index should appear in the list of fetched indices + for (int deletedIndex : deletedIndices) { + String indexName = TEST_INDEX_PREFIX + deletedIndex; + assertFalse(indicesFetched.contains(indexName)); + assertEquals(shardRoutings.stream().filter(shard -> shard.getIndexName().equals(indexName)).count(), 0); + } + + // none of the newly created indices should be present in the list of fetched indices + for (int indexNumber = totalIndices + 1; indexNumber <= totalIndices + numIndicesToCreate; indexNumber++) { + String indexName = TEST_INDEX_PREFIX + indexNumber; + assertFalse(indicesFetched.contains(indexName)); + assertEquals(0, shardRoutings.stream().filter(shard -> shard.getIndexName().equals(indexName)).count()); + } + } + + /** + * Validates strategy fails with IllegalArgumentException when requests size in pageParam is smaller + * than #(replicas + 1) for any of the index. + */ + public void testIllegalSizeArgumentRequestedFromStrategy() { + int numIndices = 6; + int numShards = 5; + int numReplicas = 8; + int pageSize = numReplicas + 1; + ClusterState clusterState = getRandomClusterState(Collections.emptyList()); + for (int index = 1; index < numIndices; index++) { + clusterState = addIndexToClusterState(clusterState, index, numShards, numReplicas); + } + clusterState = addIndexToClusterState(clusterState, numIndices, numShards + 1, numReplicas + 1); + + try { + String requestedToken = null; + ShardPaginationStrategy strategy; + do { + PageParams pageParams = new PageParams(requestedToken, PARAM_ASC_SORT_VALUE, pageSize); + strategy = new ShardPaginationStrategy(pageParams, clusterState); + requestedToken = strategy.getResponseToken().getNextToken(); + } while (requestedToken != null); + fail("expected exception"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("size value should be greater than the replica count of all indices")); + } + } + + public void testRetrieveShardsWhenLastIndexGetsDeletedAndReCreated() { + final int totalIndices = 3; + final int numShards = 3; + final int numReplicas = 3; + final int pageSize = (numReplicas + 1) * 2; + final int totalPages = (int) Math.ceil((totalIndices * numShards * (numReplicas + 1) * 1.0) / pageSize); + ClusterState clusterState = getRandomClusterState(Collections.emptyList()); + for (int indexNum = 1; indexNum <= totalIndices; indexNum++) { + clusterState = addIndexToClusterState(clusterState, indexNum, numShards, numReplicas); + } + + String requestedToken = null; + ShardPaginationStrategy strategy = null; + for (int page = 1; page < totalPages; page++) { + PageParams pageParams = new PageParams(requestedToken, PARAM_ASC_SORT_VALUE, pageSize); + strategy = new ShardPaginationStrategy(pageParams, clusterState); + requestedToken = strategy.getResponseToken().getNextToken(); + } + // The last page was supposed to fetch the remaining two indices for test-index-3 + assertEquals(8, strategy.getRequestedEntities().size()); + assertEquals(1, strategy.getRequestedIndices().size()); + assertEquals(TEST_INDEX_PREFIX + 3, strategy.getRequestedIndices().get(0)); + + // Delete the index and re-create + clusterState = deleteIndexFromClusterState(clusterState, 3); + clusterState = addIndexToClusterState( + clusterState, + 3, + numShards, + numReplicas, + Instant.now().plus(4, ChronoUnit.SECONDS).toEpochMilli() + ); + + // since test-index-3 should now be considered a new index, all shards for it should appear + PageParams pageParams = new PageParams(requestedToken, PARAM_ASC_SORT_VALUE, pageSize); + strategy = new ShardPaginationStrategy(pageParams, clusterState); + assertEquals(8, strategy.getRequestedEntities().size()); + for (ShardRouting shardRouting : strategy.getRequestedEntities()) { + assertTrue( + (shardRouting.getId() == 0 || shardRouting.getId() == 1) + && Objects.equals(shardRouting.getIndexName(), TEST_INDEX_PREFIX + 3) + ); + } + assertEquals(1, strategy.getRequestedIndices().size()); + assertEquals(TEST_INDEX_PREFIX + 3, strategy.getRequestedIndices().get(0)); + assertNotNull(strategy.getResponseToken().getNextToken()); + + requestedToken = strategy.getResponseToken().getNextToken(); + pageParams = new PageParams(requestedToken, PARAM_ASC_SORT_VALUE, pageSize); + strategy = new ShardPaginationStrategy(pageParams, clusterState); + assertEquals(4, strategy.getRequestedEntities().size()); + for (ShardRouting shardRouting : strategy.getRequestedEntities()) { + assertTrue(shardRouting.getId() == 2 && Objects.equals(shardRouting.getIndexName(), TEST_INDEX_PREFIX + 3)); + } + assertEquals(1, strategy.getRequestedIndices().size()); + assertEquals(TEST_INDEX_PREFIX + 3, strategy.getRequestedIndices().get(0)); + assertNull(strategy.getResponseToken().getNextToken()); + } + + public void testCreatingShardStrategyPageTokenWithRequestedTokenNull() { + try { + new ShardPaginationStrategy.ShardStrategyToken(null); + fail("expected exception"); + } catch (Exception e) { + assert e.getMessage().contains("requestedTokenString can not be null"); + } + } + + public void testIndexStrategyPageTokenWithWronglyEncryptedRequestToken() { + assertThrows(OpenSearchParseException.class, () -> new ShardPaginationStrategy.ShardStrategyToken("3%4%5")); + } + + public void testIndexStrategyPageTokenWithIncorrectNumberOfElementsInRequestedToken() { + assertThrows( + OpenSearchParseException.class, + () -> new ShardPaginationStrategy.ShardStrategyToken(PaginationStrategy.encryptStringToken("1|1725361543")) + ); + assertThrows( + OpenSearchParseException.class, + () -> new ShardPaginationStrategy.ShardStrategyToken(PaginationStrategy.encryptStringToken("1|1725361543|index|12345")) + ); + } + + public void testIndexStrategyPageTokenWithInvalidValuesInRequestedToken() { + assertThrows( + OpenSearchParseException.class, + () -> new ShardPaginationStrategy.ShardStrategyToken(PaginationStrategy.encryptStringToken("-1725361543|1725361543|index")) + ); + } + + public void testCreatingIndexStrategyPageTokenWithNameOfLastRespondedIndexNull() { + try { + new ShardPaginationStrategy.ShardStrategyToken(null, 0, 1234l); + fail("expected exception"); + } catch (Exception e) { + assert e.getMessage().contains("index name should be provided"); + } + } + + public void testCreatingIndexStrategyPageTokenWithNonParseableShardID() { + try { + new ShardPaginationStrategy.ShardStrategyToken(PaginationStrategy.encryptStringToken("shardID|1725361543|index")); + fail("expected exception"); + } catch (Exception e) { + assert e.getMessage().contains(INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE); + } + } + + /** + * @param indexNumbers would be used to create indices having names with integer appended after foo, like foo1, foo2. + * @return random clusterState consisting of indices having their creation times set to the integer used to name them. + */ + private ClusterState getRandomClusterState(List indexNumbers) { + ClusterState clusterState = ClusterState.builder(new ClusterName("test")) + .metadata(Metadata.builder().build()) + .routingTable(RoutingTable.builder().build()) + .build(); + for (Integer indexNumber : indexNumbers) { + clusterState = addIndexToClusterState(clusterState, indexNumber, DEFAULT_NUMBER_OF_SHARDS, DEFAULT_NUMBER_OF_REPLICAS); + } + return clusterState; + } + + private ClusterState addIndexToClusterState( + ClusterState clusterState, + final int indexNumber, + final int numShards, + final int numReplicas + ) { + return addIndexToClusterState( + clusterState, + indexNumber, + numShards, + numReplicas, + Instant.now().plus(indexNumber, ChronoUnit.SECONDS).toEpochMilli() + ); + } + + private ClusterState addIndexToClusterState( + ClusterState clusterState, + final int indexNumber, + final int numShards, + final int numReplicas, + final long creationTime + ) { + IndexMetadata indexMetadata = IndexMetadata.builder(TEST_INDEX_PREFIX + indexNumber) + .settings(settings(Version.CURRENT).put(SETTING_CREATION_DATE, creationTime)) + .numberOfShards(numShards) + .numberOfReplicas(numReplicas) + .build(); + IndexRoutingTable.Builder indexRoutingTableBuilder = new IndexRoutingTable.Builder(indexMetadata.getIndex()).initializeAsNew( + indexMetadata + ); + return ClusterState.builder(clusterState) + .metadata(Metadata.builder(clusterState.metadata()).put(indexMetadata, true).build()) + .routingTable(RoutingTable.builder(clusterState.routingTable()).add(indexRoutingTableBuilder).build()) + .build(); + } + + private ClusterState deleteIndexFromClusterState(ClusterState clusterState, int indexNumber) { + return ClusterState.builder(clusterState) + .metadata(Metadata.builder(clusterState.metadata()).remove(TEST_INDEX_PREFIX + indexNumber)) + .routingTable(RoutingTable.builder(clusterState.routingTable()).remove(TEST_INDEX_PREFIX + indexNumber).build()) + .build(); + } + + private void verifyOrderForAllShards(ClusterState clusterState, List shardRoutings, String sortOrder) { + ShardRouting prevShard = shardRoutings.get(0); + for (ShardRouting shard : shardRoutings.subList(1, shardRoutings.size())) { + if (Objects.equals(shard.getIndexName(), prevShard.getIndexName())) { + assertTrue(shard.getId() >= prevShard.getId()); + } else { + if (sortOrder.equals(PARAM_ASC_SORT_VALUE)) { + assertTrue( + clusterState.metadata().index(shard.getIndexName()).getCreationDate() > clusterState.metadata() + .index(prevShard.getIndexName()) + .getCreationDate() + ); + } else { + assertTrue( + clusterState.metadata().index(shard.getIndexName()).getCreationDate() < clusterState.metadata() + .index(prevShard.getIndexName()) + .getCreationDate() + ); + } + prevShard = shard; + } + } + } + +} diff --git a/server/src/test/java/org/opensearch/rest/action/cat/RestIndicesActionTests.java b/server/src/test/java/org/opensearch/rest/action/cat/RestIndicesActionTests.java index 2debb00a0d14f..67baedb7df5a9 100644 --- a/server/src/test/java/org/opensearch/rest/action/cat/RestIndicesActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/cat/RestIndicesActionTests.java @@ -35,6 +35,7 @@ import org.opensearch.Version; import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.action.admin.indices.stats.IndexStats; +import org.opensearch.action.pagination.PageToken; import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.cluster.health.ClusterIndexHealth; import org.opensearch.cluster.metadata.IndexMetadata; @@ -50,7 +51,6 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.IndexSettings; import org.opensearch.rest.action.list.RestIndicesListAction; -import org.opensearch.rest.pagination.PageToken; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.rest.FakeRestRequest; import org.junit.Before; diff --git a/server/src/test/java/org/opensearch/rest/action/cat/RestShardsActionTests.java b/server/src/test/java/org/opensearch/rest/action/cat/RestShardsActionTests.java index 883df7da5d717..c412167a10c75 100644 --- a/server/src/test/java/org/opensearch/rest/action/cat/RestShardsActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/cat/RestShardsActionTests.java @@ -38,6 +38,7 @@ import org.opensearch.action.admin.indices.stats.IndexStats; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; import org.opensearch.action.admin.indices.stats.ShardStats; +import org.opensearch.action.pagination.PageToken; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; @@ -50,6 +51,7 @@ import org.opensearch.index.shard.ShardPath; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.rest.FakeRestRequest; +import org.junit.Before; import java.nio.file.Path; import java.util.ArrayList; @@ -64,14 +66,18 @@ public class RestShardsActionTests extends OpenSearchTestCase { - public void testBuildTable() { + private final DiscoveryNode localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT); + private List shardRoutings = new ArrayList<>(); + private Map shardStatsMap = new HashMap<>(); + private ClusterStateResponse state; + private IndicesStatsResponse stats; + + @Before + public void setup() { final int numShards = randomIntBetween(1, 5); long numDocs = randomLongBetween(0, 10000); long numDeletedDocs = randomLongBetween(0, 100); - DiscoveryNode localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT); - List shardRoutings = new ArrayList<>(numShards); - Map shardStatsMap = new HashMap<>(); String index = "index"; for (int i = 0; i < numShards; i++) { ShardRoutingState shardRoutingState = ShardRoutingState.fromValue((byte) randomIntBetween(2, 3)); @@ -97,23 +103,49 @@ public void testBuildTable() { when(indexStats.getPrimaries()).thenReturn(new CommonStats()); when(indexStats.getTotal()).thenReturn(new CommonStats()); - IndicesStatsResponse stats = mock(IndicesStatsResponse.class); + stats = mock(IndicesStatsResponse.class); when(stats.asMap()).thenReturn(shardStatsMap); DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class); when(discoveryNodes.get(localNode.getId())).thenReturn(localNode); - ClusterStateResponse state = mock(ClusterStateResponse.class); + state = mock(ClusterStateResponse.class); RoutingTable routingTable = mock(RoutingTable.class); when(routingTable.allShards()).thenReturn(shardRoutings); ClusterState clusterState = mock(ClusterState.class); when(clusterState.routingTable()).thenReturn(routingTable); when(clusterState.nodes()).thenReturn(discoveryNodes); when(state.getState()).thenReturn(clusterState); + } + + public void testBuildTable() { + final RestShardsAction action = new RestShardsAction(); + final Table table = action.buildTable( + new FakeRestRequest(), + state.getState().nodes(), + stats, + state.getState().routingTable().allShards(), + null + ); + assertTable(table); + } + public void testBuildTableWithPageToken() { final RestShardsAction action = new RestShardsAction(); - final Table table = action.buildTable(new FakeRestRequest(), state, stats); + final Table table = action.buildTable( + new FakeRestRequest(), + state.getState().nodes(), + stats, + state.getState().routingTable().allShards(), + new PageToken("foo", "test") + ); + assertTable(table); + assertNotNull(table.getPageToken()); + assertEquals("foo", table.getPageToken().getNextToken()); + assertEquals("test", table.getPageToken().getPaginatedEntity()); + } + private void assertTable(Table table) { // now, verify the table is correct List headers = table.getHeaders(); assertThat(headers.get(0).value, equalTo("index")); @@ -128,7 +160,7 @@ public void testBuildTable() { assertThat(headers.get(79).value, equalTo("docs.deleted")); final List> rows = table.getRows(); - assertThat(rows.size(), equalTo(numShards)); + assertThat(rows.size(), equalTo(shardRoutings.size())); Iterator shardRoutingsIt = shardRoutings.iterator(); for (final List row : rows) { diff --git a/server/src/test/java/org/opensearch/rest/action/cat/RestTableTests.java b/server/src/test/java/org/opensearch/rest/action/cat/RestTableTests.java index a82e563d70273..f475d0ff1efda 100644 --- a/server/src/test/java/org/opensearch/rest/action/cat/RestTableTests.java +++ b/server/src/test/java/org/opensearch/rest/action/cat/RestTableTests.java @@ -32,12 +32,12 @@ package org.opensearch.rest.action.cat; +import org.opensearch.action.pagination.PageToken; import org.opensearch.common.Table; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.rest.AbstractRestChannel; import org.opensearch.rest.RestResponse; -import org.opensearch.rest.pagination.PageToken; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.rest.FakeRestRequest; import org.junit.Before; diff --git a/server/src/test/java/org/opensearch/rest/action/list/RestShardsListActionTests.java b/server/src/test/java/org/opensearch/rest/action/list/RestShardsListActionTests.java new file mode 100644 index 0000000000000..cebcee05ca69b --- /dev/null +++ b/server/src/test/java/org/opensearch/rest/action/list/RestShardsListActionTests.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.list; + +import org.opensearch.OpenSearchParseException; +import org.opensearch.action.pagination.PageParams; +import org.opensearch.action.pagination.PaginationStrategy; +import org.opensearch.rest.RestRequest; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.rest.FakeRestRequest; + +import java.util.HashMap; +import java.util.Map; + +import static org.opensearch.action.pagination.PageParams.PARAM_ASC_SORT_VALUE; +import static org.opensearch.rest.action.list.RestShardsListAction.MAX_SUPPORTED_LIST_SHARDS_PAGE_SIZE; +import static org.opensearch.rest.action.list.RestShardsListAction.MIN_SUPPORTED_LIST_SHARDS_PAGE_SIZE; + +public class RestShardsListActionTests extends OpenSearchTestCase { + + private final RestShardsListAction action = new RestShardsListAction(); + + public void testShardsListActionIsPaginated() { + assertTrue(action.isActionPaginated()); + } + + public void testValidateAndGetPageParamsWithDefaultParams() { + Map params = new HashMap<>(); + RestRequest restRequest = new FakeRestRequest(params); + PageParams pageParams = action.validateAndGetPageParams(restRequest); + assertEquals(MIN_SUPPORTED_LIST_SHARDS_PAGE_SIZE, pageParams.getSize()); + assertEquals(PARAM_ASC_SORT_VALUE, pageParams.getSort()); + assertNull(pageParams.getRequestedToken()); + } + + public void testValidateAndGetPageParamsWithSizeBelowMin() { + Map params = new HashMap<>(); + params.put("size", String.valueOf(MIN_SUPPORTED_LIST_SHARDS_PAGE_SIZE - 1)); + RestRequest restRequest = new FakeRestRequest(params); + assertThrows(IllegalArgumentException.class, () -> action.validateAndGetPageParams(restRequest)); + } + + public void testValidateAndGetPageParamsWithSizeAboveRange() { + Map params = new HashMap<>(); + params.put("size", String.valueOf(MAX_SUPPORTED_LIST_SHARDS_PAGE_SIZE * 10)); + RestRequest restRequest = new FakeRestRequest(params); + assertThrows(IllegalArgumentException.class, () -> action.validateAndGetPageParams(restRequest)); + } + + public void testValidateAndGetPageParamsWithInvalidRequestToken() { + Map params = new HashMap<>(); + params.put("next_token", PaginationStrategy.encryptStringToken("1|-1|test")); + RestRequest restRequest = new FakeRestRequest(params); + assertThrows(OpenSearchParseException.class, () -> action.validateAndGetPageParams(restRequest)); + } + + public void testValidateAndGetPageParamsWithValidPageParams() { + Map params = new HashMap<>(); + params.put("next_token", PaginationStrategy.encryptStringToken("1|1|test")); + params.put("sort", "asc"); + params.put("size", "3000"); + RestRequest restRequest = new FakeRestRequest(params); + PageParams pageParams = action.validateAndGetPageParams(restRequest); + + assertEquals(PaginationStrategy.encryptStringToken("1|1|test"), pageParams.getRequestedToken()); + assertEquals(3000, pageParams.getSize()); + assertEquals("asc", pageParams.getSort()); + } + +} diff --git a/test/framework/src/main/java/org/opensearch/test/rest/FakeRestRequest.java b/test/framework/src/main/java/org/opensearch/test/rest/FakeRestRequest.java index e7810ae4c8f1c..1880977c2d35d 100644 --- a/test/framework/src/main/java/org/opensearch/test/rest/FakeRestRequest.java +++ b/test/framework/src/main/java/org/opensearch/test/rest/FakeRestRequest.java @@ -60,6 +60,15 @@ public FakeRestRequest() { ); } + public FakeRestRequest(Map params) { + this( + NamedXContentRegistry.EMPTY, + new FakeHttpRequest(Method.GET, "", BytesArray.EMPTY, new HashMap<>()), + params, + new FakeHttpChannel(null) + ); + } + private FakeRestRequest( NamedXContentRegistry xContentRegistry, HttpRequest httpRequest,