diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/CursorIT.java b/integ-test/src/test/java/org/opensearch/sql/legacy/CursorIT.java index abd2bbbcc2..d0c2f19f42 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/CursorIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/CursorIT.java @@ -182,7 +182,7 @@ public void validTotalResultWithAndWithoutPaginationOrderBy() throws IOException String selectQuery = StringUtils.format( "SELECT firstname, state FROM %s ORDER BY balance DESC ", TEST_INDEX_ACCOUNT); - verifyWithAndWithoutPaginationResponse(selectQuery + " LIMIT 2000", selectQuery, 26, false); + verifyWithAndWithoutPaginationResponse(selectQuery + " LIMIT 2000", selectQuery, 25, false); } @Test diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java index f81e1b6615..714bb937da 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java @@ -149,8 +149,11 @@ public void onFailure(Exception e) { private Settings defaultSettings() { return new Settings() { - private final Map defaultSettings = - new ImmutableMap.Builder().put(Key.QUERY_SIZE_LIMIT, 200).build(); + private final Map defaultSettings = + new ImmutableMap.Builder() + .put(Key.QUERY_SIZE_LIMIT, 200) + .put(Key.SQL_PAGINATION_API_SEARCH_AFTER, true) + .build(); @Override public T getSettingValue(Key key) { diff --git a/integ-test/src/test/java/org/opensearch/sql/sql/PaginationFilterIT.java b/integ-test/src/test/java/org/opensearch/sql/sql/PaginationFilterIT.java index 038596cf57..768d73488f 100644 --- a/integ-test/src/test/java/org/opensearch/sql/sql/PaginationFilterIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/sql/PaginationFilterIT.java @@ -36,8 +36,8 @@ public class PaginationFilterIT extends SQLIntegTestCase { Map.of( "SELECT * FROM " + TestsConstants.TEST_INDEX_ACCOUNT, 1000, "SELECT * FROM " + TestsConstants.TEST_INDEX_ACCOUNT + " WHERE match(address, 'street')", - 385, - "SELECT * FROM " + 385, + "SELECT * FROM " + TestsConstants.TEST_INDEX_ACCOUNT + " WHERE match(address, 'street') AND match(city, 'Ola')", 1, @@ -134,6 +134,9 @@ public void test_pagination_with_where() { paginatedResponse = executeCursorQuery(cursor); + System.out.println("**** MANA HERE: paginatedResponse - " + paginatedResponse); + System.out.println("**** MANA HERE: paginatedResponse -size - " + paginatedResponse.getInt("size")); + this.logger.info( testReportPrefix + String.format( @@ -142,6 +145,8 @@ public void test_pagination_with_where() { break; } } while (true); + System.out.println("**** MANA HERE: last paginatedResponse - " + paginatedResponse); + // last page expected results: assertEquals( testReportPrefix + "Last page", totalHits % pageSize, paginatedResponse.getInt("size")); diff --git a/integ-test/src/test/java/org/opensearch/sql/sql/StandalonePaginationIT.java b/integ-test/src/test/java/org/opensearch/sql/sql/StandalonePaginationIT.java index e884734c96..34735a2ec2 100644 --- a/integ-test/src/test/java/org/opensearch/sql/sql/StandalonePaginationIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/sql/StandalonePaginationIT.java @@ -23,6 +23,7 @@ import org.opensearch.client.Request; import org.opensearch.client.ResponseException; import org.opensearch.client.RestHighLevelClient; +import org.opensearch.client.node.NodeClient; import org.opensearch.common.inject.Injector; import org.opensearch.common.inject.ModulesBuilder; import org.opensearch.common.unit.TimeValue; @@ -166,6 +167,7 @@ private Settings defaultSettings() { new ImmutableMap.Builder() .put(Key.QUERY_SIZE_LIMIT, 200) .put(Key.SQL_CURSOR_KEEP_ALIVE, TimeValue.timeValueMinutes(1)) + .put(Key.SQL_PAGINATION_API_SEARCH_AFTER, true) .build(); @Override diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_agg_push.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_agg_push.json index 568b397f07..4654cd085f 100644 --- a/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_agg_push.json +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_agg_push.json @@ -8,7 +8,7 @@ { "name": "OpenSearchIndexScan", "description": { - "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone\u003dfalse)" + "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, client\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)" }, "children": [] } diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_push.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_push.json index 0e7087aa1f..e645e7193b 100644 --- a/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_push.json +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_push.json @@ -8,7 +8,7 @@ { "name": "OpenSearchIndexScan", "description": { - "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"bool\":{\"filter\":[{\"range\":{\"balance\":{\"from\":10000,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":null,\"to\":40,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone\u003dfalse)" + "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"bool\":{\"filter\":[{\"range\":{\"balance\":{\"from\":10000,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":null,\"to\":40,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, client\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)" }, "children": [] } diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_limit_push.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_limit_push.json index 51a627ea4d..cf3ad5e2f4 100644 --- a/integ-test/src/test/resources/expectedOutput/ppl/explain_limit_push.json +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_limit_push.json @@ -16,7 +16,7 @@ { "name": "OpenSearchIndexScan", "description": { - "request": "OpenSearchQueryRequest(indexName=opensearch-sql_test_index_account, sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\"}, searchDone=false)" + "request": "OpenSearchQueryRequest(indexName=opensearch-sql_test_index_account, sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\"}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, client\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)" }, "children": [] } diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_output.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_output.json index 8d45714283..6503e51996 100644 --- a/integ-test/src/test/resources/expectedOutput/ppl/explain_output.json +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_output.json @@ -31,7 +31,7 @@ { "name": "OpenSearchIndexScan", "description": { - "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone\u003dfalse)" + "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, client\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)" }, "children": [] } diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_sort_push.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_sort_push.json index af2a57e536..3777c6319b 100644 --- a/integ-test/src/test/resources/expectedOutput/ppl/explain_sort_push.json +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_sort_push.json @@ -8,7 +8,7 @@ { "name": "OpenSearchIndexScan", "description": { - "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone\u003dfalse)" + "request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, client\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)" }, "children": [] } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/SelectResultSet.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/SelectResultSet.java index 261816cddc..9d1862023c 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/SelectResultSet.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/SelectResultSet.java @@ -567,7 +567,7 @@ private void populateDefaultCursor(DefaultCursor cursor) { Integer limit = cursor.getLimit(); long rowsLeft = rowsLeft(cursor.getFetchSize(), cursor.getLimit()); if (rowsLeft <= 0) { - // close the cursor + // Delete Point In Time ID if (LocalClusterState.state().getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)) { String pitId = cursor.getPitId(); PointInTimeHandler pit = new PointInTimeHandlerImpl(client, pitId); diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/scroll/SearchHitRow.java b/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/SearchHitRow.java similarity index 97% rename from legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/scroll/SearchHitRow.java rename to legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/SearchHitRow.java index d03dd5af40..fa60ce52e4 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/scroll/SearchHitRow.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/SearchHitRow.java @@ -3,16 +3,17 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.sql.legacy.query.planner.physical.node.scroll; +package org.opensearch.sql.legacy.query.planner.physical.node; import com.google.common.base.Strings; -import java.util.HashMap; -import java.util.Map; import org.opensearch.common.document.DocumentField; import org.opensearch.index.mapper.MapperService; import org.opensearch.search.SearchHit; import org.opensearch.sql.legacy.query.planner.physical.Row; +import java.util.HashMap; +import java.util.Map; + /** * * @@ -36,7 +37,7 @@ * ---------------------------------------------------------------------------------------------------------------------- * */ -class SearchHitRow implements Row { +public class SearchHitRow implements Row { /** Native OpenSearch data object for each row */ private final SearchHit hit; @@ -47,7 +48,7 @@ class SearchHitRow implements Row { /** Table alias owned the row. Empty if this row comes from combination of two other rows */ private final String tableAlias; - SearchHitRow(SearchHit hit, String tableAlias) { + public SearchHitRow(SearchHit hit, String tableAlias) { this.hit = hit; this.source = hit.getSourceAsMap(); this.tableAlias = tableAlias; diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/pointInTime/PointInTime.java b/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/pointInTime/PointInTime.java index 6d8ba0080f..da9d5adfee 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/pointInTime/PointInTime.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/pointInTime/PointInTime.java @@ -25,6 +25,7 @@ import org.opensearch.sql.legacy.query.planner.physical.estimation.Cost; import org.opensearch.sql.legacy.query.planner.physical.node.BatchPhysicalOperator; import org.opensearch.sql.legacy.query.planner.resource.ResourceManager; +import org.opensearch.sql.legacy.query.planner.physical.node.SearchHitRow; /** OpenSearch Search API with Point in time as physical implementation of TableScan */ public class PointInTime extends BatchPhysicalOperator { diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/pointInTime/SearchHitRow.java b/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/pointInTime/SearchHitRow.java deleted file mode 100644 index ba28b26e69..0000000000 --- a/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/pointInTime/SearchHitRow.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.legacy.query.planner.physical.node.pointInTime; - -import com.google.common.base.Strings; -import java.util.HashMap; -import java.util.Map; -import org.opensearch.common.document.DocumentField; -import org.opensearch.index.mapper.MapperService; -import org.opensearch.search.SearchHit; -import org.opensearch.sql.legacy.query.planner.physical.Row; - -/** - * - * - *
- * Search hit row that implements basic accessor for SearchHit.
- * Encapsulate all OpenSearch specific knowledge: how to parse source including nested path.
- * 

- * State transition: - * for example, SELECT e.name.first AS firstName, e.age AS age FROM E e JOIN D d ON ... ORDER BY ... - *

- * Stage | hit.source | tableAlias | Passed in args - * ---------------------------------------------------------------------------------------------------------------------- - * new in Scroll | {"name":{"first": "Allen", "last": "Hank"}, "age": 30} | "e"| new(SearchHit, "e") - * ---------------------------------------------------------------------------------------------------------------------- - * key()/combine() | | | key("name.first", "age") - * in JoinAlgorithm | {"e.name": {...}, "e.age": 30, "d..." } (after combined) | "" | combine(row of D) - * ---------------------------------------------------------------------------------------------------------------------- - * key() in XXSort | same | "" | key("e.name.first", "e.age") - * ---------------------------------------------------------------------------------------------------------------------- - * retain() in Project | {"firstName": "Allen", "age": 30 } | "" | retain("e.name.first", "e.age") - * ---------------------------------------------------------------------------------------------------------------------- - *

- */ -class SearchHitRow implements Row { - - /** Native OpenSearch data object for each row */ - private final SearchHit hit; - - /** Column and value pairs */ - private final Map source; - - /** Table alias owned the row. Empty if this row comes from combination of two other rows */ - private final String tableAlias; - - SearchHitRow(SearchHit hit, String tableAlias) { - this.hit = hit; - this.source = hit.getSourceAsMap(); - this.tableAlias = tableAlias; - } - - @Override - public RowKey key(String[] colNames) { - if (colNames.length == 0) { - return RowKey.NULL; - } - - Object[] keys = new Object[colNames.length]; - for (int i = 0; i < colNames.length; i++) { - keys[i] = getValueOfPath(colNames[i]); - - if (keys[i] == null) { - return RowKey.NULL; - } - } - return new RowKey(keys); - } - - /** - * Replace column name by full name to avoid naming conflicts. For efficiency, this only happens - * here when matched rows found. Create a new one to avoid mutating the original ones in hash - * table which impact subsequent match. - */ - @Override - public Row combine(Row other) { - SearchHit combined = cloneHit(other); - - collectFullName(combined.getSourceAsMap(), this); - if (other != NULL) { - collectFullName(combined.getSourceAsMap(), (SearchHitRow) other); - } - return new SearchHitRow(combined, ""); - } - - @Override - public void retain(Map colNameAlias) { - Map aliasSource = new HashMap<>(); - colNameAlias.forEach( - (colName, alias) -> { - if (colName.endsWith(".*")) { - String tableAlias = colName.substring(0, colName.length() - 2) + "."; - retainAllFieldsFromTable(aliasSource, tableAlias); - } else { - retainOneField(aliasSource, colName, alias); - } - }); - resetSource(aliasSource); - } - - @Override - public SearchHit data() { - return hit; - } - - @Override - public String toString() { - return "SearchHitRow{" + "hit=" + source + '}'; - } - - private Object getValueOfPath(String path) { - /* - * If table alias is missing which means the row was generated by combine(). - * In this case, table alias is present and the first dot should be ignored, ex. "e.name.first" - */ - return getValueOfPath(source, path, Strings.isNullOrEmpty(tableAlias)); - } - - /** Recursively get value for field name path, such as object field a.b.c */ - private Object getValueOfPath(Object source, String path, boolean isIgnoreFirstDot) { - if (!(source instanceof Map) || path.isEmpty()) { - return source; - } - - int dot = path.indexOf('.', (isIgnoreFirstDot ? path.indexOf('.') + 1 : 0)); - if (dot == -1) { - return ((Map) source).get(path); - } - - // Object field name maybe unexpanded without recursive object structure - // ex. {"a.b.c": value} instead of {"a": {"b": {"c": value}}}} - if (((Map) source).containsKey(path)) { - return ((Map) source).get(path); - } - - return getValueOfPath( - ((Map) source).get(path.substring(0, dot)), path.substring(dot + 1), false); - } - - private SearchHit cloneHit(Row other) { - Map documentFields = new HashMap<>(); - Map metaFields = new HashMap<>(); - hit.getFields() - .forEach( - (fieldName, docField) -> - (MapperService.META_FIELDS_BEFORE_7DOT8.contains(fieldName) - ? metaFields - : documentFields) - .put(fieldName, docField)); - SearchHit combined = - new SearchHit( - hit.docId(), - hit.getId() + "|" + (other == NULL ? "0" : ((SearchHitRow) other).hit.getId()), - documentFields, - metaFields); - combined.sourceRef(hit.getSourceRef()); - combined.getSourceAsMap().clear(); - return combined; - } - - private void collectFullName(Map newSource, SearchHitRow row) { - row.source.forEach((colName, value) -> newSource.put(row.tableAlias + "." + colName, value)); - } - - private void retainAllFieldsFromTable(Map aliasSource, String tableAlias) { - source.entrySet().stream() - .filter(e -> e.getKey().startsWith(tableAlias)) - .forEach(e -> aliasSource.put(e.getKey(), e.getValue())); - } - - /** - * Note that column here is already prefixed by table alias after combine(). - * - *

Meanwhile check if column name with table alias prefix, ex. a.name, is property, namely - * a.name.lastname. In this case, split by first second dot and continue searching for the final - * value in nested map by getValueOfPath(source.get("a.name"), "lastname") - */ - private void retainOneField(Map aliasSource, String colName, String alias) { - aliasSource.put(Strings.isNullOrEmpty(alias) ? colName : alias, getValueOfPath(colName)); - } - - private void resetSource(Map newSource) { - source.clear(); - source.putAll(newSource); - } -} diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/scroll/Scroll.java b/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/scroll/Scroll.java index 40e9860886..ea661109bd 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/scroll/Scroll.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/scroll/Scroll.java @@ -29,6 +29,7 @@ import org.opensearch.sql.legacy.query.planner.physical.estimation.Cost; import org.opensearch.sql.legacy.query.planner.physical.node.BatchPhysicalOperator; import org.opensearch.sql.legacy.query.planner.resource.ResourceManager; +import org.opensearch.sql.legacy.query.planner.physical.node.SearchHitRow; /** OpenSearch Scroll API as physical implementation of TableScan */ public class Scroll extends BatchPhysicalOperator { diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/query/planner/physical/node/scroll/SearchHitRowTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/query/planner/physical/node/scroll/SearchHitRowTest.java index dd0fc626c0..f7d2030b0c 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/query/planner/physical/node/scroll/SearchHitRowTest.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/query/planner/physical/node/scroll/SearchHitRowTest.java @@ -12,6 +12,7 @@ import org.junit.Test; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.search.SearchHit; +import org.opensearch.sql.legacy.query.planner.physical.node.SearchHitRow; public class SearchHitRowTest { diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/unittest/planner/QueryPlannerBatchTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/unittest/planner/QueryPlannerBatchTest.java index 0c77550a2f..0862540b0b 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/unittest/planner/QueryPlannerBatchTest.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/unittest/planner/QueryPlannerBatchTest.java @@ -120,12 +120,12 @@ public static Collection data() { @Test public void departmentInnerJoinEmployee() { assertThat( - query( + query( String.format( TEST_SQL1 + TEST_SQL2_JOIN1 + TEST_SQL3, blockSize, pageSize, "INNER JOIN"), departments(pageSize, departments), employees(pageSize, employees)), - expectedInnerJoinResult); + expectedInnerJoinResult); } @Test diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/unittest/planner/QueryPlannerTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/unittest/planner/QueryPlannerTest.java index 521b225893..3a872d00a0 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/unittest/planner/QueryPlannerTest.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/unittest/planner/QueryPlannerTest.java @@ -7,11 +7,7 @@ import static java.util.Collections.emptyList; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import com.alibaba.druid.sql.ast.SQLExpr; import com.alibaba.druid.sql.ast.expr.SQLQueryExpr; @@ -20,6 +16,8 @@ import com.alibaba.druid.sql.parser.Token; import java.util.Arrays; import java.util.List; + +import lombok.SneakyThrows; import org.apache.lucene.search.TotalHits; import org.apache.lucene.search.TotalHits.Relation; import org.junit.Before; @@ -30,24 +28,28 @@ import org.mockito.MockitoAnnotations; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import org.opensearch.action.search.ClearScrollRequestBuilder; -import org.opensearch.action.search.ClearScrollResponse; -import org.opensearch.action.search.SearchResponse; -import org.opensearch.action.search.SearchScrollRequestBuilder; +import org.opensearch.action.search.*; import org.opensearch.client.Client; import org.opensearch.cluster.ClusterName; import org.opensearch.common.action.ActionFuture; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.search.DocValueFormat; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; +import org.opensearch.search.SearchSortValues; +import org.opensearch.search.builder.PointInTimeBuilder; +import org.opensearch.search.sort.FieldSortBuilder; +import org.opensearch.search.sort.SortOrder; import org.opensearch.sql.legacy.domain.JoinSelect; import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.exception.SqlParseException; import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.legacy.parser.ElasticSqlExprParser; import org.opensearch.sql.legacy.parser.SqlParser; +import org.opensearch.sql.legacy.pit.PointInTimeHandlerImpl; import org.opensearch.sql.legacy.query.QueryAction; import org.opensearch.sql.legacy.query.SqlElasticRequestBuilder; import org.opensearch.sql.legacy.query.join.BackOffRetryStrategy; @@ -69,6 +71,8 @@ public abstract class QueryPlannerTest { @Mock private SearchResponse response2; private static final String SCROLL_ID2 = "2"; + private static final String PIT_ID1 = "1"; + @Mock private ClusterSettings clusterSettings; /* @@ -110,6 +114,7 @@ public void init() { when(client.execute(any(), any())).thenReturn(mockFuture); // Differentiate response for Scroll-1/2 by call count and scroll ID. + when(mockFuture.actionGet()) .thenAnswer( new Answer() { @@ -125,6 +130,7 @@ public SearchResponse answer(InvocationOnMock invocation) { } }); + doReturn(SCROLL_ID1).when(response1).getScrollId(); doReturn(SCROLL_ID2).when(response2).getScrollId(); @@ -138,6 +144,58 @@ public SearchResponse answer(InvocationOnMock invocation) { returnMockResponse(SCROLL_ID2, response2); Metrics.getInstance().registerDefaultMetrics(); + + initPIT(); + } + + @SneakyThrows + public void initPIT() { + + // Mock PointInTimeHandler + PointInTimeHandlerImpl pitHandler = mock(PointInTimeHandlerImpl.class); + doNothing().when(pitHandler).create(); + doNothing().when(pitHandler).delete(); + doReturn(PIT_ID1).when(pitHandler).getPitId(); + + // Mock SearchResponse for PointInTime + ActionFuture mockCreateFuture = mock(ActionFuture.class); + when(client.execute(any(), any(CreatePitRequest.class))).thenReturn(mockCreateFuture); + CreatePitResponse mockPitResponse = mock(CreatePitResponse.class); + + ActionFuture mockCDeleteFuture = mock(ActionFuture.class); + when(client.execute(any(), any(DeletePitRequest.class))).thenReturn(mockCDeleteFuture); + DeletePitResponse mockDeletePitResponse = mock(DeletePitResponse.class); + when(mockDeletePitResponse.status()).thenReturn(RestStatus.ACCEPTED); + when(mockCreateFuture.get()).thenReturn(mockPitResponse); + when(mockCDeleteFuture.get()).thenReturn(mockDeletePitResponse); + when(mockPitResponse.getId()).thenReturn(PIT_ID1); + + + // Set up SearchResponse mocks + // doReturn(searchHits).when(response2).getHits(); + doReturn(PIT_ID1).when(response1).pointInTimeId(); + doReturn(PIT_ID1).when(response2).pointInTimeId(); + doReturn(0).when(response1).getFailedShards(); + doReturn(0).when(response2).getFailedShards(); + doReturn(false).when(response1).isTimedOut(); + doReturn(false).when(response2).isTimedOut(); + + returnMockPITResponse(PIT_ID1, response1); + returnMockPITResponse(PIT_ID1, response2); + + Metrics.getInstance().registerDefaultMetrics(); + } + + private void returnMockPITResponse(String pitId, SearchResponse response) { + // Mock the request builder and SearchResponse + SearchRequestBuilder mockReqBuilder = mock(SearchRequestBuilder.class); + when(mockReqBuilder.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC)).thenReturn(mockReqBuilder); + when(mockReqBuilder.setSize(5)).thenReturn(mockReqBuilder); + when(mockReqBuilder.setScroll(any(TimeValue.class))).thenReturn(mockReqBuilder); + when(mockReqBuilder.setPointInTime(new PointInTimeBuilder(pitId))).thenReturn(mockReqBuilder); + when(mockReqBuilder.searchAfter(new Object[] { "sortValue" })).thenReturn(mockReqBuilder); + + when(mockReqBuilder.get()).thenReturn(response); } private void returnMockResponse(String scrollId, SearchResponse response) { @@ -279,6 +337,7 @@ protected SearchHit employee(int docId, String lastname, String departmentId) { new BytesArray( "{\"lastname\":\"" + lastname + "\",\"departmentId\":\"" + departmentId + "\"}")); } + hit.sortValues(new SearchSortValues(new String[] { "sample" }, new DocValueFormat[] { DocValueFormat.RAW} )); return hit; } @@ -291,6 +350,7 @@ protected SearchHit department(int docId, String id, String name) { } else { hit.sourceRef(new BytesArray("{\"id\":\"" + id + "\",\"name\":\"" + name + "\"}")); } + hit.sortValues(new SearchSortValues(new String[] { "sample" }, new DocValueFormat[] { DocValueFormat.RAW} )); return hit; } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java index 0a9cc67993..edd387eefa 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java @@ -7,6 +7,9 @@ import java.util.List; import java.util.Map; + +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.action.search.DeletePitRequest; import org.opensearch.client.node.NodeClient; import org.opensearch.sql.opensearch.mapping.IndexMapping; import org.opensearch.sql.opensearch.request.OpenSearchRequest; @@ -89,4 +92,20 @@ public interface OpenSearchClient { void schedule(Runnable task); NodeClient getNodeClient(); + + /** + * Create PIT for given indices + * + * @param createPitRequest Create Point In Time request + * + * @return PitId + */ + String createPit(CreatePitRequest createPitRequest); + + /** + * Delete PIT + * + * @param deletePitRequest Delete Point In Time request + */ + void deletePit(DeletePitRequest deletePitRequest); } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java index 42a0af13c2..b349fb839d 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java @@ -7,6 +7,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; + import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -17,8 +18,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import org.opensearch.action.ActionRequest; -import org.opensearch.action.ActionType; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse; @@ -26,12 +25,10 @@ import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; import org.opensearch.action.search.*; -import org.opensearch.action.support.PlainActionFuture; import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.metadata.AliasMetadata; import org.opensearch.common.action.ActionFuture; import org.opensearch.common.settings.Settings; -import org.opensearch.core.action.ActionResponse; import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexSettings; import org.opensearch.sql.opensearch.mapping.IndexMapping; @@ -165,7 +162,8 @@ public List indices() { */ @Override public Map meta() { - return ImmutableMap.of(META_CLUSTER_NAME, client.settings().get("cluster.name", "opensearch")); + return ImmutableMap.of(META_CLUSTER_NAME, client.settings().get("cluster.name", "opensearch"), + "plugins.sql.pagination.api", client.settings().get("plugins.sql.pagination.api", "true")); } @Override @@ -195,4 +193,27 @@ public void schedule(Runnable task) { public NodeClient getNodeClient() { return client; } + + @Override + public String createPit(CreatePitRequest createPitRequest) { + ActionFuture execute = + this.client.execute(CreatePitAction.INSTANCE, createPitRequest); + try { + CreatePitResponse pitResponse = execute.get(); + return pitResponse.getId(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Error occurred while creating PIT for new engine SQL query", e); + } + } + + @Override + public void deletePit(DeletePitRequest deletePitRequest) { + ActionFuture execute = + this.client.execute(DeletePitAction.INSTANCE, deletePitRequest); + try { + DeletePitResponse deletePitResponse = execute.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Error occurred while deleting PIT.", e); + } + } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java index dc202edb8d..34e30a5d37 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java @@ -19,7 +19,7 @@ import org.opensearch.action.admin.cluster.settings.ClusterGetSettingsRequest; import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest; import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; -import org.opensearch.action.search.ClearScrollRequest; +import org.opensearch.action.search.*; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.indices.CreateIndexRequest; @@ -168,6 +168,7 @@ public Map meta() { final Settings defaultSettings = client.cluster().getSettings(request, RequestOptions.DEFAULT).getDefaultSettings(); builder.put(META_CLUSTER_NAME, defaultSettings.get("cluster.name", "opensearch")); + builder.put("plugins.sql.pagination.api", defaultSettings.get("plugins.sql.pagination.api", "true")); return builder.build(); } catch (IOException e) { throw new IllegalStateException("Failed to get cluster meta info", e); @@ -202,4 +203,23 @@ public void schedule(Runnable task) { public NodeClient getNodeClient() { throw new UnsupportedOperationException("Unsupported method."); } + + @Override + public String createPit(CreatePitRequest createPitRequest) { + try { + CreatePitResponse createPitResponse = client.createPit(createPitRequest, RequestOptions.DEFAULT); + return createPitResponse.getId(); + } catch (IOException e) { + throw new RuntimeException("Error occurred while creating PIT for new engine SQL query", e); + } + } + + @Override + public void deletePit(DeletePitRequest deletePitRequest) { + try { + DeletePitResponse deletePitResponse = client.deletePit(deletePitRequest, RequestOptions.DEFAULT); + } catch (IOException e) { + throw new RuntimeException("Error occurred while creating PIT for new engine SQL query", e); + } + } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java index 643e02994a..0a0e62eb98 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java @@ -8,14 +8,12 @@ import java.io.IOException; import java.util.Collections; import java.util.List; -import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import java.util.function.Function; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.ToString; import org.opensearch.action.search.*; -import org.opensearch.common.action.ActionFuture; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentType; @@ -120,35 +118,48 @@ public OpenSearchQueryRequest( this.includes = includes; this.cursorKeepAlive = cursorKeepAlive; this.client = client; + this.pitId = createPIT(); } @Override public OpenSearchResponse search( - Function searchAction, - Function scrollAction) { + Function searchAction, + Function scrollAction) { + if (this.pitId == null) { + // When SearchRequest doesn't contain PitId, fetch single page request + if (searchDone) { + return new OpenSearchResponse(SearchHits.empty(), exprValueFactory, includes); + } else { + searchDone = true; + return new OpenSearchResponse( + searchAction.apply( + new SearchRequest().indices(indexName.getIndexNames()).source(sourceBuilder)), + exprValueFactory, + includes); + } + } else { + // Search with PIT instead of scroll API + return searchWithPIT(searchAction); + } + } + + public OpenSearchResponse searchWithPIT( + Function searchAction) { OpenSearchResponse openSearchResponse; if (searchDone) { openSearchResponse = new OpenSearchResponse(SearchHits.empty(), exprValueFactory, includes); } else { - searchDone = true; - - // Create PIT and Set PIT - if (this.pitId == null) { - this.pitId = createPIT(); - } this.sourceBuilder.pointInTimeBuilder(new PointInTimeBuilder(this.pitId)); this.sourceBuilder.timeout(cursorKeepAlive); - // check for search after if (searchAfter != null) { - this.sourceBuilder.searchAfter(searchAfter); // Set search_after in the query + this.sourceBuilder.searchAfter(searchAfter); } - // Set sort field for search_after if (this.sourceBuilder.sorts() == null) { this.sourceBuilder.sort(DOC_FIELD_NAME, ASC); } - SearchRequest searchRequest = new SearchRequest().indices(indexName.getIndexNames()).source(this.sourceBuilder); + SearchRequest searchRequest = new SearchRequest().source(this.sourceBuilder); this.searchResponse = searchAction.apply(searchRequest); openSearchResponse = new OpenSearchResponse( @@ -157,69 +168,63 @@ public OpenSearchResponse search( includes); needClean = openSearchResponse.isEmpty(); - if (!needClean && this.searchResponse.getHits().getHits() != null) { + searchDone = openSearchResponse.isEmpty(); + if (!needClean && this.searchResponse.getHits().getHits() != null && this.searchResponse.getHits().getHits().length > 0) { searchAfter = this.searchResponse.getHits().getHits()[this.searchResponse.getHits().getHits().length - 1].getSortValues(); this.sourceBuilder.searchAfter(searchAfter); - - SearchResponse nextBatchResponse = searchAction.apply(new SearchRequest().indices(indexName.getIndexNames()).source(this.sourceBuilder)); - needClean = nextBatchResponse.getHits().getHits().length == 0; - } } return openSearchResponse; } public String createPIT() { - CreatePitRequest createPitRequest = new CreatePitRequest(this.cursorKeepAlive, false, indexName.getIndexNames()); - ActionFuture execute = - this.client.getNodeClient().execute(CreatePitAction.INSTANCE, createPitRequest); - try { - CreatePitResponse pitResponse = execute.get(); - return pitResponse.getId(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException("Error occurred while creating PIT for new engine SQL query", e); - } + return this.client.createPit(createPitRequest); } public void deletePit() { DeletePitRequest deletePitRequest = new DeletePitRequest(this.pitId); - ActionFuture execute = - this.client.getNodeClient().execute(DeletePitAction.INSTANCE, deletePitRequest); - try { - DeletePitResponse deletePitResponse = execute.get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException("Error occurred while deleting PIT.", e); - } + this.client.deletePit(deletePitRequest); + searchDone = true; + this.pitId = null; } @Override public void clean(Consumer cleanAction) { - if (needClean) { + if (needClean && this.pitId != null) { deletePit(); } } @Override public boolean hasAnotherBatch() { - return !needClean; + if (this.pitId != null) { + return !needClean; + } + return false; } @Override public void writeTo(StreamOutput out) throws IOException { - // Convert SearchSourceBuilder to XContent and write it as a string - out.writeString(sourceBuilder.toString()); - - out.writeTimeValue(sourceBuilder.timeout()); - out.writeString(sourceBuilder.pointInTimeBuilder().getId()); - out.writeStringCollection(includes); - indexName.writeTo(out); - - // Serialize the searchAfter array - out.writeVInt(searchAfter.length); - for (Object obj : searchAfter) { - out.writeGenericValue(obj); + if (this.pitId != null) { + // Convert SearchSourceBuilder to XContent and write it as a string + out.writeString(sourceBuilder.toString()); + + out.writeTimeValue(sourceBuilder.timeout()); + out.writeString(sourceBuilder.pointInTimeBuilder().getId()); + out.writeStringCollection(includes); + indexName.writeTo(out); + + // Serialize the searchAfter array + out.writeVInt(searchAfter.length); + for (Object obj : searchAfter) { + out.writeGenericValue(obj); + } + } else { + // OpenSearch Query request without PIT for single page requests + throw new UnsupportedOperationException( + "OpenSearchQueryRequest serialization is not implemented."); } } @@ -252,7 +257,6 @@ public OpenSearchQueryRequest(StreamInput in, OpenSearchStorageEngine engine) this.client = engine.getClient(); int length = in.readVInt(); - // Read each element of the searchAfter array this.searchAfter = new Object[length]; for (int i = 0; i < length; i++) { this.searchAfter[i] = in.readGenericValue(); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java index 037f87c0d2..65b40af4d1 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java @@ -11,8 +11,6 @@ import static org.opensearch.index.query.QueryBuilders.nestedQuery; import static org.opensearch.search.sort.FieldSortBuilder.DOC_FIELD_NAME; import static org.opensearch.search.sort.SortOrder.ASC; -import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE; -import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER; import java.util.ArrayList; import java.util.Arrays; @@ -20,7 +18,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -29,8 +26,6 @@ import lombok.ToString; import org.apache.commons.lang3.tuple.Pair; import org.apache.lucene.search.join.ScoreMode; -import org.opensearch.action.search.*; -import org.opensearch.common.action.ActionFuture; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.InnerHitBuilder; @@ -38,8 +33,6 @@ import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.aggregations.AggregationBuilder; -import org.opensearch.search.builder.PointInTimeBuilder; -import org.opensearch.client.Client; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.fetch.subphase.FetchSourceContext; import org.opensearch.search.fetch.subphase.highlight.HighlightBuilder; @@ -78,8 +71,6 @@ public class OpenSearchRequestBuilder { private final Settings settings; - private String[] indices; - /** Constructor. */ public OpenSearchRequestBuilder( int requestedTotalSize, OpenSearchExprValueFactory exprValueFactory, Settings settings) { @@ -96,7 +87,7 @@ public OpenSearchRequestBuilder( /** * Build DSL request. * - * @return query request or scroll request + * @return query request with PIT or scroll request */ public OpenSearchRequest build( OpenSearchRequest.IndexName indexName, int maxResultWindow, TimeValue cursorKeepAlive, OpenSearchClient client) { @@ -107,19 +98,22 @@ public OpenSearchRequest build( if (pageSize == null) { if (startFrom + size > maxResultWindow) { sourceBuilder.size(maxResultWindow - startFrom); + // Search with PIT request + return new OpenSearchQueryRequest(indexName, sourceBuilder, exprValueFactory, includes, cursorKeepAlive, client); } else { + sourceBuilder.from(startFrom); sourceBuilder.size(requestedTotalSize); + // Search with non-Pit request + return new OpenSearchQueryRequest(indexName, sourceBuilder, exprValueFactory, includes); } } else { if (startFrom != 0) { throw new UnsupportedOperationException("Non-zero offset is not supported with pagination"); } sourceBuilder.size(pageSize); + // Search with PIT request instead of Scroll + return new OpenSearchQueryRequest(indexName, sourceBuilder, exprValueFactory, includes, cursorKeepAlive, client); } - // Set sort field for search_after - sourceBuilder.sort(DOC_FIELD_NAME, ASC); - return new OpenSearchQueryRequest(indexName, sourceBuilder, exprValueFactory, includes, cursorKeepAlive, client); - } else { int size = requestedTotalSize; FetchSourceContext fetchSource = this.sourceBuilder.fetchSource(); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java index 73b6a5d08d..11acede31f 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java @@ -126,11 +126,9 @@ public void readExternal(ObjectInput in) throws IOException { ((PlanSerializer.CursorDeserializationStream) in).resolveObject("engine"); client = engine.getClient(); - - boolean paginationAPISetting = Boolean.parseBoolean(client.getNodeClient().settings().get(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER.getKeyValue(), String.valueOf(true))); - System.out.println("**********OPENSEARCHINDEXSCAN flag: " + paginationAPISetting); + boolean pointInTimeEnabled = Boolean.parseBoolean(client.meta().get(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER.getKeyValue())); try (BytesStreamInput bsi = new BytesStreamInput(requestStream)) { - if (paginationAPISetting) { + if (pointInTimeEnabled) { request = new OpenSearchQueryRequest(bsi, engine); } else { request = new OpenSearchScrollRequest(bsi, engine); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java index 739b70b1b8..c072a97988 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java @@ -179,13 +179,13 @@ void explain_successfully() { final var name = new OpenSearchRequest.IndexName("test"); final int defaultQuerySize = 100; final int maxResultWindow = 10000; - final var requestBuilder = new OpenSearchRequestBuilder(defaultQuerySize, exprValueFactory); + final var requestBuilder = new OpenSearchRequestBuilder(defaultQuerySize, exprValueFactory, settings); PhysicalPlan plan = new OpenSearchIndexScan( mock(OpenSearchClient.class), maxResultWindow, requestBuilder.build( - name, maxResultWindow, settings.getSettingValue(SQL_CURSOR_KEEP_ALIVE))); + name, maxResultWindow, settings.getSettingValue(SQL_CURSOR_KEEP_ALIVE), client)); AtomicReference result = new AtomicReference<>(); executor.explain( diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java index 5cd11c6cd4..a1d6271a0c 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java @@ -8,9 +8,7 @@ import static java.util.Collections.emptyList; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertSame; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; import static org.opensearch.sql.ast.tree.Sort.SortOption.DEFAULT_ASC; import static org.opensearch.sql.data.type.ExprCoreType.DOUBLE; import static org.opensearch.sql.data.type.ExprCoreType.INTEGER; @@ -91,6 +89,8 @@ public void setup() { @Test void test_protect_indexScan() { + when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)).thenReturn(true); + String indexName = "test"; final int maxResultWindow = 10000; final int querySizeLimit = 200; @@ -114,11 +114,11 @@ void test_protect_indexScan() { final var name = new OpenSearchRequest.IndexName(indexName); final var request = - new OpenSearchRequestBuilder(querySizeLimit, exprValueFactory) + new OpenSearchRequestBuilder(querySizeLimit, exprValueFactory, settings) .build( name, maxResultWindow, - settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE)); + settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE), client); assertEquals( PhysicalPlanDSL.project( PhysicalPlanDSL.limit( diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilderTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilderTest.java index 742e76cbd0..46255554fb 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilderTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilderTest.java @@ -7,9 +7,7 @@ import static org.junit.Assert.assertThrows; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import static org.opensearch.index.query.QueryBuilders.matchAllQuery; import static org.opensearch.index.query.QueryBuilders.nestedQuery; import static org.opensearch.search.sort.FieldSortBuilder.DOC_FIELD_NAME; @@ -50,10 +48,12 @@ import org.opensearch.search.sort.FieldSortBuilder; import org.opensearch.search.sort.ScoreSortBuilder; import org.opensearch.search.sort.SortBuilders; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.exception.SemanticCheckException; import org.opensearch.sql.expression.DSL; import org.opensearch.sql.expression.NamedExpression; import org.opensearch.sql.expression.ReferenceExpression; +import org.opensearch.sql.opensearch.client.OpenSearchClient; import org.opensearch.sql.opensearch.data.type.OpenSearchDataType; import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory; import org.opensearch.sql.opensearch.response.agg.CompositeAggregationParser; @@ -76,11 +76,16 @@ class OpenSearchRequestBuilderTest { @Mock private OpenSearchExprValueFactory exprValueFactory; + @Mock private OpenSearchClient client; + + @Mock private Settings settings; + private OpenSearchRequestBuilder requestBuilder; @BeforeEach void setup() { - requestBuilder = new OpenSearchRequestBuilder(DEFAULT_LIMIT, exprValueFactory); + requestBuilder = new OpenSearchRequestBuilder(DEFAULT_LIMIT, exprValueFactory, settings); + lenient().when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)).thenReturn(true); } @Test @@ -100,11 +105,12 @@ void build_query_request() { .trackScores(true), exprValueFactory, List.of()), - requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT)); + requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT, client)); } @Test void build_scroll_request_with_correct_size() { + when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)).thenReturn(false); Integer limit = 800; Integer offset = 10; requestBuilder.pushDownLimit(limit, offset); @@ -119,7 +125,7 @@ void build_scroll_request_with_correct_size() { .timeout(DEFAULT_QUERY_TIMEOUT), exprValueFactory, List.of()), - requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT)); + requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT, client)); } @Test @@ -127,7 +133,7 @@ void test_push_down_query() { QueryBuilder query = QueryBuilders.termQuery("intA", 1); requestBuilder.pushDownFilter(query); - var r = requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT); + var r = requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT, client); Function querySearch = searchRequest -> { assertEquals( @@ -220,7 +226,7 @@ void assertSearchSourceBuilder( throw new UnsupportedOperationException(); }; requestBuilder - .build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT) + .build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT, client) .search(querySearch, scrollSearch); } @@ -290,7 +296,7 @@ void test_push_down_project() { .fetchSource("intA", null), exprValueFactory, List.of("intA")), - requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT)); + requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT, client)); } @Test @@ -320,7 +326,7 @@ void test_push_down_project_limit() { .fetchSource("intA", null), exprValueFactory, List.of("intA")), - requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT)); + requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT, client)); } @Test @@ -350,7 +356,7 @@ void test_push_down_project_limit_and_offset() { .fetchSource("intA", null), exprValueFactory, List.of("intA")), - requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT)); + requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT, client)); } @Test @@ -521,7 +527,7 @@ void exception_when_non_zero_offset_and_page_size() { requestBuilder.pushDownLimit(300, 2); assertThrows( UnsupportedOperationException.class, - () -> requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT)); + () -> requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT, client)); } @Test diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java index 3ca566fac6..f2e395d323 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java @@ -79,6 +79,7 @@ class OpenSearchIndexTest { @BeforeEach void setUp() { this.index = new OpenSearchIndex(client, settings, "test"); + lenient().when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)).thenReturn(true); } @Test @@ -198,10 +199,10 @@ void implementRelationOperatorOnly() { when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200); LogicalPlan plan = index.createScanBuilder(); Integer maxResultWindow = index.getMaxResultWindow(); - final var requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE_LIMIT, exprValueFactory); + final var requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE_LIMIT, exprValueFactory, settings); assertEquals( new OpenSearchIndexScan( - client, 200, requestBuilder.build(INDEX_NAME, maxResultWindow, SCROLL_TIMEOUT)), + client, 200, requestBuilder.build(INDEX_NAME, maxResultWindow, SCROLL_TIMEOUT, client)), index.implement(index.optimize(plan))); } @@ -211,10 +212,10 @@ void implementRelationOperatorWithOptimization() { when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200); LogicalPlan plan = index.createScanBuilder(); Integer maxResultWindow = index.getMaxResultWindow(); - final var requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE_LIMIT, exprValueFactory); + final var requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE_LIMIT, exprValueFactory, settings); assertEquals( new OpenSearchIndexScan( - client, 200, requestBuilder.build(INDEX_NAME, maxResultWindow, SCROLL_TIMEOUT)), + client, 200, requestBuilder.build(INDEX_NAME, maxResultWindow, SCROLL_TIMEOUT, client)), index.implement(plan)); } @@ -243,7 +244,7 @@ void implementOtherLogicalOperators() { include); Integer maxResultWindow = index.getMaxResultWindow(); - final var requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE_LIMIT, exprValueFactory); + final var requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE_LIMIT, exprValueFactory, settings); assertEquals( PhysicalPlanDSL.project( PhysicalPlanDSL.dedupe( @@ -255,7 +256,7 @@ void implementOtherLogicalOperators() { client, QUERY_SIZE_LIMIT, requestBuilder.build( - INDEX_NAME, maxResultWindow, SCROLL_TIMEOUT)), + INDEX_NAME, maxResultWindow, SCROLL_TIMEOUT, client)), mappings), exclude), newEvalField), diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanPaginationTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanPaginationTest.java index 2085519b12..5685465efd 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanPaginationTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanPaginationTest.java @@ -56,6 +56,7 @@ void setup() { lenient() .when(settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE)) .thenReturn(TimeValue.timeValueMinutes(1)); + lenient().when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)).thenReturn(true); } @Mock private OpenSearchClient client; @@ -69,12 +70,12 @@ void setup() { @Test void query_empty_result() { mockResponse(client); - var builder = new OpenSearchRequestBuilder(QUERY_SIZE, exprValueFactory); + var builder = new OpenSearchRequestBuilder(QUERY_SIZE, exprValueFactory, settings); try (var indexScan = new OpenSearchIndexScan( client, MAX_RESULT_WINDOW, - builder.build(INDEX_NAME, MAX_RESULT_WINDOW, SCROLL_TIMEOUT))) { + builder.build(INDEX_NAME, MAX_RESULT_WINDOW, SCROLL_TIMEOUT, client))) { indexScan.open(); assertFalse(indexScan.hasNext()); } @@ -96,13 +97,13 @@ void dont_serialize_if_no_cursor() { OpenSearchRequestBuilder builder = mock(); OpenSearchRequest request = mock(); OpenSearchResponse response = mock(); - when(builder.build(any(), anyInt(), any())).thenReturn(request); + when(builder.build(any(), anyInt(), any(), any())).thenReturn(request); when(client.search(any())).thenReturn(response); try (var indexScan = new OpenSearchIndexScan( client, MAX_RESULT_WINDOW, - builder.build(INDEX_NAME, MAX_RESULT_WINDOW, SCROLL_TIMEOUT))) { + builder.build(INDEX_NAME, MAX_RESULT_WINDOW, SCROLL_TIMEOUT, client))) { indexScan.open(); when(request.hasAnotherBatch()).thenReturn(false); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java index f813d8f551..5708ea7e3e 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java @@ -11,9 +11,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import static org.opensearch.search.sort.FieldSortBuilder.DOC_FIELD_NAME; import static org.opensearch.search.sort.SortOrder.ASC; import static org.opensearch.sql.data.type.ExprCoreType.STRING; @@ -52,6 +50,7 @@ import org.opensearch.search.fetch.subphase.highlight.HighlightBuilder; import org.opensearch.sql.ast.expression.DataType; import org.opensearch.sql.ast.expression.Literal; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.data.model.ExprValueUtils; import org.opensearch.sql.exception.NoCursorException; @@ -77,6 +76,7 @@ class OpenSearchIndexScanTest { public static final int MAX_RESULT_WINDOW = 10000; public static final TimeValue CURSOR_KEEP_ALIVE = TimeValue.timeValueMinutes(1); @Mock private OpenSearchClient client; + @Mock private Settings settings; private final OpenSearchExprValueFactory exprValueFactory = new OpenSearchExprValueFactory( @@ -84,7 +84,9 @@ class OpenSearchIndexScanTest { "name", OpenSearchDataType.of(STRING), "department", OpenSearchDataType.of(STRING))); @BeforeEach - void setup() {} + void setup() { + lenient().when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)).thenReturn(true); + } @Test void explain() { @@ -172,10 +174,10 @@ void plan_for_serialization() { void query_empty_result() { mockResponse(client); final var name = new OpenSearchRequest.IndexName("test"); - final var requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE, exprValueFactory); + final var requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE, exprValueFactory, settings); try (OpenSearchIndexScan indexScan = new OpenSearchIndexScan( - client, QUERY_SIZE, requestBuilder.build(name, MAX_RESULT_WINDOW, CURSOR_KEEP_ALIVE))) { + client, QUERY_SIZE, requestBuilder.build(name, MAX_RESULT_WINDOW, CURSOR_KEEP_ALIVE, client))) { indexScan.open(); assertFalse(indexScan.hasNext()); } @@ -190,10 +192,10 @@ void query_all_results_with_query() { employee(1, "John", "IT"), employee(2, "Smith", "HR"), employee(3, "Allen", "IT") }); - final var requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE, exprValueFactory); + final var requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE, exprValueFactory, settings); try (OpenSearchIndexScan indexScan = new OpenSearchIndexScan( - client, 10, requestBuilder.build(INDEX_NAME, 10000, CURSOR_KEEP_ALIVE))) { + client, 10, requestBuilder.build(INDEX_NAME, 10000, CURSOR_KEEP_ALIVE, client))) { indexScan.open(); assertAll( @@ -218,10 +220,10 @@ void query_all_results_with_scroll() { new ExprValue[] {employee(1, "John", "IT"), employee(2, "Smith", "HR")}, new ExprValue[] {employee(3, "Allen", "IT")}); - final var requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE, exprValueFactory); + final var requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE, exprValueFactory, settings); try (OpenSearchIndexScan indexScan = new OpenSearchIndexScan( - client, 10, requestBuilder.build(INDEX_NAME, 10000, CURSOR_KEEP_ALIVE))) { + client, 10, requestBuilder.build(INDEX_NAME, 10000, CURSOR_KEEP_ALIVE, client))) { indexScan.open(); assertAll( @@ -248,10 +250,10 @@ void query_some_results_with_query() { }); final int limit = 3; - OpenSearchRequestBuilder builder = new OpenSearchRequestBuilder(0, exprValueFactory); + OpenSearchRequestBuilder builder = new OpenSearchRequestBuilder(0, exprValueFactory, settings); try (OpenSearchIndexScan indexScan = new OpenSearchIndexScan( - client, limit, builder.build(INDEX_NAME, MAX_RESULT_WINDOW, CURSOR_KEEP_ALIVE))) { + client, limit, builder.build(INDEX_NAME, MAX_RESULT_WINDOW, CURSOR_KEEP_ALIVE, client))) { indexScan.open(); assertAll( @@ -269,10 +271,10 @@ void query_some_results_with_query() { @Test void query_some_results_with_scroll() { mockTwoPageResponse(client); - final var requestuilder = new OpenSearchRequestBuilder(10, exprValueFactory); + final var requestuilder = new OpenSearchRequestBuilder(10, exprValueFactory, settings); try (OpenSearchIndexScan indexScan = new OpenSearchIndexScan( - client, 3, requestuilder.build(INDEX_NAME, MAX_RESULT_WINDOW, CURSOR_KEEP_ALIVE))) { + client, 3, requestuilder.build(INDEX_NAME, MAX_RESULT_WINDOW, CURSOR_KEEP_ALIVE, client))) { indexScan.open(); assertAll( @@ -306,12 +308,12 @@ void query_results_limited_by_query_size() { }); final int defaultQuerySize = 2; - final var requestBuilder = new OpenSearchRequestBuilder(defaultQuerySize, exprValueFactory); + final var requestBuilder = new OpenSearchRequestBuilder(defaultQuerySize, exprValueFactory, settings); try (OpenSearchIndexScan indexScan = new OpenSearchIndexScan( client, defaultQuerySize, - requestBuilder.build(INDEX_NAME, QUERY_SIZE, CURSOR_KEEP_ALIVE))) { + requestBuilder.build(INDEX_NAME, QUERY_SIZE, CURSOR_KEEP_ALIVE, client))) { indexScan.open(); assertAll( @@ -368,7 +370,7 @@ void push_down_highlight_with_arguments() { } private PushDownAssertion assertThat() { - return new PushDownAssertion(client, exprValueFactory); + return new PushDownAssertion(client, exprValueFactory, settings); } private static class PushDownAssertion { @@ -377,9 +379,9 @@ private static class PushDownAssertion { private final OpenSearchResponse response; private final OpenSearchExprValueFactory factory; - public PushDownAssertion(OpenSearchClient client, OpenSearchExprValueFactory valueFactory) { + public PushDownAssertion(OpenSearchClient client, OpenSearchExprValueFactory valueFactory, Settings settings) { this.client = client; - this.requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE, valueFactory); + this.requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE, valueFactory, settings); this.response = mock(OpenSearchResponse.class); this.factory = valueFactory; @@ -411,7 +413,7 @@ PushDownAssertion shouldQueryHighlight(QueryBuilder query, HighlightBuilder high when(client.search(request)).thenReturn(response); var indexScan = new OpenSearchIndexScan( - client, QUERY_SIZE, requestBuilder.build(EMPLOYEES_INDEX, 10000, CURSOR_KEEP_ALIVE)); + client, QUERY_SIZE, requestBuilder.build(EMPLOYEES_INDEX, 10000, CURSOR_KEEP_ALIVE, client)); indexScan.open(); return this; } @@ -429,7 +431,7 @@ PushDownAssertion shouldQuery(QueryBuilder expected) { when(client.search(request)).thenReturn(response); var indexScan = new OpenSearchIndexScan( - client, 10000, requestBuilder.build(EMPLOYEES_INDEX, 10000, CURSOR_KEEP_ALIVE)); + client, 10000, requestBuilder.build(EMPLOYEES_INDEX, 10000, CURSOR_KEEP_ALIVE, client)); indexScan.open(); return this; } diff --git a/plugin/build.gradle b/plugin/build.gradle index f70a429343..7ebd0ad2d9 100644 --- a/plugin/build.gradle +++ b/plugin/build.gradle @@ -1,5 +1,3 @@ -import java.util.concurrent.Callable - /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 @@ -116,11 +114,6 @@ configurations.all { resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-jdk8:1.9.10" resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-common:1.9.10" } - -configurations { - zipArchive -} - compileJava { options.compilerArgs.addAll(["-processor", 'lombok.launch.AnnotationProcessorHider$AnnotationProcessor']) } @@ -169,8 +162,6 @@ dependencies { testImplementation group: 'org.mockito', name: 'mockito-core', version: "${versions.mockito}" testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: "${versions.mockito}" testImplementation 'org.junit.jupiter:junit-jupiter:5.9.3' - - zipArchive group: 'org.opensearch.plugin', name:'opensearch-job-scheduler', version: "${opensearch_build}" } test { @@ -285,24 +276,7 @@ afterEvaluate { } } -def getJobSchedulerPlugin() { - provider(new Callable() { - @Override - RegularFile call() throws Exception { - return new RegularFile() { - @Override - File getAsFile() { - return configurations.zipArchive.asFileTree.matching { - include '**/opensearch-job-scheduler*' - }.singleFile - } - } - } - }) -} - testClusters.integTest { - plugin(getJobSchedulerPlugin()) plugin(project.tasks.bundlePlugin.archiveFile) testDistribution = "ARCHIVE" @@ -317,4 +291,5 @@ testClusters.integTest { run { useCluster testClusters.integTest -} \ No newline at end of file +} +