Skip to content

Commit

Permalink
Merge branch 'elastic:main' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
stefnestor authored Dec 9, 2023
2 parents c0ab784 + 80b222c commit 95a1cfc
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.common.lucene.search.function.CombineFunction;
Expand Down Expand Up @@ -69,6 +68,7 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertScrollResponsesAndHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHit;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasId;
Expand Down Expand Up @@ -1403,22 +1403,15 @@ public void testParentChildQueriesViaScrollApi() throws Exception {
boolQuery().must(matchAllQuery()).filter(hasParentQuery("parent", matchAllQuery(), false)) };

for (QueryBuilder query : queries) {
SearchResponse scrollResponse = prepareSearch("test").setScroll(TimeValue.timeValueSeconds(30))
.setSize(1)
.addStoredField("_id")
.setQuery(query)
.get();

assertNoFailures(scrollResponse);
assertThat(scrollResponse.getHits().getTotalHits().value, equalTo(10L));
int scannedDocs = 0;
do {
assertThat(scrollResponse.getHits().getTotalHits().value, equalTo(10L));
scannedDocs += scrollResponse.getHits().getHits().length;
scrollResponse = client().prepareSearchScroll(scrollResponse.getScrollId()).setScroll(TimeValue.timeValueSeconds(30)).get();
} while (scrollResponse.getHits().getHits().length > 0);
clearScroll(scrollResponse.getScrollId());
assertThat(scannedDocs, equalTo(10));
assertScrollResponsesAndHitCount(
TimeValue.timeValueSeconds(60),
prepareSearch("test").setScroll(TimeValue.timeValueSeconds(30)).setSize(1).addStoredField("_id").setQuery(query),
10,
(respNum, response) -> {
assertNoFailures(response);
assertThat(response.getHits().getTotalHits().value, equalTo(10L));
}
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertScrollResponsesAndHitCount;

@ESIntegTestCase.SuiteScopeTestCase
public class AggregationsIntegrationIT extends ESIntegTestCase {
Expand All @@ -38,32 +39,22 @@ public void setupSuiteScopeCluster() throws Exception {

public void testScroll() {
final int size = randomIntBetween(1, 4);
final String[] scroll = new String[1];
final int[] total = new int[1];
assertNoFailuresAndResponse(
prepareSearch("index").setSize(size).setScroll(TimeValue.timeValueMinutes(1)).addAggregation(terms("f").field("f")),
response -> {
Aggregations aggregations = response.getAggregations();
assertNotNull(aggregations);
Terms terms = aggregations.get("f");
assertEquals(Math.min(numDocs, 3L), terms.getBucketByKey("0").getDocCount());
scroll[0] = response.getScrollId();
total[0] = response.getHits().getHits().length;
assertScrollResponsesAndHitCount(
TimeValue.timeValueSeconds(60),
prepareSearch("index").setSize(size).addAggregation(terms("f").field("f")),
numDocs,
(respNum, response) -> {
assertNoFailures(response);

if (respNum == 1) { // initial response.
Aggregations aggregations = response.getAggregations();
assertNotNull(aggregations);
Terms terms = aggregations.get("f");
assertEquals(Math.min(numDocs, 3L), terms.getBucketByKey("0").getDocCount());
} else {
assertNull(response.getAggregations());
}
}
);
int currentTotal = 0;
while (total[0] - currentTotal > 0) {
currentTotal = total[0];
assertNoFailuresAndResponse(
client().prepareSearchScroll(scroll[0]).setScroll(TimeValue.timeValueMinutes(1)),
scrollResponse -> {
assertNull(scrollResponse.getAggregations());
total[0] += scrollResponse.getHits().getHits().length;
scroll[0] = scrollResponse.getScrollId();
}
);
}
clearScroll(scroll[0]);
assertEquals(numDocs, total[0]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1881,7 +1881,7 @@ protected void addError(Exception e) {
/**
* Clears the given scroll Ids
*/
public void clearScroll(String... scrollIds) {
public static void clearScroll(String... scrollIds) {
ClearScrollResponse clearResponse = client().prepareClearScroll().setScrollIds(Arrays.asList(scrollIds)).get();
assertThat(clearResponse.isSucceeded(), equalTo(true));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BaseBroadcastResponse;
Expand All @@ -51,6 +52,7 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
Expand All @@ -60,10 +62,13 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import static org.apache.lucene.tests.util.LuceneTestCase.expectThrows;
import static org.apache.lucene.tests.util.LuceneTestCase.expectThrowsAnyOf;
import static org.elasticsearch.test.ESIntegTestCase.clearScroll;
import static org.elasticsearch.test.ESIntegTestCase.client;
import static org.elasticsearch.test.LambdaMatchers.transformedArrayItemsMatch;
import static org.elasticsearch.test.LambdaMatchers.transformedItemsMatch;
import static org.elasticsearch.test.LambdaMatchers.transformedMatch;
Expand All @@ -73,6 +78,7 @@
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -369,6 +375,48 @@ public static <Q extends ActionRequest, R extends ActionResponse> void assertRes
}
}

/**
* A helper to enable the testing of scroll requests with ref-counting.
*
* @param keepAlive The TTL for the scroll context.
* @param searchRequestBuilder The initial search request.
* @param expectedTotalHitCount The number of hits that are expected to be retrieved.
* @param responseConsumer (respNum, response) -> {your assertions here}.
* respNum starts at 1, which contains the resp from the initial request.
*/
public static void assertScrollResponsesAndHitCount(
TimeValue keepAlive,
SearchRequestBuilder searchRequestBuilder,
int expectedTotalHitCount,
BiConsumer<Integer, SearchResponse> responseConsumer
) {
searchRequestBuilder.setScroll(keepAlive);
List<SearchResponse> responses = new ArrayList<>();
var scrollResponse = searchRequestBuilder.get();
responses.add(scrollResponse);
int retrievedDocsCount = 0;
try {
assertThat(scrollResponse.getHits().getTotalHits().value, equalTo((long) expectedTotalHitCount));
retrievedDocsCount += scrollResponse.getHits().getHits().length;
responseConsumer.accept(responses.size(), scrollResponse);
while (scrollResponse.getHits().getHits().length > 0) {
scrollResponse = prepareScrollSearch(scrollResponse.getScrollId(), keepAlive).get();
responses.add(scrollResponse);
assertThat(scrollResponse.getHits().getTotalHits().value, equalTo((long) expectedTotalHitCount));
retrievedDocsCount += scrollResponse.getHits().getHits().length;
responseConsumer.accept(responses.size(), scrollResponse);
}
} finally {
clearScroll(scrollResponse.getScrollId());
responses.forEach(SearchResponse::decRef);
}
assertThat(retrievedDocsCount, equalTo(expectedTotalHitCount));
}

public static SearchScrollRequestBuilder prepareScrollSearch(String scrollId, TimeValue timeout) {
return client().prepareSearchScroll(scrollId).setScroll(timeout);
}

public static <R extends ActionResponse> void assertResponse(ActionFuture<R> responseFuture, Consumer<R> consumer)
throws ExecutionException, InterruptedException {
var res = responseFuture.get();
Expand Down Expand Up @@ -442,6 +490,10 @@ public static void assertFailures(SearchRequestBuilder searchRequestBuilder, Res
}
}

public static void assertFailures(SearchRequestBuilder searchRequestBuilder, RestStatus restStatus) {
assertFailures(searchRequestBuilder, restStatus, containsString(""));
}

public static void assertNoFailures(BaseBroadcastResponse response) {
if (response.getFailedShards() != 0) {
final AssertionError assertionError = new AssertionError("[" + response.getFailedShards() + "] shard failures");
Expand Down Expand Up @@ -791,9 +843,9 @@ public static void assertToXContentEquivalent(BytesReference expected, BytesRefe
* Often latches are called as <code>assertTrue(latch.await(1, TimeUnit.SECONDS));</code>
* In case of a failure this will just throw an assertion error without any further message
*
* @param latch The latch to wait for
* @param timeout The value of the timeout
* @param unit The unit of the timeout
* @param latch The latch to wait for
* @param timeout The value of the timeout
* @param unit The unit of the timeout
* @throws InterruptedException An exception if the waiting is interrupted
*/
public static void awaitLatch(CountDownLatch latch, long timeout, TimeUnit unit) throws InterruptedException {
Expand Down

0 comments on commit 95a1cfc

Please sign in to comment.