Skip to content

Commit

Permalink
Handle all exceptions in data nodes can match (elastic#117469)
Browse files Browse the repository at this point in the history
During the can match phase, prior to the query phase, we may have exceptions
that are returned back to the coordinating node, handled gracefully as if the
shard returned canMatch=true.

During the query phase, we perform an additional rewrite and can match phase
to eventually shortcut the query phase for the shard. That needs to handle
exceptions as well. Currently, an exception there causes shard failures, while
we should rather go ahead and execute the query on the shard.

Instead of adding another try catch on consumers code, this commit adds exception handling to the method itself so that it can no longer throw exceptions and similar mistakes can no longer be made in the future.

At the same time, this commit makes the can match method more easily testable without requiring a full-blown SearchService instance.

Closes elastic#104994
  • Loading branch information
javanna committed Dec 12, 2024
1 parent f25b06d commit 372ce16
Show file tree
Hide file tree
Showing 9 changed files with 3,521 additions and 3,133 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/117469.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 117469
summary: Handle exceptions in query phase can match
area: Search
type: bug
issues:
- 104994
182 changes: 126 additions & 56 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

Expand Down Expand Up @@ -559,16 +561,17 @@ public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task,
// check if we can shortcut the query phase entirely.
if (orig.canReturnNullResponseIfMatchNoDocs()) {
assert orig.scroll() == null;
final CanMatchShardResponse canMatchResp;
try {
ShardSearchRequest clone = new ShardSearchRequest(orig);
canMatchResp = canMatch(clone, false);
} catch (Exception exc) {
l.onFailure(exc);
return;
}
ShardSearchRequest clone = new ShardSearchRequest(orig);
CanMatchContext canMatchContext = new CanMatchContext(
clone,
indicesService::indexServiceSafe,
this::findReaderContext,
defaultKeepAlive,
maxKeepAlive
);
CanMatchShardResponse canMatchResp = canMatch(canMatchContext, false);
if (canMatchResp.canMatch() == false) {
l.onResponse(QuerySearchResult.nullInstance());
listener.onResponse(QuerySearchResult.nullInstance());
return;
}
}
Expand Down Expand Up @@ -1201,25 +1204,37 @@ public void freeAllScrollContexts() {
}

private long getKeepAlive(ShardSearchRequest request) {
return getKeepAlive(request, defaultKeepAlive, maxKeepAlive);
}

private static long getKeepAlive(ShardSearchRequest request, long defaultKeepAlive, long maxKeepAlive) {
if (request.scroll() != null) {
return getScrollKeepAlive(request.scroll());
return getScrollKeepAlive(request.scroll(), defaultKeepAlive, maxKeepAlive);
} else if (request.keepAlive() != null) {
checkKeepAliveLimit(request.keepAlive().millis());
checkKeepAliveLimit(request.keepAlive().millis(), maxKeepAlive);
return request.keepAlive().getMillis();
} else {
return request.readerId() == null ? defaultKeepAlive : -1;
}
}

private long getScrollKeepAlive(Scroll scroll) {
return getScrollKeepAlive(scroll, defaultKeepAlive, maxKeepAlive);
}

private static long getScrollKeepAlive(Scroll scroll, long defaultKeepAlive, long maxKeepAlive) {
if (scroll != null && scroll.keepAlive() != null) {
checkKeepAliveLimit(scroll.keepAlive().millis());
checkKeepAliveLimit(scroll.keepAlive().millis(), maxKeepAlive);
return scroll.keepAlive().getMillis();
}
return defaultKeepAlive;
}

private void checkKeepAliveLimit(long keepAlive) {
checkKeepAliveLimit(keepAlive, maxKeepAlive);
}

private static void checkKeepAliveLimit(long keepAlive, long maxKeepAlive) {
if (keepAlive > maxKeepAlive) {
throw new IllegalArgumentException(
"Keep alive for request ("
Expand Down Expand Up @@ -1678,6 +1693,7 @@ public void canMatch(CanMatchNodeRequest request, ActionListener<CanMatchNodeRes
final List<CanMatchNodeResponse.ResponseOrFailure> responses = new ArrayList<>(shardLevelRequests.size());
for (var shardLevelRequest : shardLevelRequests) {
try {
// TODO remove the exception handling as it's now in canMatch itself
responses.add(new CanMatchNodeResponse.ResponseOrFailure(canMatch(request.createShardSearchRequest(shardLevelRequest))));
} catch (Exception e) {
responses.add(new CanMatchNodeResponse.ResponseOrFailure(e));
Expand All @@ -1689,82 +1705,145 @@ public void canMatch(CanMatchNodeRequest request, ActionListener<CanMatchNodeRes
/**
* This method uses a lightweight searcher without wrapping (i.e., not open a full reader on frozen indices) to rewrite the query
* to check if the query can match any documents. This method can have false positives while if it returns {@code false} the query
* won't match any documents on the current shard.
* won't match any documents on the current shard. Exceptions are handled within the method, and never re-thrown.
*/
public CanMatchShardResponse canMatch(ShardSearchRequest request) throws IOException {
return canMatch(request, true);
public CanMatchShardResponse canMatch(ShardSearchRequest request) {
CanMatchContext canMatchContext = new CanMatchContext(
request,
indicesService::indexServiceSafe,
this::findReaderContext,
defaultKeepAlive,
maxKeepAlive
);
return canMatch(canMatchContext, true);
}

private CanMatchShardResponse canMatch(ShardSearchRequest request, boolean checkRefreshPending) throws IOException {
assert request.searchType() == SearchType.QUERY_THEN_FETCH : "unexpected search type: " + request.searchType();
static class CanMatchContext {
private final ShardSearchRequest request;
private final Function<Index, IndexService> indexServiceLookup;
private final BiFunction<ShardSearchContextId, TransportRequest, ReaderContext> findReaderContext;
private final long defaultKeepAlive;
private final long maxKeepAlive;

private IndexService indexService;

CanMatchContext(
ShardSearchRequest request,
Function<Index, IndexService> indexServiceLookup,
BiFunction<ShardSearchContextId, TransportRequest, ReaderContext> findReaderContext,
long defaultKeepAlive,
long maxKeepAlive
) {
this.request = request;
this.indexServiceLookup = indexServiceLookup;
this.findReaderContext = findReaderContext;
this.defaultKeepAlive = defaultKeepAlive;
this.maxKeepAlive = maxKeepAlive;
}

long getKeepAlive() {
return SearchService.getKeepAlive(request, defaultKeepAlive, maxKeepAlive);
}

ReaderContext findReaderContext() {
return findReaderContext.apply(request.readerId(), request);
}

QueryRewriteContext getQueryRewriteContext(IndexService indexService) {
return indexService.newQueryRewriteContext(request::nowInMillis, request.getRuntimeMappings(), request.getClusterAlias());
}

SearchExecutionContext getSearchExecutionContext(Engine.Searcher searcher) {
return getIndexService().newSearchExecutionContext(
request.shardId().id(),
0,
searcher,
request::nowInMillis,
request.getClusterAlias(),
request.getRuntimeMappings()
);
}

IndexShard getShard() {
return getIndexService().getShard(request.shardId().getId());
}

IndexService getIndexService() {
if (this.indexService == null) {
this.indexService = indexServiceLookup.apply(request.shardId().getIndex());
}
return this.indexService;
}
}

static CanMatchShardResponse canMatch(CanMatchContext canMatchContext, boolean checkRefreshPending) {
assert canMatchContext.request.searchType() == SearchType.QUERY_THEN_FETCH
: "unexpected search type: " + canMatchContext.request.searchType();
Releasable releasable = null;
try {
IndexService indexService;
final boolean hasRefreshPending;
final Engine.Searcher canMatchSearcher;
if (request.readerId() != null) {
if (canMatchContext.request.readerId() != null) {
hasRefreshPending = false;
ReaderContext readerContext;
Engine.Searcher searcher;
try {
readerContext = findReaderContext(request.readerId(), request);
releasable = readerContext.markAsUsed(getKeepAlive(request));
readerContext = canMatchContext.findReaderContext();
releasable = readerContext.markAsUsed(canMatchContext.getKeepAlive());
indexService = readerContext.indexService();
if (canMatchAfterRewrite(request, indexService) == false) {
QueryRewriteContext queryRewriteContext = canMatchContext.getQueryRewriteContext(indexService);
if (queryStillMatchesAfterRewrite(canMatchContext.request, queryRewriteContext) == false) {
return new CanMatchShardResponse(false, null);
}
searcher = readerContext.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE);
} catch (SearchContextMissingException e) {
final String searcherId = request.readerId().getSearcherId();
final String searcherId = canMatchContext.request.readerId().getSearcherId();
if (searcherId == null) {
throw e;
return new CanMatchShardResponse(true, null);
}
indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
if (canMatchAfterRewrite(request, indexService) == false) {
if (queryStillMatchesAfterRewrite(
canMatchContext.request,
canMatchContext.getQueryRewriteContext(canMatchContext.getIndexService())
) == false) {
return new CanMatchShardResponse(false, null);
}
IndexShard indexShard = indexService.getShard(request.shardId().getId());
final Engine.SearcherSupplier searcherSupplier = indexShard.acquireSearcherSupplier();
final Engine.SearcherSupplier searcherSupplier = canMatchContext.getShard().acquireSearcherSupplier();
if (searcherId.equals(searcherSupplier.getSearcherId()) == false) {
searcherSupplier.close();
throw e;
return new CanMatchShardResponse(true, null);
}
releasable = searcherSupplier;
searcher = searcherSupplier.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE);
}
canMatchSearcher = searcher;
} else {
indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
if (canMatchAfterRewrite(request, indexService) == false) {
if (queryStillMatchesAfterRewrite(
canMatchContext.request,
canMatchContext.getQueryRewriteContext(canMatchContext.getIndexService())
) == false) {
return new CanMatchShardResponse(false, null);
}
IndexShard indexShard = indexService.getShard(request.shardId().getId());
boolean needsWaitForRefresh = request.waitForCheckpoint() != UNASSIGNED_SEQ_NO;
boolean needsWaitForRefresh = canMatchContext.request.waitForCheckpoint() != UNASSIGNED_SEQ_NO;
// If this request wait_for_refresh behavior, it is safest to assume a refresh is pending. Theoretically,
// this can be improved in the future by manually checking that the requested checkpoint has already been refresh.
// However, this will request modifying the engine to surface that information.
IndexShard indexShard = canMatchContext.getShard();
hasRefreshPending = needsWaitForRefresh || (indexShard.hasRefreshPending() && checkRefreshPending);
canMatchSearcher = indexShard.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE);
}
try (canMatchSearcher) {
SearchExecutionContext context = indexService.newSearchExecutionContext(
request.shardId().id(),
0,
canMatchSearcher,
request::nowInMillis,
request.getClusterAlias(),
request.getRuntimeMappings()
);
final boolean canMatch = queryStillMatchesAfterRewrite(request, context);
final MinAndMax<?> minMax;
SearchExecutionContext context = canMatchContext.getSearchExecutionContext(canMatchSearcher);
final boolean canMatch = queryStillMatchesAfterRewrite(canMatchContext.request, context);
if (canMatch || hasRefreshPending) {
FieldSortBuilder sortBuilder = FieldSortBuilder.getPrimaryFieldSortOrNull(request.source());
minMax = sortBuilder != null ? FieldSortBuilder.getMinMaxOrNull(context, sortBuilder) : null;
} else {
minMax = null;
FieldSortBuilder sortBuilder = FieldSortBuilder.getPrimaryFieldSortOrNull(canMatchContext.request.source());
final MinAndMax<?> minMax = sortBuilder != null ? FieldSortBuilder.getMinMaxOrNull(context, sortBuilder) : null;
return new CanMatchShardResponse(true, minMax);
}
return new CanMatchShardResponse(canMatch || hasRefreshPending, minMax);
return new CanMatchShardResponse(false, null);
}
} catch (Exception e) {
return new CanMatchShardResponse(true, null);
} finally {
Releasables.close(releasable);
}
Expand All @@ -1777,15 +1856,6 @@ private CanMatchShardResponse canMatch(ShardSearchRequest request, boolean check
* {@link MatchNoneQueryBuilder}. This allows us to avoid extra work for example making the shard search active and waiting for
* refreshes.
*/
private static boolean canMatchAfterRewrite(final ShardSearchRequest request, final IndexService indexService) throws IOException {
final QueryRewriteContext queryRewriteContext = indexService.newQueryRewriteContext(
request::nowInMillis,
request.getRuntimeMappings(),
request.getClusterAlias()
);
return queryStillMatchesAfterRewrite(request, queryRewriteContext);
}

@SuppressWarnings("unchecked")
public static boolean queryStillMatchesAfterRewrite(ShardSearchRequest request, QueryRewriteContext context) throws IOException {
Rewriteable.rewrite(request.getRewriteable(), context, false);
Expand Down
Loading

0 comments on commit 372ce16

Please sign in to comment.