Skip to content

Commit

Permalink
Add missing refactoring of Scroll to PIT API calls for Joins and Pagi…
Browse files Browse the repository at this point in the history
…nation query (opensearch-project#2981)

* Adding PIT for pagination queries in new SQL engine code paths

Signed-off-by: Manasvini B S <[email protected]>

* Fix legacy code using scroll API instead of PIT for batch physical operator

Signed-off-by: Manasvini B S <[email protected]>

* Fix local debugger issue

Signed-off-by: Manasvini B S <[email protected]>

* Refactor integ-tests data for PIT and fix unit tests

Signed-off-by: Manasvini B S <[email protected]>

* Address feedback comments

Signed-off-by: Manasvini B S <[email protected]>

* Adding test coverage

Signed-off-by: Manasvini B S <[email protected]>

---------

Signed-off-by: Manasvini B S <[email protected]>
manasvinibs authored Sep 23, 2024
1 parent a87893a commit ce17d0a
Showing 33 changed files with 1,470 additions and 302 deletions.
Original file line number Diff line number Diff line change
@@ -182,7 +182,7 @@ public void validTotalResultWithAndWithoutPaginationOrderBy() throws IOException
String selectQuery =
StringUtils.format(
"SELECT firstname, state FROM %s ORDER BY balance DESC ", TEST_INDEX_ACCOUNT);
verifyWithAndWithoutPaginationResponse(selectQuery + " LIMIT 2000", selectQuery, 26, false);
verifyWithAndWithoutPaginationResponse(selectQuery + " LIMIT 2000", selectQuery, 25, false);
}

@Test
Original file line number Diff line number Diff line change
@@ -149,8 +149,11 @@ public void onFailure(Exception e) {

private Settings defaultSettings() {
return new Settings() {
private final Map<Key, Integer> defaultSettings =
new ImmutableMap.Builder<Key, Integer>().put(Key.QUERY_SIZE_LIMIT, 200).build();
private final Map<Key, Object> defaultSettings =
new ImmutableMap.Builder<Key, Object>()
.put(Key.QUERY_SIZE_LIMIT, 200)
.put(Key.SQL_PAGINATION_API_SEARCH_AFTER, true)
.build();

@Override
public <T> T getSettingValue(Key key) {
Original file line number Diff line number Diff line change
@@ -34,25 +34,30 @@ public class PaginationFilterIT extends SQLIntegTestCase {
*/
private static final Map<String, Integer> STATEMENT_TO_NUM_OF_PAGES =
Map.of(
"SELECT * FROM " + TestsConstants.TEST_INDEX_ACCOUNT, 1000,
"SELECT * FROM " + TestsConstants.TEST_INDEX_ACCOUNT,
1000,
"SELECT * FROM " + TestsConstants.TEST_INDEX_ACCOUNT + " WHERE match(address, 'street')",
385,
385,
"SELECT * FROM "
+ TestsConstants.TEST_INDEX_ACCOUNT
+ " WHERE match(address, 'street') AND match(city, 'Ola')",
1,
+ TestsConstants.TEST_INDEX_ACCOUNT
+ " WHERE match(address, 'street') AND match(city, 'Ola')",
1,
"SELECT firstname, lastname, highlight(address) FROM "
+ TestsConstants.TEST_INDEX_ACCOUNT
+ " WHERE match(address, 'street') AND match(state, 'OH')",
5,
+ TestsConstants.TEST_INDEX_ACCOUNT
+ " WHERE match(address, 'street') AND match(state, 'OH')",
5,
"SELECT firstname, lastname, highlight('*') FROM "
+ TestsConstants.TEST_INDEX_ACCOUNT
+ " WHERE match(address, 'street') AND match(state, 'OH')",
5,
"SELECT * FROM " + TestsConstants.TEST_INDEX_BEER + " WHERE true", 60,
"SELECT * FROM " + TestsConstants.TEST_INDEX_BEER + " WHERE Id=10", 1,
"SELECT * FROM " + TestsConstants.TEST_INDEX_BEER + " WHERE Id + 5=15", 1,
"SELECT * FROM " + TestsConstants.TEST_INDEX_BANK, 7);
+ TestsConstants.TEST_INDEX_ACCOUNT
+ " WHERE match(address, 'street') AND match(state, 'OH')",
5,
"SELECT * FROM " + TestsConstants.TEST_INDEX_BEER + " WHERE true",
60,
"SELECT * FROM " + TestsConstants.TEST_INDEX_BEER + " WHERE Id=10",
1,
"SELECT * FROM " + TestsConstants.TEST_INDEX_BEER + " WHERE Id + 5=15",
1,
"SELECT * FROM " + TestsConstants.TEST_INDEX_BANK,
7);

private final String sqlStatement;

Original file line number Diff line number Diff line change
@@ -166,6 +166,7 @@ private Settings defaultSettings() {
new ImmutableMap.Builder<Key, Object>()
.put(Key.QUERY_SIZE_LIMIT, 200)
.put(Key.SQL_CURSOR_KEEP_ALIVE, TimeValue.timeValueMinutes(1))
.put(Key.SQL_PAGINATION_API_SEARCH_AFTER, true)
.build();

@Override
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone\u003dfalse)"
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)"
},
"children": []
}
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"bool\":{\"filter\":[{\"range\":{\"balance\":{\"from\":10000,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":null,\"to\":40,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone\u003dfalse)"
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"bool\":{\"filter\":[{\"range\":{\"balance\":{\"from\":10000,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":null,\"to\":40,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)"
},
"children": []
}
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=opensearch-sql_test_index_account, sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\"}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=opensearch-sql_test_index_account, sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\"}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)"
},
"children": []
}
Original file line number Diff line number Diff line change
@@ -31,7 +31,7 @@
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone\u003dfalse)"
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)"
},
"children": []
}
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone\u003dfalse)"
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, needClean\u003dtrue, searchDone\u003dfalse, pitId\u003dnull, cursorKeepAlive\u003dnull, searchAfter\u003dnull, searchResponse\u003dnull)"
},
"children": []
}
Original file line number Diff line number Diff line change
@@ -567,7 +567,7 @@ private void populateDefaultCursor(DefaultCursor cursor) {
Integer limit = cursor.getLimit();
long rowsLeft = rowsLeft(cursor.getFetchSize(), cursor.getLimit());
if (rowsLeft <= 0) {
// close the cursor
// Delete Point In Time ID
if (LocalClusterState.state().getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)) {
String pitId = cursor.getPitId();
PointInTimeHandler pit = new PointInTimeHandlerImpl(client, pitId);
Original file line number Diff line number Diff line change
@@ -5,11 +5,15 @@

package org.opensearch.sql.legacy.query.planner.logical.node;

import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;

import java.util.Map;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.query.join.TableInJoinRequestBuilder;
import org.opensearch.sql.legacy.query.planner.core.PlanNode;
import org.opensearch.sql.legacy.query.planner.logical.LogicalOperator;
import org.opensearch.sql.legacy.query.planner.physical.PhysicalOperator;
import org.opensearch.sql.legacy.query.planner.physical.node.pointInTime.PointInTime;
import org.opensearch.sql.legacy.query.planner.physical.node.scroll.Scroll;

/** Table scan */
@@ -33,6 +37,9 @@ public PlanNode[] children() {

/**
 * Builds the physical operator for this table scan.
 *
 * <p>When the {@code SQL_PAGINATION_API_SEARCH_AFTER} setting is true a {@link PointInTime}
 * operator is created, otherwise a {@link Scroll} operator. Both receive this node's request and
 * page size.
 *
 * @param optimalOps physical operators chosen for other logical operators; not used here
 * @return single-element array holding the selected operator
 */
@Override
public <T> PhysicalOperator[] toPhysical(Map<LogicalOperator, PhysicalOperator<T>> optimalOps) {
  final Boolean searchAfterEnabled =
      LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER);
  final PhysicalOperator scanOperator =
      searchAfterEnabled ? new PointInTime(request, pageSize) : new Scroll(request, pageSize);
  return new PhysicalOperator[] {scanOperator};
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package org.opensearch.sql.legacy.query.planner.physical.node;

import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.common.Strings;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.search.SearchHit;
import org.opensearch.sql.legacy.domain.Where;
import org.opensearch.sql.legacy.exception.SqlParseException;
import org.opensearch.sql.legacy.query.join.TableInJoinRequestBuilder;
import org.opensearch.sql.legacy.query.maker.QueryMaker;
import org.opensearch.sql.legacy.query.planner.core.ExecuteParams;
import org.opensearch.sql.legacy.query.planner.core.PlanNode;
import org.opensearch.sql.legacy.query.planner.physical.Row;
import org.opensearch.sql.legacy.query.planner.physical.estimation.Cost;
import org.opensearch.sql.legacy.query.planner.resource.ResourceManager;

/**
 * Base class for physical operators that read the search hits of one table batch by batch.
 *
 * <p>How the first and the following batches are fetched is left to subclasses through {@link
 * #loadFirstBatch()} and {@link #loadNextBatch()}; this class handles parameter wiring, the
 * optional extra query filter, shard statistics and wrapping hits into rows.
 */
public abstract class Paginate extends BatchPhysicalOperator<SearchHit> {

  /** Request builder of the table this operator reads from. */
  protected final TableInJoinRequestBuilder request;

  /** Page size supplied at construction time. */
  protected final int pageSize;

  /** Client taken from the execute params in {@link #open(ExecuteParams)}. */
  protected Client client;

  /** Latest search response; {@link #prefetch()} treats {@code null} as "no batch loaded yet". */
  protected SearchResponse searchResponse;

  /** Timeout taken from the execute params in {@link #open(ExecuteParams)}. */
  protected Integer timeout;

  /** Resource manager taken from the execute params in {@link #open(ExecuteParams)}. */
  protected ResourceManager resourceMgr;

  /**
   * Creates the operator for the given table request.
   *
   * @param request request builder of the table to read
   * @param pageSize page size to use
   */
  public Paginate(TableInJoinRequestBuilder request, int pageSize) {
    this.request = request;
    this.pageSize = pageSize;
  }

  /** This operator has no child nodes. */
  @Override
  public PlanNode[] children() {
    return new PlanNode[0];
  }

  /** Returns a freshly created {@link Cost}. */
  @Override
  public Cost estimate() {
    return new Cost();
  }

  /**
   * Opens the operator: reads client, timeout and resource manager from {@code params} and, if an
   * extra query filter of type {@link BoolQueryBuilder} is present, rebuilds the request query
   * with it.
   *
   * @param params execution parameters
   * @throws Exception if the parent fails to open or the query cannot be rebuilt
   */
  @Override
  public void open(ExecuteParams params) throws Exception {
    super.open(params);
    this.client = params.get(ExecuteParams.ExecuteParamType.CLIENT);
    this.timeout = params.get(ExecuteParams.ExecuteParamType.TIMEOUT);
    this.resourceMgr = params.get(ExecuteParams.ExecuteParamType.RESOURCE_MANAGER);

    final Object extraFilter = params.get(ExecuteParams.ExecuteParamType.EXTRA_QUERY_FILTER);
    if (!(extraFilter instanceof BoolQueryBuilder)) {
      // Nothing was pushed down (or it is not a bool query): keep the request untouched.
      return;
    }

    request
        .getRequestBuilder()
        .setQuery(generateNewQueryWithExtraFilter((BoolQueryBuilder) extraFilter));

    if (LOG.isDebugEnabled()) {
      final String rebuiltSource =
          Strings.toString(
              XContentType.JSON, request.getRequestBuilder().request().source(), true, true);
      LOG.debug("Received extra query filter, re-build query: {}", rebuiltSource);
    }
  }

  /**
   * Loads one batch of hits and wraps them into rows.
   *
   * <p>The first call (no response stored yet) loads the first batch and records shard
   * statistics; every later call loads the next batch.
   *
   * @return rows of the batch that was just loaded
   * @throws NullPointerException if client, resource manager or timeout have not been set
   */
  @Override
  protected Collection<Row<SearchHit>> prefetch() {
    Objects.requireNonNull(client, "Client connection is not ready");
    Objects.requireNonNull(resourceMgr, "ResourceManager is not set");
    Objects.requireNonNull(timeout, "Time out is not set");

    if (searchResponse != null) {
      loadNextBatch();
    } else {
      loadFirstBatch();
      updateMetaResult();
    }
    return wrapRowForCurrentBatch();
  }

  /** Fetches the first batch; invoked by {@link #prefetch()} while no response is stored. */
  protected abstract void loadFirstBatch();

  /** Fetches a subsequent batch; invoked by {@link #prefetch()} once a response is stored. */
  protected abstract void loadNextBatch();

  /**
   * Combines the original WHERE clause with an extra filter pushed down from upstream. The WHERE
   * clause is parsed again because the OpenSearch RequestBuilder doesn't allow a QueryBuilder to
   * be changed once it has been added.
   *
   * @param filter extra filter to apply
   * @return the re-parsed WHERE query with {@code filter} added as a must clause, or {@code
   *     filter} itself when the original select has no WHERE clause
   * @throws SqlParseException if the WHERE clause cannot be converted into a query
   */
  protected QueryBuilder generateNewQueryWithExtraFilter(BoolQueryBuilder filter)
      throws SqlParseException {
    final Where originalWhere = request.getOriginalSelect().getWhere();
    if (originalWhere == null) {
      return filter;
    }
    final BoolQueryBuilder combined = QueryMaker.explain(originalWhere, false);
    combined.must(filter);
    return combined;
  }

  /** Copies shard counts and the timed-out flag of the current response into the meta result. */
  protected void updateMetaResult() {
    resourceMgr.getMetaResult().addTotalNumOfShards(searchResponse.getTotalShards());
    resourceMgr.getMetaResult().addSuccessfulShards(searchResponse.getSuccessfulShards());
    resourceMgr.getMetaResult().addFailedShards(searchResponse.getFailedShards());
    resourceMgr.getMetaResult().updateTimeOut(searchResponse.isTimedOut());
  }

  /**
   * Wraps every hit of the current response into a {@link SearchHitRow} tagged with the request
   * alias.
   *
   * @return fixed-size list of rows, in the order of the hits
   */
  @SuppressWarnings("unchecked")
  protected Collection<Row<SearchHit>> wrapRowForCurrentBatch() {
    final SearchHit[] batchHits = searchResponse.getHits().getHits();
    // Generic arrays cannot be created, hence the raw array and the suppression above.
    final Row[] wrapped = new Row[batchHits.length];
    int position = 0;
    for (SearchHit hit : batchHits) {
      wrapped[position++] = new SearchHitRow(hit, request.getAlias());
    }
    return Arrays.asList(wrapped);
  }

  /** Renders the operator as {@code <SimpleClassName> [ <table description>, pageSize=<n> ]}. */
  @Override
  public String toString() {
    final String operatorName = getClass().getSimpleName();
    return operatorName + " [ " + describeTable() + ", pageSize=" + pageSize + " ]";
  }

  /** Describes the table as {@code <index of first FROM entry> as <alias>}. */
  protected String describeTable() {
    return request.getOriginalSelect().getFrom().get(0).getIndex()
        + " as "
        + request.getAlias();
  }

  /*********************************************
   * Getters for Explain
   *********************************************/

  /** Returns the source of the underlying search request rendered as JSON. */
  public String getRequest() {
    return Strings.toString(XContentType.JSON, request.getRequestBuilder().request().source());
  }
}
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.legacy.query.planner.physical.node.scroll;
package org.opensearch.sql.legacy.query.planner.physical.node;

import com.google.common.base.Strings;
import java.util.HashMap;
@@ -36,7 +36,7 @@
* ----------------------------------------------------------------------------------------------------------------------
* </pre>
*/
class SearchHitRow implements Row<SearchHit> {
public class SearchHitRow implements Row<SearchHit> {

/** Native OpenSearch data object for each row */
private final SearchHit hit;
@@ -47,7 +47,7 @@ class SearchHitRow implements Row<SearchHit> {
/** Alias of the table that owns the row. Empty if this row comes from a combination of two other rows */
private final String tableAlias;

SearchHitRow(SearchHit hit, String tableAlias) {
public SearchHitRow(SearchHit hit, String tableAlias) {
this.hit = hit;
this.source = hit.getSourceAsMap();
this.tableAlias = tableAlias;
Loading

0 comments on commit ce17d0a

Please sign in to comment.