diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/CursorIT.java b/integ-test/src/test/java/org/opensearch/sql/legacy/CursorIT.java index abd2bbbcc2..d0c2f19f42 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/CursorIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/CursorIT.java @@ -182,7 +182,7 @@ public void validTotalResultWithAndWithoutPaginationOrderBy() throws IOException String selectQuery = StringUtils.format( "SELECT firstname, state FROM %s ORDER BY balance DESC ", TEST_INDEX_ACCOUNT); - verifyWithAndWithoutPaginationResponse(selectQuery + " LIMIT 2000", selectQuery, 26, false); + verifyWithAndWithoutPaginationResponse(selectQuery + " LIMIT 2000", selectQuery, 25, false); } @Test diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java index f81e1b6615..66f85b0754 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java @@ -149,8 +149,11 @@ public void onFailure(Exception e) { private Settings defaultSettings() { return new Settings() { - private final Map defaultSettings = - new ImmutableMap.Builder().put(Key.QUERY_SIZE_LIMIT, 200).build(); + private final Map defaultSettings = + new ImmutableMap.Builder() + .put(Key.QUERY_SIZE_LIMIT, 200) + .put(Key.SQL_PAGINATION_API_SEARCH_AFTER, true) + .build(); @Override public T getSettingValue(Key key) { diff --git a/integ-test/src/test/java/org/opensearch/sql/sql/PaginationFilterIT.java b/integ-test/src/test/java/org/opensearch/sql/sql/PaginationFilterIT.java index 038596cf57..9a945ec86f 100644 --- a/integ-test/src/test/java/org/opensearch/sql/sql/PaginationFilterIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/sql/PaginationFilterIT.java @@ -34,25 +34,30 @@ public class PaginationFilterIT extends SQLIntegTestCase { */ private static final Map STATEMENT_TO_NUM_OF_PAGES = Map.of( - "SELECT * FROM " + TestsConstants.TEST_INDEX_ACCOUNT, 1000, + "SELECT * FROM " + TestsConstants.TEST_INDEX_ACCOUNT, + 1000, "SELECT * FROM " + TestsConstants.TEST_INDEX_ACCOUNT + " WHERE match(address, 'street')", - 385, + 385, "SELECT * FROM " - + TestsConstants.TEST_INDEX_ACCOUNT - + " WHERE match(address, 'street') AND match(city, 'Ola')", - 1, + + TestsConstants.TEST_INDEX_ACCOUNT + + " WHERE match(address, 'street') AND match(city, 'Ola')", + 1, "SELECT firstname, lastname, highlight(address) FROM " - + TestsConstants.TEST_INDEX_ACCOUNT - + " WHERE match(address, 'street') AND match(state, 'OH')", - 5, + + TestsConstants.TEST_INDEX_ACCOUNT + + " WHERE match(address, 'street') AND match(state, 'OH')", + 5, "SELECT firstname, lastname, highlight('*') FROM " - + TestsConstants.TEST_INDEX_ACCOUNT - + " WHERE match(address, 'street') AND match(state, 'OH')", - 5, - "SELECT * FROM " + TestsConstants.TEST_INDEX_BEER + " WHERE true", 60, - "SELECT * FROM " + TestsConstants.TEST_INDEX_BEER + " WHERE Id=10", 1, - "SELECT * FROM " + TestsConstants.TEST_INDEX_BEER + " WHERE Id + 5=15", 1, - "SELECT * FROM " + TestsConstants.TEST_INDEX_BANK, 7); + + TestsConstants.TEST_INDEX_ACCOUNT + + " WHERE match(address, 'street') AND match(state, 'OH')", + 5, + "SELECT * FROM " + TestsConstants.TEST_INDEX_BEER + " WHERE true", + 60, + "SELECT * FROM " + TestsConstants.TEST_INDEX_BEER + " WHERE Id=10", + 1, + "SELECT * FROM " + TestsConstants.TEST_INDEX_BEER + " WHERE Id + 5=15", + 1, + "SELECT * FROM " + TestsConstants.TEST_INDEX_BANK, + 7); private final String sqlStatement; diff --git a/integ-test/src/test/java/org/opensearch/sql/sql/StandalonePaginationIT.java b/integ-test/src/test/java/org/opensearch/sql/sql/StandalonePaginationIT.java index e884734c96..698e185abb 100644 --- a/integ-test/src/test/java/org/opensearch/sql/sql/StandalonePaginationIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/sql/StandalonePaginationIT.java @@ -166,6 +166,7 @@ private Settings defaultSettings() { new ImmutableMap.Builder() .put(Key.QUERY_SIZE_LIMIT, 200) .put(Key.SQL_CURSOR_KEEP_ALIVE, TimeValue.timeValueMinutes(1)) + .put(Key.SQL_PAGINATION_API_SEARCH_AFTER, true) .build(); @Override diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_agg_push.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_agg_push.json index 568b397f07..4654cd085f 100644 --- a/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_agg_push.json +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_agg_push.json @@ -8,7 +8,7 @@ { "name": "OpenSearchIndexScan", "description": { - "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone\u003dfalse)" + "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, client\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)" }, "children": [] } diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_push.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_push.json index 0e7087aa1f..e645e7193b 100644 --- a/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_push.json +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_push.json @@ -8,7 +8,7 @@ { "name": "OpenSearchIndexScan", "description": { - "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"bool\":{\"filter\":[{\"range\":{\"balance\":{\"from\":10000,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":null,\"to\":40,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone\u003dfalse)" + "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"bool\":{\"filter\":[{\"range\":{\"balance\":{\"from\":10000,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":null,\"to\":40,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, client\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)" }, "children": [] } diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_limit_push.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_limit_push.json index 51a627ea4d..cf3ad5e2f4 100644 --- a/integ-test/src/test/resources/expectedOutput/ppl/explain_limit_push.json +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_limit_push.json @@ -16,7 +16,7 @@ { "name": "OpenSearchIndexScan", "description": { - "request": "OpenSearchQueryRequest(indexName=opensearch-sql_test_index_account, sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\"}, searchDone=false)" + "request": "OpenSearchQueryRequest(indexName=opensearch-sql_test_index_account, sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\"}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, client\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)" }, "children": [] } diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_output.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_output.json index 8d45714283..6503e51996 100644 --- a/integ-test/src/test/resources/expectedOutput/ppl/explain_output.json +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_output.json @@ -31,7 +31,7 @@ { "name": "OpenSearchIndexScan", "description": { - "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone\u003dfalse)" + "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, client\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)" }, "children": [] } diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_sort_push.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_sort_push.json index af2a57e536..3777c6319b 100644 --- a/integ-test/src/test/resources/expectedOutput/ppl/explain_sort_push.json +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_sort_push.json @@ -8,7 +8,7 @@ { "name": "OpenSearchIndexScan", "description": { - "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone\u003dfalse)" + "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, client\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)" }, "children": [] } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/SelectResultSet.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/SelectResultSet.java index 261816cddc..9d1862023c 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/SelectResultSet.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/SelectResultSet.java @@ -567,7 +567,7 @@ private void populateDefaultCursor(DefaultCursor cursor) { Integer limit = cursor.getLimit(); long rowsLeft = rowsLeft(cursor.getFetchSize(), cursor.getLimit()); if (rowsLeft <= 0) { - // close the cursor + // Delete Point In Time ID if (LocalClusterState.state().getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)) { String pitId = cursor.getPitId(); PointInTimeHandler pit = new PointInTimeHandlerImpl(client, pitId); diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/logical/node/TableScan.java b/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/logical/node/TableScan.java index 841501bed1..59e6f27216 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/logical/node/TableScan.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/logical/node/TableScan.java @@ -5,8 +5,9 @@ package org.opensearch.sql.legacy.query.planner.logical.node; -import java.util.Map; +import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER; +import java.util.Map; import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.query.join.TableInJoinRequestBuilder; import org.opensearch.sql.legacy.query.planner.core.PlanNode; @@ -15,8 +16,6 @@ import org.opensearch.sql.legacy.query.planner.physical.node.pointInTime.PointInTime; import org.opensearch.sql.legacy.query.planner.physical.node.scroll.Scroll; -import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER; - /** Table scan */ public class TableScan implements LogicalOperator { diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/scroll/SearchHitRow.java b/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/SearchHitRow.java similarity index 97% rename from legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/scroll/SearchHitRow.java rename to legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/SearchHitRow.java index d03dd5af40..3031429ba8 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/scroll/SearchHitRow.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/SearchHitRow.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.sql.legacy.query.planner.physical.node.scroll; +package org.opensearch.sql.legacy.query.planner.physical.node; import com.google.common.base.Strings; import java.util.HashMap; @@ -36,7 +36,7 @@ * ---------------------------------------------------------------------------------------------------------------------- * */ -class SearchHitRow implements Row { +public class SearchHitRow implements Row { /** Native OpenSearch data object for each row */ private final SearchHit hit; @@ -47,7 +47,7 @@ class SearchHitRow implements Row { /** Table alias owned the row. Empty if this row comes from combination of two other rows */ private final String tableAlias; - SearchHitRow(SearchHit hit, String tableAlias) { + public SearchHitRow(SearchHit hit, String tableAlias) { this.hit = hit; this.source = hit.getSourceAsMap(); this.tableAlias = tableAlias; diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/pointInTime/PointInTime.java b/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/pointInTime/PointInTime.java index 6d8ba0080f..efc8fd9ddd 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/pointInTime/PointInTime.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/pointInTime/PointInTime.java @@ -24,180 +24,185 @@ import org.opensearch.sql.legacy.query.planner.physical.Row; import org.opensearch.sql.legacy.query.planner.physical.estimation.Cost; import org.opensearch.sql.legacy.query.planner.physical.node.BatchPhysicalOperator; +import org.opensearch.sql.legacy.query.planner.physical.node.SearchHitRow; import org.opensearch.sql.legacy.query.planner.resource.ResourceManager; /** OpenSearch Search API with Point in time as physical implementation of TableScan */ public class PointInTime extends BatchPhysicalOperator { - /** Request to submit to OpenSearch to scroll over */ - private final TableInJoinRequestBuilder request; + /** Request to submit to OpenSearch to scroll over */ + private final TableInJoinRequestBuilder request; - /** Page size to scroll over index */ - private final int pageSize; + /** Page size to scroll over index */ + private final int pageSize; - /** Client connection to ElasticSearch */ - private Client client; + /** Client connection to ElasticSearch */ + private Client client; - /** Currently undergoing search request */ - private SearchResponse searchResponse; + /** Currently undergoing search request */ + private SearchResponse searchResponse; - /** Time out */ - private Integer timeout; + /** Time out */ + private Integer timeout; - private String pitId; + private String pitId; - private PointInTimeHandlerImpl pit; + private PointInTimeHandlerImpl pit; - /** Resource monitor manager */ - private ResourceManager resourceMgr; + /** Resource monitor manager */ + private ResourceManager resourceMgr; - public PointInTime(TableInJoinRequestBuilder request, int pageSize) { - this.request = request; - this.pageSize = pageSize; - } - - @Override - public PlanNode[] children() { - return new PlanNode[0]; - } + public PointInTime(TableInJoinRequestBuilder request, int pageSize) { + this.request = request; + this.pageSize = pageSize; + } - @Override - public Cost estimate() { - return new Cost(); - } + @Override + public PlanNode[] children() { + return new PlanNode[0]; + } - @Override - public void open(ExecuteParams params) throws Exception { - super.open(params); - client = params.get(ExecuteParams.ExecuteParamType.CLIENT); - timeout = params.get(ExecuteParams.ExecuteParamType.TIMEOUT); - resourceMgr = params.get(ExecuteParams.ExecuteParamType.RESOURCE_MANAGER); - - Object filter = params.get(ExecuteParams.ExecuteParamType.EXTRA_QUERY_FILTER); - if (filter instanceof BoolQueryBuilder) { - request - .getRequestBuilder() - .setQuery(generateNewQueryWithExtraFilter((BoolQueryBuilder) filter)); - - if (LOG.isDebugEnabled()) { - LOG.debug( - "Received extra query filter, re-build query: {}", - Strings.toString( - XContentType.JSON, request.getRequestBuilder().request().source(), true, true)); - } - } - } + @Override + public Cost estimate() { + return new Cost(); + } - @Override - public void close() { - if (searchResponse != null) { - LOG.debug("Closing Point In Time (PIT) context"); + @Override + public void open(ExecuteParams params) throws Exception { + super.open(params); + client = params.get(ExecuteParams.ExecuteParamType.CLIENT); + timeout = params.get(ExecuteParams.ExecuteParamType.TIMEOUT); + resourceMgr = params.get(ExecuteParams.ExecuteParamType.RESOURCE_MANAGER); - // Delete the Point In Time context - pit.delete(); + Object filter = params.get(ExecuteParams.ExecuteParamType.EXTRA_QUERY_FILTER); + if (filter instanceof BoolQueryBuilder) { + request + .getRequestBuilder() + .setQuery(generateNewQueryWithExtraFilter((BoolQueryBuilder) filter)); - searchResponse = null; - } else { - LOG.debug("PIT context is already closed or was never opened"); - } + if (LOG.isDebugEnabled()) { + LOG.debug( + "Received extra query filter, re-build query: {}", + Strings.toString( + XContentType.JSON, request.getRequestBuilder().request().source(), true, true)); + } } + } - @Override - protected Collection> prefetch() { - Objects.requireNonNull(client, "Client connection is not ready"); - Objects.requireNonNull(resourceMgr, "ResourceManager is not set"); - Objects.requireNonNull(timeout, "Time out is not set"); - - if (searchResponse == null) { - loadFirstBatch(); - updateMetaResult(); - } else { - loadNextBatchByPitId(); - } - return wrapRowForCurrentBatch(); - } + @Override + public void close() { + if (searchResponse != null) { + LOG.debug("Closing Point In Time (PIT) context"); - /** - * Extra filter pushed down from upstream. Re-parse WHERE clause with extra filter because - * OpenSearch RequestBuilder doesn't allow QueryBuilder inside be changed after added. - */ - private QueryBuilder generateNewQueryWithExtraFilter(BoolQueryBuilder filter) - throws SqlParseException { - Where where = request.getOriginalSelect().getWhere(); - BoolQueryBuilder newQuery; - if (where != null) { - newQuery = QueryMaker.explain(where, false); - newQuery.must(filter); - } else { - newQuery = filter; - } - return newQuery; - } + // Delete the Point In Time context + pit.delete(); - private void loadFirstBatch() { - // Create PIT and set to request object - pit = new PointInTimeHandlerImpl(client, request.getOriginalSelect().getIndexArr()); - pit.create(); - pitId = pit.getPitId(); - searchResponse = - request - .getRequestBuilder() - .addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC) - .setSize(pageSize) - .setTimeout(TimeValue.timeValueSeconds(timeout)) - .setPointInTime(new PointInTimeBuilder(pitId)) - .get(); - LOG.info("Loading first batch of response using Point In Time"); + searchResponse = null; + } else { + LOG.debug("PIT context is already closed or was never opened"); } - - private void updateMetaResult() { - resourceMgr.getMetaResult().addTotalNumOfShards(searchResponse.getTotalShards()); - resourceMgr.getMetaResult().addSuccessfulShards(searchResponse.getSuccessfulShards()); - resourceMgr.getMetaResult().addFailedShards(searchResponse.getFailedShards()); - resourceMgr.getMetaResult().updateTimeOut(searchResponse.isTimedOut()); + } + + @Override + protected Collection> prefetch() { + Objects.requireNonNull(client, "Client connection is not ready"); + Objects.requireNonNull(resourceMgr, "ResourceManager is not set"); + Objects.requireNonNull(timeout, "Time out is not set"); + + if (searchResponse == null) { + loadFirstBatch(); + updateMetaResult(); + } else { + loadNextBatchByPitId(); } - - private void loadNextBatchByPitId() { - // Add PIT with search after to fetch next batch of data - if (searchResponse.getHits().getHits() !=null && searchResponse.getHits().getHits().length > 0) { - Object[] sortValues = searchResponse.getHits().getHits()[searchResponse.getHits().getHits().length - 1].getSortValues(); - LOG.info("Loading next batch of response using Point In Time. - " + pitId); - searchResponse = - request - .getRequestBuilder() - .setSize(pageSize) - .setTimeout(TimeValue.timeValueSeconds(timeout)) - .setPointInTime(new PointInTimeBuilder(pitId)) - .searchAfter(sortValues) - .get(); - } - + return wrapRowForCurrentBatch(); + } + + /** + * Extra filter pushed down from upstream. Re-parse WHERE clause with extra filter because + * OpenSearch RequestBuilder doesn't allow QueryBuilder inside be changed after added. + */ + private QueryBuilder generateNewQueryWithExtraFilter(BoolQueryBuilder filter) + throws SqlParseException { + Where where = request.getOriginalSelect().getWhere(); + BoolQueryBuilder newQuery; + if (where != null) { + newQuery = QueryMaker.explain(where, false); + newQuery.must(filter); + } else { + newQuery = filter; } - - @SuppressWarnings("unchecked") - private Collection> wrapRowForCurrentBatch() { - SearchHit[] hits = searchResponse.getHits().getHits(); - Row[] rows = new Row[hits.length]; - for (int i = 0; i < hits.length; i++) { - rows[i] = new SearchHitRow(hits[i], request.getAlias()); - } - return Arrays.asList(rows); + return newQuery; + } + + private void loadFirstBatch() { + // Create PIT and set to request object + pit = new PointInTimeHandlerImpl(client, request.getOriginalSelect().getIndexArr()); + pit.create(); + pitId = pit.getPitId(); + searchResponse = + request + .getRequestBuilder() + .addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC) + .setSize(pageSize) + .setTimeout(TimeValue.timeValueSeconds(timeout)) + .setPointInTime(new PointInTimeBuilder(pitId)) + .get(); + LOG.info("Loading first batch of response using Point In Time"); + } + + private void updateMetaResult() { + resourceMgr.getMetaResult().addTotalNumOfShards(searchResponse.getTotalShards()); + resourceMgr.getMetaResult().addSuccessfulShards(searchResponse.getSuccessfulShards()); + resourceMgr.getMetaResult().addFailedShards(searchResponse.getFailedShards()); + resourceMgr.getMetaResult().updateTimeOut(searchResponse.isTimedOut()); + } + + private void loadNextBatchByPitId() { + // Add PIT with search after to fetch next batch of data + if (searchResponse.getHits().getHits() != null + && searchResponse.getHits().getHits().length > 0) { + Object[] sortValues = + searchResponse + .getHits() + .getHits()[searchResponse.getHits().getHits().length - 1] + .getSortValues(); + LOG.info("Loading next batch of response using Point In Time. - " + pitId); + searchResponse = + request + .getRequestBuilder() + .setSize(pageSize) + .setTimeout(TimeValue.timeValueSeconds(timeout)) + .setPointInTime(new PointInTimeBuilder(pitId)) + .searchAfter(sortValues) + .get(); } - - @Override - public String toString() { - return "PointInTime [ " + describeTable() + ", pageSize=" + pageSize + " ]"; + } + + @SuppressWarnings("unchecked") + private Collection> wrapRowForCurrentBatch() { + SearchHit[] hits = searchResponse.getHits().getHits(); + Row[] rows = new Row[hits.length]; + for (int i = 0; i < hits.length; i++) { + rows[i] = new SearchHitRow(hits[i], request.getAlias()); } + return Arrays.asList(rows); + } - private String describeTable() { - return request.getOriginalSelect().getFrom().get(0).getIndex() + " as " + request.getAlias(); - } + @Override + public String toString() { + return "PointInTime [ " + describeTable() + ", pageSize=" + pageSize + " ]"; + } - /********************************************* - * Getters for Explain - *********************************************/ + private String describeTable() { + return request.getOriginalSelect().getFrom().get(0).getIndex() + " as " + request.getAlias(); + } - public String getRequest() { - return Strings.toString(XContentType.JSON, request.getRequestBuilder().request().source()); - } + /********************************************* + * Getters for Explain + *********************************************/ + + public String getRequest() { + return Strings.toString(XContentType.JSON, request.getRequestBuilder().request().source()); + } } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/pointInTime/SearchHitRow.java b/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/pointInTime/SearchHitRow.java deleted file mode 100644 index ba28b26e69..0000000000 --- a/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/pointInTime/SearchHitRow.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.legacy.query.planner.physical.node.pointInTime; - -import com.google.common.base.Strings; -import java.util.HashMap; -import java.util.Map; -import org.opensearch.common.document.DocumentField; -import org.opensearch.index.mapper.MapperService; -import org.opensearch.search.SearchHit; -import org.opensearch.sql.legacy.query.planner.physical.Row; - -/** - * - * - *
- * Search hit row that implements basic accessor for SearchHit.
- * Encapsulate all OpenSearch specific knowledge: how to parse source including nested path.
- * 

- * State transition: - * for example, SELECT e.name.first AS firstName, e.age AS age FROM E e JOIN D d ON ... ORDER BY ... - *

- * Stage | hit.source | tableAlias | Passed in args - * ---------------------------------------------------------------------------------------------------------------------- - * new in Scroll | {"name":{"first": "Allen", "last": "Hank"}, "age": 30} | "e"| new(SearchHit, "e") - * ---------------------------------------------------------------------------------------------------------------------- - * key()/combine() | | | key("name.first", "age") - * in JoinAlgorithm | {"e.name": {...}, "e.age": 30, "d..." } (after combined) | "" | combine(row of D) - * ---------------------------------------------------------------------------------------------------------------------- - * key() in XXSort | same | "" | key("e.name.first", "e.age") - * ---------------------------------------------------------------------------------------------------------------------- - * retain() in Project | {"firstName": "Allen", "age": 30 } | "" | retain("e.name.first", "e.age") - * ---------------------------------------------------------------------------------------------------------------------- - *

- */ -class SearchHitRow implements Row { - - /** Native OpenSearch data object for each row */ - private final SearchHit hit; - - /** Column and value pairs */ - private final Map source; - - /** Table alias owned the row. Empty if this row comes from combination of two other rows */ - private final String tableAlias; - - SearchHitRow(SearchHit hit, String tableAlias) { - this.hit = hit; - this.source = hit.getSourceAsMap(); - this.tableAlias = tableAlias; - } - - @Override - public RowKey key(String[] colNames) { - if (colNames.length == 0) { - return RowKey.NULL; - } - - Object[] keys = new Object[colNames.length]; - for (int i = 0; i < colNames.length; i++) { - keys[i] = getValueOfPath(colNames[i]); - - if (keys[i] == null) { - return RowKey.NULL; - } - } - return new RowKey(keys); - } - - /** - * Replace column name by full name to avoid naming conflicts. For efficiency, this only happens - * here when matched rows found. Create a new one to avoid mutating the original ones in hash - * table which impact subsequent match. - */ - @Override - public Row combine(Row other) { - SearchHit combined = cloneHit(other); - - collectFullName(combined.getSourceAsMap(), this); - if (other != NULL) { - collectFullName(combined.getSourceAsMap(), (SearchHitRow) other); - } - return new SearchHitRow(combined, ""); - } - - @Override - public void retain(Map colNameAlias) { - Map aliasSource = new HashMap<>(); - colNameAlias.forEach( - (colName, alias) -> { - if (colName.endsWith(".*")) { - String tableAlias = colName.substring(0, colName.length() - 2) + "."; - retainAllFieldsFromTable(aliasSource, tableAlias); - } else { - retainOneField(aliasSource, colName, alias); - } - }); - resetSource(aliasSource); - } - - @Override - public SearchHit data() { - return hit; - } - - @Override - public String toString() { - return "SearchHitRow{" + "hit=" + source + '}'; - } - - private Object getValueOfPath(String path) { - /* - * If table alias is missing which means the row was generated by combine(). - * In this case, table alias is present and the first dot should be ignored, ex. "e.name.first" - */ - return getValueOfPath(source, path, Strings.isNullOrEmpty(tableAlias)); - } - - /** Recursively get value for field name path, such as object field a.b.c */ - private Object getValueOfPath(Object source, String path, boolean isIgnoreFirstDot) { - if (!(source instanceof Map) || path.isEmpty()) { - return source; - } - - int dot = path.indexOf('.', (isIgnoreFirstDot ? path.indexOf('.') + 1 : 0)); - if (dot == -1) { - return ((Map) source).get(path); - } - - // Object field name maybe unexpanded without recursive object structure - // ex. {"a.b.c": value} instead of {"a": {"b": {"c": value}}}} - if (((Map) source).containsKey(path)) { - return ((Map) source).get(path); - } - - return getValueOfPath( - ((Map) source).get(path.substring(0, dot)), path.substring(dot + 1), false); - } - - private SearchHit cloneHit(Row other) { - Map documentFields = new HashMap<>(); - Map metaFields = new HashMap<>(); - hit.getFields() - .forEach( - (fieldName, docField) -> - (MapperService.META_FIELDS_BEFORE_7DOT8.contains(fieldName) - ? metaFields - : documentFields) - .put(fieldName, docField)); - SearchHit combined = - new SearchHit( - hit.docId(), - hit.getId() + "|" + (other == NULL ? "0" : ((SearchHitRow) other).hit.getId()), - documentFields, - metaFields); - combined.sourceRef(hit.getSourceRef()); - combined.getSourceAsMap().clear(); - return combined; - } - - private void collectFullName(Map newSource, SearchHitRow row) { - row.source.forEach((colName, value) -> newSource.put(row.tableAlias + "." + colName, value)); - } - - private void retainAllFieldsFromTable(Map aliasSource, String tableAlias) { - source.entrySet().stream() - .filter(e -> e.getKey().startsWith(tableAlias)) - .forEach(e -> aliasSource.put(e.getKey(), e.getValue())); - } - - /** - * Note that column here is already prefixed by table alias after combine(). - * - *

Meanwhile check if column name with table alias prefix, ex. a.name, is property, namely - * a.name.lastname. In this case, split by first second dot and continue searching for the final - * value in nested map by getValueOfPath(source.get("a.name"), "lastname") - */ - private void retainOneField(Map aliasSource, String colName, String alias) { - aliasSource.put(Strings.isNullOrEmpty(alias) ? colName : alias, getValueOfPath(colName)); - } - - private void resetSource(Map newSource) { - source.clear(); - source.putAll(newSource); - } -} diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/scroll/Scroll.java b/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/scroll/Scroll.java index 40e9860886..145fc277a8 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/scroll/Scroll.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/scroll/Scroll.java @@ -28,6 +28,7 @@ import org.opensearch.sql.legacy.query.planner.physical.Row; import org.opensearch.sql.legacy.query.planner.physical.estimation.Cost; import org.opensearch.sql.legacy.query.planner.physical.node.BatchPhysicalOperator; +import org.opensearch.sql.legacy.query.planner.physical.node.SearchHitRow; import org.opensearch.sql.legacy.query.planner.resource.ResourceManager; /** OpenSearch Scroll API as physical implementation of TableScan */ diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/query/planner/physical/node/scroll/SearchHitRowTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/query/planner/physical/node/scroll/SearchHitRowTest.java index dd0fc626c0..f7d2030b0c 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/query/planner/physical/node/scroll/SearchHitRowTest.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/query/planner/physical/node/scroll/SearchHitRowTest.java @@ -12,6 +12,7 @@ import org.junit.Test; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.search.SearchHit; +import org.opensearch.sql.legacy.query.planner.physical.node.SearchHitRow; public class SearchHitRowTest { diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/unittest/planner/QueryPlannerTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/unittest/planner/QueryPlannerTest.java index 521b225893..6ff907ba30 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/unittest/planner/QueryPlannerTest.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/unittest/planner/QueryPlannerTest.java @@ -42,6 +42,7 @@ import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.legacy.domain.JoinSelect; import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.exception.SqlParseException; @@ -104,6 +105,7 @@ public void init() { // to mock. // In this case, default value in Setting will be returned all the time. doReturn(emptyList()).when(settings).getSettings(); + doReturn(false).when(settings).getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER); LocalClusterState.state().setPluginSettings(settings); ActionFuture mockFuture = mock(ActionFuture.class); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java index 0a9cc67993..cdc3d4462f 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java @@ -7,6 +7,8 @@ import java.util.List; import java.util.Map; +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.action.search.DeletePitRequest; import org.opensearch.client.node.NodeClient; import org.opensearch.sql.opensearch.mapping.IndexMapping; import org.opensearch.sql.opensearch.request.OpenSearchRequest; @@ -89,4 +91,19 @@ public interface OpenSearchClient { void schedule(Runnable task); NodeClient getNodeClient(); + + /** + * Create PIT for given indices + * + * @param createPitRequest Create Point In Time request + * @return PitId + */ + String createPit(CreatePitRequest createPitRequest); + + /** + * Delete PIT + * + * @param deletePitRequest Delete Point In Time request + */ + void deletePit(DeletePitRequest deletePitRequest); } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java index 42a0af13c2..c7b5589ced 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java @@ -16,9 +16,6 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; - -import org.opensearch.action.ActionRequest; -import org.opensearch.action.ActionType; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse; @@ -26,12 +23,10 @@ import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; import org.opensearch.action.search.*; -import org.opensearch.action.support.PlainActionFuture; import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.metadata.AliasMetadata; import org.opensearch.common.action.ActionFuture; import org.opensearch.common.settings.Settings; -import org.opensearch.core.action.ActionResponse; import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexSettings; import org.opensearch.sql.opensearch.mapping.IndexMapping; @@ -165,25 +160,29 @@ public List indices() { */ @Override public Map meta() { - return ImmutableMap.of(META_CLUSTER_NAME, client.settings().get("cluster.name", "opensearch")); + return ImmutableMap.of( + META_CLUSTER_NAME, + client.settings().get("cluster.name", "opensearch"), + "plugins.sql.pagination.api", + client.settings().get("plugins.sql.pagination.api", "true")); } @Override public void cleanup(OpenSearchRequest request) { if (request instanceof OpenSearchScrollRequest) { request.clean( - scrollId -> { - try { - client.prepareClearScroll().addScrollId(scrollId).get(); - } catch (Exception e) { - throw new IllegalStateException( - "Failed to clean up resources for search request " + request, e); - } - }); + scrollId -> { + try { + client.prepareClearScroll().addScrollId(scrollId).get(); + } catch (Exception e) { + throw new IllegalStateException( + "Failed to clean up resources for search request " + request, e); + } + }); } else if (request instanceof OpenSearchQueryRequest) { request.clean(pitId -> {}); } - } + } @Override public void schedule(Runnable task) { @@ -195,4 +194,27 @@ public void schedule(Runnable task) { public NodeClient getNodeClient() { return client; } + + @Override + public String createPit(CreatePitRequest createPitRequest) { + ActionFuture execute = + this.client.execute(CreatePitAction.INSTANCE, createPitRequest); + try { + CreatePitResponse pitResponse = execute.get(); + return pitResponse.getId(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Error occurred while creating PIT for new engine SQL query", e); + } + } + + @Override + public void deletePit(DeletePitRequest deletePitRequest) { + ActionFuture execute = + this.client.execute(DeletePitAction.INSTANCE, deletePitRequest); + try { + DeletePitResponse deletePitResponse = execute.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Error occurred while deleting PIT.", e); + } + } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java index dc202edb8d..1e2d8b4107 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java @@ -19,7 +19,7 @@ import org.opensearch.action.admin.cluster.settings.ClusterGetSettingsRequest; import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest; import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; -import org.opensearch.action.search.ClearScrollRequest; +import org.opensearch.action.search.*; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.indices.CreateIndexRequest; @@ -168,6 +168,8 @@ public Map meta() { final Settings defaultSettings = client.cluster().getSettings(request, RequestOptions.DEFAULT).getDefaultSettings(); builder.put(META_CLUSTER_NAME, defaultSettings.get("cluster.name", "opensearch")); + builder.put( + "plugins.sql.pagination.api", defaultSettings.get("plugins.sql.pagination.api", "true")); return builder.build(); } catch (IOException e) { throw new IllegalStateException("Failed to get cluster meta info", e); @@ -178,16 +180,16 @@ public Map meta() { public void cleanup(OpenSearchRequest request) { if (request instanceof OpenSearchScrollRequest) { request.clean( - scrollId -> { - try { - ClearScrollRequest clearRequest = new ClearScrollRequest(); - clearRequest.addScrollId(scrollId); - client.clearScroll(clearRequest, RequestOptions.DEFAULT); - } catch (IOException e) { - throw new IllegalStateException( - "Failed to clean up resources for search request " + request, e); - } - }); + scrollId -> { + try { + ClearScrollRequest clearRequest = new ClearScrollRequest(); + clearRequest.addScrollId(scrollId); + client.clearScroll(clearRequest, RequestOptions.DEFAULT); + } catch (IOException e) { + throw new IllegalStateException( + "Failed to clean up resources for search request " + request, e); + } + }); } else if (request instanceof OpenSearchQueryRequest) { request.clean(pitId -> {}); } @@ -202,4 +204,25 @@ public void schedule(Runnable task) { public NodeClient getNodeClient() { throw new UnsupportedOperationException("Unsupported method."); } + + @Override + public String createPit(CreatePitRequest createPitRequest) { + try { + CreatePitResponse createPitResponse = + client.createPit(createPitRequest, RequestOptions.DEFAULT); + return createPitResponse.getId(); + } catch (IOException e) { + throw new RuntimeException("Error occurred while creating PIT for new engine SQL query", e); + } + } + + @Override + public void deletePit(DeletePitRequest deletePitRequest) { + try { + DeletePitResponse deletePitResponse = + client.deletePit(deletePitRequest, RequestOptions.DEFAULT); + } catch (IOException e) { + throw new RuntimeException("Error occurred while creating PIT for new engine SQL query", e); + } + } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java index 643e02994a..04c5dad3e0 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java @@ -5,17 +5,19 @@ package org.opensearch.sql.opensearch.request; +import static org.opensearch.core.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS; +import static org.opensearch.search.sort.FieldSortBuilder.DOC_FIELD_NAME; +import static org.opensearch.search.sort.SortOrder.ASC; + import java.io.IOException; import java.util.Collections; import java.util.List; -import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import java.util.function.Function; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.ToString; import org.opensearch.action.search.*; -import org.opensearch.common.action.ActionFuture; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentType; @@ -33,10 +35,6 @@ import org.opensearch.sql.opensearch.storage.OpenSearchIndex; import org.opensearch.sql.opensearch.storage.OpenSearchStorageEngine; -import static org.opensearch.core.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS; -import static org.opensearch.search.sort.FieldSortBuilder.DOC_FIELD_NAME; -import static org.opensearch.search.sort.SortOrder.ASC; - /** * OpenSearch search request. This has to be stateful because it needs to: * @@ -108,118 +106,127 @@ public OpenSearchQueryRequest( /** Constructor of OpenSearchQueryRequest with PIT support. */ public OpenSearchQueryRequest( - IndexName indexName, - SearchSourceBuilder sourceBuilder, - OpenSearchExprValueFactory factory, - List includes, - TimeValue cursorKeepAlive, - OpenSearchClient client) { + IndexName indexName, + SearchSourceBuilder sourceBuilder, + OpenSearchExprValueFactory factory, + List includes, + TimeValue cursorKeepAlive, + OpenSearchClient client) { this.indexName = indexName; this.sourceBuilder = sourceBuilder; this.exprValueFactory = factory; this.includes = includes; this.cursorKeepAlive = cursorKeepAlive; this.client = client; + this.pitId = createPIT(); } @Override public OpenSearchResponse search( Function searchAction, Function scrollAction) { + if (this.pitId == null) { + // When SearchRequest doesn't contain PitId, fetch single page request + if (searchDone) { + return new OpenSearchResponse(SearchHits.empty(), exprValueFactory, includes); + } else { + searchDone = true; + return new OpenSearchResponse( + searchAction.apply( + new SearchRequest().indices(indexName.getIndexNames()).source(sourceBuilder)), + exprValueFactory, + includes); + } + } else { + // Search with PIT instead of scroll API + return searchWithPIT(searchAction); + } + } + + public OpenSearchResponse searchWithPIT(Function searchAction) { OpenSearchResponse openSearchResponse; if (searchDone) { openSearchResponse = new OpenSearchResponse(SearchHits.empty(), exprValueFactory, includes); } else { - searchDone = true; - - // Create PIT and Set PIT - if (this.pitId == null) { - this.pitId = createPIT(); - } this.sourceBuilder.pointInTimeBuilder(new PointInTimeBuilder(this.pitId)); this.sourceBuilder.timeout(cursorKeepAlive); - // check for search after if (searchAfter != null) { - this.sourceBuilder.searchAfter(searchAfter); // Set search_after in the query + this.sourceBuilder.searchAfter(searchAfter); } - // Set sort field for search_after if (this.sourceBuilder.sorts() == null) { this.sourceBuilder.sort(DOC_FIELD_NAME, ASC); } - SearchRequest searchRequest = new SearchRequest().indices(indexName.getIndexNames()).source(this.sourceBuilder); + SearchRequest searchRequest = new SearchRequest().source(this.sourceBuilder); this.searchResponse = searchAction.apply(searchRequest); - openSearchResponse = new OpenSearchResponse( - this.searchResponse, - exprValueFactory, - includes); + openSearchResponse = new OpenSearchResponse(this.searchResponse, exprValueFactory, includes); needClean = openSearchResponse.isEmpty(); - if (!needClean && this.searchResponse.getHits().getHits() != null) { - searchAfter = this.searchResponse.getHits().getHits()[this.searchResponse.getHits().getHits().length - 1].getSortValues(); + searchDone = openSearchResponse.isEmpty(); + if (!needClean + && this.searchResponse.getHits().getHits() != null + && this.searchResponse.getHits().getHits().length > 0) { + searchAfter = + this.searchResponse + .getHits() + .getHits()[this.searchResponse.getHits().getHits().length - 1] + .getSortValues(); this.sourceBuilder.searchAfter(searchAfter); - - SearchResponse nextBatchResponse = searchAction.apply(new SearchRequest().indices(indexName.getIndexNames()).source(this.sourceBuilder)); - needClean = nextBatchResponse.getHits().getHits().length == 0; - } } return openSearchResponse; } public String createPIT() { - CreatePitRequest createPitRequest = - new CreatePitRequest(this.cursorKeepAlive, false, indexName.getIndexNames()); - ActionFuture execute = - this.client.getNodeClient().execute(CreatePitAction.INSTANCE, createPitRequest); - try { - CreatePitResponse pitResponse = execute.get(); - return pitResponse.getId(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException("Error occurred while creating PIT for new engine SQL query", e); - } + new CreatePitRequest(this.cursorKeepAlive, false, indexName.getIndexNames()); + return this.client.createPit(createPitRequest); } public void deletePit() { DeletePitRequest deletePitRequest = new DeletePitRequest(this.pitId); - ActionFuture execute = - this.client.getNodeClient().execute(DeletePitAction.INSTANCE, deletePitRequest); - try { - DeletePitResponse deletePitResponse = execute.get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException("Error occurred while deleting PIT.", e); - } + this.client.deletePit(deletePitRequest); + searchDone = true; + this.pitId = null; } @Override public void clean(Consumer cleanAction) { - if (needClean) { + if (needClean && this.pitId != null) { deletePit(); } } @Override public boolean hasAnotherBatch() { - return !needClean; + if (this.pitId != null) { + return !needClean; + } + return false; } @Override public void writeTo(StreamOutput out) throws IOException { - // Convert SearchSourceBuilder to XContent and write it as a string - out.writeString(sourceBuilder.toString()); - - out.writeTimeValue(sourceBuilder.timeout()); - out.writeString(sourceBuilder.pointInTimeBuilder().getId()); - out.writeStringCollection(includes); - indexName.writeTo(out); - - // Serialize the searchAfter array - out.writeVInt(searchAfter.length); - for (Object obj : searchAfter) { - out.writeGenericValue(obj); + if (this.pitId != null) { + // Convert SearchSourceBuilder to XContent and write it as a string + out.writeString(sourceBuilder.toString()); + + out.writeTimeValue(sourceBuilder.timeout()); + out.writeString(sourceBuilder.pointInTimeBuilder().getId()); + out.writeStringCollection(includes); + indexName.writeTo(out); + + // Serialize the searchAfter array + out.writeVInt(searchAfter.length); + for (Object obj : searchAfter) { + out.writeGenericValue(obj); + } + } else { + // OpenSearch Query request without PIT for single page requests + throw new UnsupportedOperationException( + "OpenSearchQueryRequest serialization is not implemented."); } } @@ -230,18 +237,17 @@ public void writeTo(StreamOutput out) throws IOException { * @param engine OpenSearchSqlEngine to get node-specific context. * @throws IOException thrown if reading from input {@code in} fails. */ - public OpenSearchQueryRequest(StreamInput in, OpenSearchStorageEngine engine) - throws IOException { + public OpenSearchQueryRequest(StreamInput in, OpenSearchStorageEngine engine) throws IOException { // Deserialize the SearchSourceBuilder from the string representation String sourceBuilderString = in.readString(); NamedXContentRegistry xContentRegistry = - new NamedXContentRegistry( - new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedXContents()); + new NamedXContentRegistry( + new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedXContents()); XContentParser parser = - XContentType.JSON - .xContent() - .createParser(xContentRegistry, IGNORE_DEPRECATIONS, sourceBuilderString); + XContentType.JSON + .xContent() + .createParser(xContentRegistry, IGNORE_DEPRECATIONS, sourceBuilderString); this.sourceBuilder = SearchSourceBuilder.fromXContent(parser); cursorKeepAlive = in.readTimeValue(); @@ -252,7 +258,6 @@ public OpenSearchQueryRequest(StreamInput in, OpenSearchStorageEngine engine) this.client = engine.getClient(); int length = in.readVInt(); - // Read each element of the searchAfter array this.searchAfter = new Object[length]; for (int i = 0; i < length; i++) { this.searchAfter[i] = in.readGenericValue(); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java index 037f87c0d2..26a470d235 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java @@ -11,8 +11,6 @@ import static org.opensearch.index.query.QueryBuilders.nestedQuery; import static org.opensearch.search.sort.FieldSortBuilder.DOC_FIELD_NAME; import static org.opensearch.search.sort.SortOrder.ASC; -import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE; -import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER; import java.util.ArrayList; import java.util.Arrays; @@ -20,7 +18,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -29,8 +26,6 @@ import lombok.ToString; import org.apache.commons.lang3.tuple.Pair; import org.apache.lucene.search.join.ScoreMode; -import org.opensearch.action.search.*; -import org.opensearch.common.action.ActionFuture; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.InnerHitBuilder; @@ -38,8 +33,6 @@ import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.aggregations.AggregationBuilder; -import org.opensearch.search.builder.PointInTimeBuilder; -import org.opensearch.client.Client; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.fetch.subphase.FetchSourceContext; import org.opensearch.search.fetch.subphase.highlight.HighlightBuilder; @@ -78,8 +71,6 @@ public class OpenSearchRequestBuilder { private final Settings settings; - private String[] indices; - /** Constructor. */ public OpenSearchRequestBuilder( int requestedTotalSize, OpenSearchExprValueFactory exprValueFactory, Settings settings) { @@ -96,52 +87,75 @@ public OpenSearchRequestBuilder( /** * Build DSL request. * - * @return query request or scroll request + * @return query request with PIT or scroll request */ public OpenSearchRequest build( - OpenSearchRequest.IndexName indexName, int maxResultWindow, TimeValue cursorKeepAlive, OpenSearchClient client) { + OpenSearchRequest.IndexName indexName, + int maxResultWindow, + TimeValue cursorKeepAlive, + OpenSearchClient client) { if (this.settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)) { - int size = requestedTotalSize; - FetchSourceContext fetchSource = this.sourceBuilder.fetchSource(); - List includes = fetchSource != null ? Arrays.asList(fetchSource.includes()) : List.of(); - if (pageSize == null) { - if (startFrom + size > maxResultWindow) { - sourceBuilder.size(maxResultWindow - startFrom); - } else { - sourceBuilder.size(requestedTotalSize); - } + return buildRequestWithPit(indexName, maxResultWindow, cursorKeepAlive, client); + } else { + return buildRequestWithScroll(indexName, maxResultWindow, cursorKeepAlive); + } + } + + private OpenSearchRequest buildRequestWithPit( + OpenSearchRequest.IndexName indexName, + int maxResultWindow, + TimeValue cursorKeepAlive, + OpenSearchClient client) { + int size = requestedTotalSize; + FetchSourceContext fetchSource = this.sourceBuilder.fetchSource(); + List includes = fetchSource != null ? Arrays.asList(fetchSource.includes()) : List.of(); + + if (pageSize == null) { + if (startFrom + size > maxResultWindow) { + sourceBuilder.size(maxResultWindow - startFrom); + // Search with PIT request + return new OpenSearchQueryRequest( + indexName, sourceBuilder, exprValueFactory, includes, cursorKeepAlive, client); } else { - if (startFrom != 0) { - throw new UnsupportedOperationException("Non-zero offset is not supported with pagination"); - } - sourceBuilder.size(pageSize); + sourceBuilder.from(startFrom); + sourceBuilder.size(requestedTotalSize); + // Search with non-Pit request + return new OpenSearchQueryRequest(indexName, sourceBuilder, exprValueFactory, includes); } - // Set sort field for search_after - sourceBuilder.sort(DOC_FIELD_NAME, ASC); - return new OpenSearchQueryRequest(indexName, sourceBuilder, exprValueFactory, includes, cursorKeepAlive, client); - } else { - int size = requestedTotalSize; - FetchSourceContext fetchSource = this.sourceBuilder.fetchSource(); - List includes = fetchSource != null ? Arrays.asList(fetchSource.includes()) : List.of(); - if (pageSize == null) { - if (startFrom + size > maxResultWindow) { - sourceBuilder.size(maxResultWindow - startFrom); - return new OpenSearchScrollRequest( - indexName, cursorKeepAlive, sourceBuilder, exprValueFactory, includes); - } else { - sourceBuilder.from(startFrom); - sourceBuilder.size(requestedTotalSize); - return new OpenSearchQueryRequest(indexName, sourceBuilder, exprValueFactory, includes); - } - } else { - if (startFrom != 0) { - throw new UnsupportedOperationException("Non-zero offset is not supported with pagination"); - } - sourceBuilder.size(pageSize); + if (startFrom != 0) { + throw new UnsupportedOperationException("Non-zero offset is not supported with pagination"); + } + sourceBuilder.size(pageSize); + // Search with PIT request + return new OpenSearchQueryRequest( + indexName, sourceBuilder, exprValueFactory, includes, cursorKeepAlive, client); + } + } + + private OpenSearchRequest buildRequestWithScroll( + OpenSearchRequest.IndexName indexName, int maxResultWindow, TimeValue cursorKeepAlive) { + int size = requestedTotalSize; + FetchSourceContext fetchSource = this.sourceBuilder.fetchSource(); + List includes = fetchSource != null ? Arrays.asList(fetchSource.includes()) : List.of(); + + if (pageSize == null) { + if (startFrom + size > maxResultWindow) { + sourceBuilder.size(maxResultWindow - startFrom); return new OpenSearchScrollRequest( - indexName, cursorKeepAlive, sourceBuilder, exprValueFactory, includes); + indexName, cursorKeepAlive, sourceBuilder, exprValueFactory, includes); + } else { + sourceBuilder.from(startFrom); + sourceBuilder.size(requestedTotalSize); + return new OpenSearchQueryRequest(indexName, sourceBuilder, exprValueFactory, includes); + } + } else { + if (startFrom != 0) { + throw new UnsupportedOperationException("Non-zero offset is not supported with pagination"); } + sourceBuilder.size(pageSize); + return new OpenSearchScrollRequest( + indexName, cursorKeepAlive, sourceBuilder, exprValueFactory, includes); } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java index 73b6a5d08d..b17773cb03 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java @@ -51,7 +51,7 @@ public class OpenSearchIndexScan extends TableScanOperator implements Serializab /** Creates index scan based on a provided OpenSearchRequestBuilder. */ public OpenSearchIndexScan( - OpenSearchClient client, int maxResponseSize, OpenSearchRequest request) { + OpenSearchClient client, int maxResponseSize, OpenSearchRequest request) { this.client = client; this.maxResponseSize = maxResponseSize; this.request = request; @@ -126,11 +126,11 @@ public void readExternal(ObjectInput in) throws IOException { ((PlanSerializer.CursorDeserializationStream) in).resolveObject("engine"); client = engine.getClient(); - - boolean paginationAPISetting = Boolean.parseBoolean(client.getNodeClient().settings().get(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER.getKeyValue(), String.valueOf(true))); - System.out.println("**********OPENSEARCHINDEXSCAN flag: " + paginationAPISetting); + boolean pointInTimeEnabled = + Boolean.parseBoolean( + client.meta().get(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER.getKeyValue())); try (BytesStreamInput bsi = new BytesStreamInput(requestStream)) { - if (paginationAPISetting) { + if (pointInTimeEnabled) { request = new OpenSearchQueryRequest(bsi, engine); } else { request = new OpenSearchScrollRequest(bsi, engine); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java index 739b70b1b8..e5cf94eb86 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java @@ -174,18 +174,20 @@ void explain_successfully() { new OpenSearchExecutionEngine(client, protector, new PlanSerializer(null)); Settings settings = mock(Settings.class); when(settings.getSettingValue(SQL_CURSOR_KEEP_ALIVE)).thenReturn(TimeValue.timeValueMinutes(1)); + when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)).thenReturn(true); OpenSearchExprValueFactory exprValueFactory = mock(OpenSearchExprValueFactory.class); final var name = new OpenSearchRequest.IndexName("test"); final int defaultQuerySize = 100; final int maxResultWindow = 10000; - final var requestBuilder = new OpenSearchRequestBuilder(defaultQuerySize, exprValueFactory); + final var requestBuilder = + new OpenSearchRequestBuilder(defaultQuerySize, exprValueFactory, settings); PhysicalPlan plan = new OpenSearchIndexScan( mock(OpenSearchClient.class), maxResultWindow, requestBuilder.build( - name, maxResultWindow, settings.getSettingValue(SQL_CURSOR_KEEP_ALIVE))); + name, maxResultWindow, settings.getSettingValue(SQL_CURSOR_KEEP_ALIVE), client)); AtomicReference result = new AtomicReference<>(); executor.explain( diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java index 5cd11c6cd4..da06c1eb66 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java @@ -8,9 +8,7 @@ import static java.util.Collections.emptyList; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertSame; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; import static org.opensearch.sql.ast.tree.Sort.SortOption.DEFAULT_ASC; import static org.opensearch.sql.data.type.ExprCoreType.DOUBLE; import static org.opensearch.sql.data.type.ExprCoreType.INTEGER; @@ -91,6 +89,8 @@ public void setup() { @Test void test_protect_indexScan() { + when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)).thenReturn(true); + String indexName = "test"; final int maxResultWindow = 10000; final int querySizeLimit = 200; @@ -114,11 +114,12 @@ void test_protect_indexScan() { final var name = new OpenSearchRequest.IndexName(indexName); final var request = - new OpenSearchRequestBuilder(querySizeLimit, exprValueFactory) + new OpenSearchRequestBuilder(querySizeLimit, exprValueFactory, settings) .build( name, maxResultWindow, - settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE)); + settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE), + client); assertEquals( PhysicalPlanDSL.project( PhysicalPlanDSL.limit( diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilderTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilderTest.java index 742e76cbd0..fa0248782d 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilderTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilderTest.java @@ -7,9 +7,7 @@ import static org.junit.Assert.assertThrows; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import static org.opensearch.index.query.QueryBuilders.matchAllQuery; import static org.opensearch.index.query.QueryBuilders.nestedQuery; import static org.opensearch.search.sort.FieldSortBuilder.DOC_FIELD_NAME; @@ -50,10 +48,12 @@ import org.opensearch.search.sort.FieldSortBuilder; import org.opensearch.search.sort.ScoreSortBuilder; import org.opensearch.search.sort.SortBuilders; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.exception.SemanticCheckException; import org.opensearch.sql.expression.DSL; import org.opensearch.sql.expression.NamedExpression; import org.opensearch.sql.expression.ReferenceExpression; +import org.opensearch.sql.opensearch.client.OpenSearchClient; import org.opensearch.sql.opensearch.data.type.OpenSearchDataType; import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory; import org.opensearch.sql.opensearch.response.agg.CompositeAggregationParser; @@ -76,11 +76,18 @@ class OpenSearchRequestBuilderTest { @Mock private OpenSearchExprValueFactory exprValueFactory; + @Mock private OpenSearchClient client; + + @Mock private Settings settings; + private OpenSearchRequestBuilder requestBuilder; @BeforeEach void setup() { - requestBuilder = new OpenSearchRequestBuilder(DEFAULT_LIMIT, exprValueFactory); + requestBuilder = new OpenSearchRequestBuilder(DEFAULT_LIMIT, exprValueFactory, settings); + lenient() + .when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)) + .thenReturn(true); } @Test @@ -100,11 +107,12 @@ void build_query_request() { .trackScores(true), exprValueFactory, List.of()), - requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT)); + requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT, client)); } @Test void build_scroll_request_with_correct_size() { + when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)).thenReturn(false); Integer limit = 800; Integer offset = 10; requestBuilder.pushDownLimit(limit, offset); @@ -119,7 +127,7 @@ void build_scroll_request_with_correct_size() { .timeout(DEFAULT_QUERY_TIMEOUT), exprValueFactory, List.of()), - requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT)); + requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT, client)); } @Test @@ -127,7 +135,7 @@ void test_push_down_query() { QueryBuilder query = QueryBuilders.termQuery("intA", 1); requestBuilder.pushDownFilter(query); - var r = requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT); + var r = requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT, client); Function querySearch = searchRequest -> { assertEquals( @@ -220,7 +228,7 @@ void assertSearchSourceBuilder( throw new UnsupportedOperationException(); }; requestBuilder - .build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT) + .build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT, client) .search(querySearch, scrollSearch); } @@ -290,7 +298,7 @@ void test_push_down_project() { .fetchSource("intA", null), exprValueFactory, List.of("intA")), - requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT)); + requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT, client)); } @Test @@ -320,7 +328,7 @@ void test_push_down_project_limit() { .fetchSource("intA", null), exprValueFactory, List.of("intA")), - requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT)); + requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT, client)); } @Test @@ -350,7 +358,7 @@ void test_push_down_project_limit_and_offset() { .fetchSource("intA", null), exprValueFactory, List.of("intA")), - requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT)); + requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT, client)); } @Test @@ -521,7 +529,7 @@ void exception_when_non_zero_offset_and_page_size() { requestBuilder.pushDownLimit(300, 2); assertThrows( UnsupportedOperationException.class, - () -> requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT)); + () -> requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT, client)); } @Test diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java index 3ca566fac6..ef6b86c42a 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java @@ -79,6 +79,9 @@ class OpenSearchIndexTest { @BeforeEach void setUp() { this.index = new OpenSearchIndex(client, settings, "test"); + lenient() + .when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)) + .thenReturn(true); } @Test @@ -198,10 +201,11 @@ void implementRelationOperatorOnly() { when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200); LogicalPlan plan = index.createScanBuilder(); Integer maxResultWindow = index.getMaxResultWindow(); - final var requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE_LIMIT, exprValueFactory); + final var requestBuilder = + new OpenSearchRequestBuilder(QUERY_SIZE_LIMIT, exprValueFactory, settings); assertEquals( new OpenSearchIndexScan( - client, 200, requestBuilder.build(INDEX_NAME, maxResultWindow, SCROLL_TIMEOUT)), + client, 200, requestBuilder.build(INDEX_NAME, maxResultWindow, SCROLL_TIMEOUT, client)), index.implement(index.optimize(plan))); } @@ -211,10 +215,11 @@ void implementRelationOperatorWithOptimization() { when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200); LogicalPlan plan = index.createScanBuilder(); Integer maxResultWindow = index.getMaxResultWindow(); - final var requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE_LIMIT, exprValueFactory); + final var requestBuilder = + new OpenSearchRequestBuilder(QUERY_SIZE_LIMIT, exprValueFactory, settings); assertEquals( new OpenSearchIndexScan( - client, 200, requestBuilder.build(INDEX_NAME, maxResultWindow, SCROLL_TIMEOUT)), + client, 200, requestBuilder.build(INDEX_NAME, maxResultWindow, SCROLL_TIMEOUT, client)), index.implement(plan)); } @@ -243,7 +248,8 @@ void implementOtherLogicalOperators() { include); Integer maxResultWindow = index.getMaxResultWindow(); - final var requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE_LIMIT, exprValueFactory); + final var requestBuilder = + new OpenSearchRequestBuilder(QUERY_SIZE_LIMIT, exprValueFactory, settings); assertEquals( PhysicalPlanDSL.project( PhysicalPlanDSL.dedupe( @@ -255,7 +261,7 @@ void implementOtherLogicalOperators() { client, QUERY_SIZE_LIMIT, requestBuilder.build( - INDEX_NAME, maxResultWindow, SCROLL_TIMEOUT)), + INDEX_NAME, maxResultWindow, SCROLL_TIMEOUT, client)), mappings), exclude), newEvalField), diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanPaginationTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanPaginationTest.java index 2085519b12..e6a17aceaf 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanPaginationTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanPaginationTest.java @@ -56,6 +56,9 @@ void setup() { lenient() .when(settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE)) .thenReturn(TimeValue.timeValueMinutes(1)); + lenient() + .when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)) + .thenReturn(true); } @Mock private OpenSearchClient client; @@ -69,12 +72,12 @@ void setup() { @Test void query_empty_result() { mockResponse(client); - var builder = new OpenSearchRequestBuilder(QUERY_SIZE, exprValueFactory); + var builder = new OpenSearchRequestBuilder(QUERY_SIZE, exprValueFactory, settings); try (var indexScan = new OpenSearchIndexScan( client, MAX_RESULT_WINDOW, - builder.build(INDEX_NAME, MAX_RESULT_WINDOW, SCROLL_TIMEOUT))) { + builder.build(INDEX_NAME, MAX_RESULT_WINDOW, SCROLL_TIMEOUT, client))) { indexScan.open(); assertFalse(indexScan.hasNext()); } @@ -96,13 +99,13 @@ void dont_serialize_if_no_cursor() { OpenSearchRequestBuilder builder = mock(); OpenSearchRequest request = mock(); OpenSearchResponse response = mock(); - when(builder.build(any(), anyInt(), any())).thenReturn(request); + when(builder.build(any(), anyInt(), any(), any())).thenReturn(request); when(client.search(any())).thenReturn(response); try (var indexScan = new OpenSearchIndexScan( client, MAX_RESULT_WINDOW, - builder.build(INDEX_NAME, MAX_RESULT_WINDOW, SCROLL_TIMEOUT))) { + builder.build(INDEX_NAME, MAX_RESULT_WINDOW, SCROLL_TIMEOUT, client))) { indexScan.open(); when(request.hasAnotherBatch()).thenReturn(false); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java index f813d8f551..bd22e083ad 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java @@ -11,9 +11,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import static org.opensearch.search.sort.FieldSortBuilder.DOC_FIELD_NAME; import static org.opensearch.search.sort.SortOrder.ASC; import static org.opensearch.sql.data.type.ExprCoreType.STRING; @@ -52,6 +50,7 @@ import org.opensearch.search.fetch.subphase.highlight.HighlightBuilder; import org.opensearch.sql.ast.expression.DataType; import org.opensearch.sql.ast.expression.Literal; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.data.model.ExprValueUtils; import org.opensearch.sql.exception.NoCursorException; @@ -77,6 +76,7 @@ class OpenSearchIndexScanTest { public static final int MAX_RESULT_WINDOW = 10000; public static final TimeValue CURSOR_KEEP_ALIVE = TimeValue.timeValueMinutes(1); @Mock private OpenSearchClient client; + @Mock private Settings settings; private final OpenSearchExprValueFactory exprValueFactory = new OpenSearchExprValueFactory( @@ -84,7 +84,11 @@ class OpenSearchIndexScanTest { "name", OpenSearchDataType.of(STRING), "department", OpenSearchDataType.of(STRING))); @BeforeEach - void setup() {} + void setup() { + lenient() + .when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)) + .thenReturn(true); + } @Test void explain() { @@ -172,10 +176,12 @@ void plan_for_serialization() { void query_empty_result() { mockResponse(client); final var name = new OpenSearchRequest.IndexName("test"); - final var requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE, exprValueFactory); + final var requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE, exprValueFactory, settings); try (OpenSearchIndexScan indexScan = new OpenSearchIndexScan( - client, QUERY_SIZE, requestBuilder.build(name, MAX_RESULT_WINDOW, CURSOR_KEEP_ALIVE))) { + client, + QUERY_SIZE, + requestBuilder.build(name, MAX_RESULT_WINDOW, CURSOR_KEEP_ALIVE, client))) { indexScan.open(); assertFalse(indexScan.hasNext()); } @@ -190,10 +196,10 @@ void query_all_results_with_query() { employee(1, "John", "IT"), employee(2, "Smith", "HR"), employee(3, "Allen", "IT") }); - final var requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE, exprValueFactory); + final var requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE, exprValueFactory, settings); try (OpenSearchIndexScan indexScan = new OpenSearchIndexScan( - client, 10, requestBuilder.build(INDEX_NAME, 10000, CURSOR_KEEP_ALIVE))) { + client, 10, requestBuilder.build(INDEX_NAME, 10000, CURSOR_KEEP_ALIVE, client))) { indexScan.open(); assertAll( @@ -218,10 +224,10 @@ void query_all_results_with_scroll() { new ExprValue[] {employee(1, "John", "IT"), employee(2, "Smith", "HR")}, new ExprValue[] {employee(3, "Allen", "IT")}); - final var requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE, exprValueFactory); + final var requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE, exprValueFactory, settings); try (OpenSearchIndexScan indexScan = new OpenSearchIndexScan( - client, 10, requestBuilder.build(INDEX_NAME, 10000, CURSOR_KEEP_ALIVE))) { + client, 10, requestBuilder.build(INDEX_NAME, 10000, CURSOR_KEEP_ALIVE, client))) { indexScan.open(); assertAll( @@ -248,10 +254,12 @@ void query_some_results_with_query() { }); final int limit = 3; - OpenSearchRequestBuilder builder = new OpenSearchRequestBuilder(0, exprValueFactory); + OpenSearchRequestBuilder builder = new OpenSearchRequestBuilder(0, exprValueFactory, settings); try (OpenSearchIndexScan indexScan = new OpenSearchIndexScan( - client, limit, builder.build(INDEX_NAME, MAX_RESULT_WINDOW, CURSOR_KEEP_ALIVE))) { + client, + limit, + builder.build(INDEX_NAME, MAX_RESULT_WINDOW, CURSOR_KEEP_ALIVE, client))) { indexScan.open(); assertAll( @@ -269,10 +277,12 @@ void query_some_results_with_query() { @Test void query_some_results_with_scroll() { mockTwoPageResponse(client); - final var requestuilder = new OpenSearchRequestBuilder(10, exprValueFactory); + final var requestuilder = new OpenSearchRequestBuilder(10, exprValueFactory, settings); try (OpenSearchIndexScan indexScan = new OpenSearchIndexScan( - client, 3, requestuilder.build(INDEX_NAME, MAX_RESULT_WINDOW, CURSOR_KEEP_ALIVE))) { + client, + 3, + requestuilder.build(INDEX_NAME, MAX_RESULT_WINDOW, CURSOR_KEEP_ALIVE, client))) { indexScan.open(); assertAll( @@ -306,12 +316,13 @@ void query_results_limited_by_query_size() { }); final int defaultQuerySize = 2; - final var requestBuilder = new OpenSearchRequestBuilder(defaultQuerySize, exprValueFactory); + final var requestBuilder = + new OpenSearchRequestBuilder(defaultQuerySize, exprValueFactory, settings); try (OpenSearchIndexScan indexScan = new OpenSearchIndexScan( client, defaultQuerySize, - requestBuilder.build(INDEX_NAME, QUERY_SIZE, CURSOR_KEEP_ALIVE))) { + requestBuilder.build(INDEX_NAME, QUERY_SIZE, CURSOR_KEEP_ALIVE, client))) { indexScan.open(); assertAll( @@ -368,7 +379,7 @@ void push_down_highlight_with_arguments() { } private PushDownAssertion assertThat() { - return new PushDownAssertion(client, exprValueFactory); + return new PushDownAssertion(client, exprValueFactory, settings); } private static class PushDownAssertion { @@ -377,9 +388,10 @@ private static class PushDownAssertion { private final OpenSearchResponse response; private final OpenSearchExprValueFactory factory; - public PushDownAssertion(OpenSearchClient client, OpenSearchExprValueFactory valueFactory) { + public PushDownAssertion( + OpenSearchClient client, OpenSearchExprValueFactory valueFactory, Settings settings) { this.client = client; - this.requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE, valueFactory); + this.requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE, valueFactory, settings); this.response = mock(OpenSearchResponse.class); this.factory = valueFactory; @@ -411,7 +423,9 @@ PushDownAssertion shouldQueryHighlight(QueryBuilder query, HighlightBuilder high when(client.search(request)).thenReturn(response); var indexScan = new OpenSearchIndexScan( - client, QUERY_SIZE, requestBuilder.build(EMPLOYEES_INDEX, 10000, CURSOR_KEEP_ALIVE)); + client, + QUERY_SIZE, + requestBuilder.build(EMPLOYEES_INDEX, 10000, CURSOR_KEEP_ALIVE, client)); indexScan.open(); return this; } @@ -429,7 +443,9 @@ PushDownAssertion shouldQuery(QueryBuilder expected) { when(client.search(request)).thenReturn(response); var indexScan = new OpenSearchIndexScan( - client, 10000, requestBuilder.build(EMPLOYEES_INDEX, 10000, CURSOR_KEEP_ALIVE)); + client, + 10000, + requestBuilder.build(EMPLOYEES_INDEX, 10000, CURSOR_KEEP_ALIVE, client)); indexScan.open(); return this; } diff --git a/plugin/build.gradle b/plugin/build.gradle index f70a429343..7ebd0ad2d9 100644 --- a/plugin/build.gradle +++ b/plugin/build.gradle @@ -1,5 +1,3 @@ -import java.util.concurrent.Callable - /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 @@ -116,11 +114,6 @@ configurations.all { resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-jdk8:1.9.10" resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-common:1.9.10" } - -configurations { - zipArchive -} - compileJava { options.compilerArgs.addAll(["-processor", 'lombok.launch.AnnotationProcessorHider$AnnotationProcessor']) } @@ -169,8 +162,6 @@ dependencies { testImplementation group: 'org.mockito', name: 'mockito-core', version: "${versions.mockito}" testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: "${versions.mockito}" testImplementation 'org.junit.jupiter:junit-jupiter:5.9.3' - - zipArchive group: 'org.opensearch.plugin', name:'opensearch-job-scheduler', version: "${opensearch_build}" } test { @@ -285,24 +276,7 @@ afterEvaluate { } } -def getJobSchedulerPlugin() { - provider(new Callable() { - @Override - RegularFile call() throws Exception { - return new RegularFile() { - @Override - File getAsFile() { - return configurations.zipArchive.asFileTree.matching { - include '**/opensearch-job-scheduler*' - }.singleFile - } - } - } - }) -} - testClusters.integTest { - plugin(getJobSchedulerPlugin()) plugin(project.tasks.bundlePlugin.archiveFile) testDistribution = "ARCHIVE" @@ -317,4 +291,5 @@ testClusters.integTest { run { useCluster testClusters.integTest -} \ No newline at end of file +} +