Skip to content

Commit

Permalink
Sort and search_after fixx
Browse files Browse the repository at this point in the history
  • Loading branch information
sohami committed Aug 22, 2023
1 parent fa7d5ca commit 690f0f9
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.Version;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.action.search.SearchType;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.lease.Releasables;
Expand Down Expand Up @@ -931,14 +932,6 @@ public BucketCollectorProcessor bucketCollectorProcessor() {
* false: otherwise
*/
private boolean useConcurrentSearch(Executor concurrentSearchExecutor) {
// Disable concurrent segment search if time-series based sort optimization can be applied on the index. This is done to avoid
// performance regression in such cases as segment order matters and most of the segments are skipped and not even evaluated for
// search. When concurrent segment search is used then order of the segments will be randomized and segments gets distributed across
// slices and unnecessary work will be done
if (indexShard.isTimeSeriesDescSortOptimizationEnabled()) {
return false;
}

if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)
&& (clusterService != null)
&& (concurrentSearchExecutor != null)) {
Expand All @@ -952,4 +945,17 @@ private boolean useConcurrentSearch(Executor concurrentSearchExecutor) {
return false;
}
}

@Override
public boolean isSortOnTimeSeriesField() {
if (sort != null
&& sort.sort != null
&& sort.sort.getSort() != null
&& sort.sort.getSort().length > 0
&& sort.sort.getSort()[0].getField() != null
&& sort.sort.getSort()[0].getField().equals(DataStream.TIMESERIES_FIELDNAME)) {
return true;
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -523,13 +523,7 @@ private boolean shouldReverseLeafReaderContexts() {
// reader order here.
if (searchContext.indexShard().isTimeSeriesDescSortOptimizationEnabled()) {
// Only reverse order for asc order sort queries
if (searchContext.sort() != null
&& searchContext.sort().sort != null
&& searchContext.sort().sort.getSort() != null
&& searchContext.sort().sort.getSort().length > 0
&& searchContext.sort().sort.getSort()[0].getReverse() == false
&& searchContext.sort().sort.getSort()[0].getField() != null
&& searchContext.sort().sort.getSort()[0].getField().equals(DataStream.TIMESERIES_FIELDNAME)) {
if (searchContext.isSortOnTimeSeriesField() && searchContext.sort().sort.getSort()[0].getReverse() == false) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,4 +564,9 @@ public BucketCollectorProcessor bucketCollectorProcessor() {
public boolean isConcurrentSegmentSearchEnabled() {
return in.isConcurrentSegmentSearchEnabled();
}

@Override
public boolean isSortOnTimeSeriesField() {
return in.isSortOnTimeSeriesField();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -485,4 +485,6 @@ public String toString() {
public abstract void setBucketCollectorProcessor(BucketCollectorProcessor bucketCollectorProcessor);

public abstract BucketCollectorProcessor bucketCollectorProcessor();

public abstract boolean isSortOnTimeSeriesField();
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
package org.opensearch.search.internal;

import org.apache.lucene.search.Query;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.query.ParsedQuery;
import org.opensearch.search.aggregations.SearchContextAggregations;
Expand Down Expand Up @@ -364,4 +365,17 @@ public FetchSearchResult fetchResult() {
public long getRelativeTimeInMillis() {
throw new UnsupportedOperationException("Not supported");
}

@Override
public boolean isSortOnTimeSeriesField() {
if (sort != null
&& sort.sort != null
&& sort.sort.getSort() != null
&& sort.sort.getSort().length > 0
&& sort.sort.getSort()[0].getField() != null
&& sort.sort.getSort()[0].getField().equals(DataStream.TIMESERIES_FIELDNAME)) {
return true;
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,11 @@ public boolean searchWith(
boolean hasFilterCollector,
boolean hasTimeout
) throws IOException {
if (searchContext.isConcurrentSegmentSearchEnabled()) {
if (useConcurrentPath(searchContext)) {
LOGGER.info("Using concurrent search over segments (experimental)");
return concurrentQueryPhaseSearcher.searchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
} else {
LOGGER.info("Using non-concurrent search");
return defaultQueryPhaseSearcher.searchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
}
}
Expand All @@ -72,11 +73,22 @@ public boolean searchWith(
*/
@Override
public AggregationProcessor aggregationProcessor(SearchContext searchContext) {
if (searchContext.isConcurrentSegmentSearchEnabled()) {
if (useConcurrentPath(searchContext)) {
LOGGER.info("Using concurrent search over segments (experimental)");
return concurrentQueryPhaseSearcher.aggregationProcessor(searchContext);
} else {
LOGGER.info("Using non-concurrent search");
return defaultQueryPhaseSearcher.aggregationProcessor(searchContext);
}
}
private boolean useConcurrentPath(SearchContext context) {
// Disable concurrent segment search if time-series based sort optimization can be applied on the index. This is done to avoid
// performance regression in such cases as segment order matters and most of the segments are skipped and not even evaluated for
// search. When concurrent segment search is used then order of the segments will be randomized and segments gets distributed across
// slices and unnecessary work will be done
if (context.indexShard().isTimeSeriesDescSortOptimizationEnabled() && context.isSortOnTimeSeriesField()) {
return false;
}
return context.isConcurrentSegmentSearchEnabled();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.action.OriginalIndices;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.action.search.SearchType;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BigArrays;
import org.opensearch.core.index.shard.ShardId;
Expand Down Expand Up @@ -680,6 +681,19 @@ public BucketCollectorProcessor bucketCollectorProcessor() {
return bucketCollectorProcessor;
}

@Override
public boolean isSortOnTimeSeriesField() {
if (sort != null
&& sort.sort != null
&& sort.sort.getSort() != null
&& sort.sort.getSort().length > 0
&& sort.sort.getSort()[0].getField() != null
&& sort.sort.getSort()[0].getField().equals(DataStream.TIMESERIES_FIELDNAME)) {
return true;
}
return false;
}

/**
* Clean the query results by consuming all of it
*/
Expand Down

0 comments on commit 690f0f9

Please sign in to comment.