Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into knn-as-query
Browse files Browse the repository at this point in the history
  • Loading branch information
mayya-sharipova committed Oct 31, 2023
2 parents e0ae1dc + bfffbd4 commit 348305f
Show file tree
Hide file tree
Showing 21 changed files with 157 additions and 61 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/100368.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 100368
summary: "Status codes for Aggregation errors, part 2"
area: Aggregations
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchException;
import org.elasticsearch.search.TooManyScrollContextsException;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.aggregations.UnsupportedAggregationOnDownsampledIndex;
import org.elasticsearch.transport.TcpTransport;
Expand Down Expand Up @@ -1861,6 +1862,12 @@ private enum ElasticsearchExceptionHandle {
TooManyScrollContextsException::new,
173,
TransportVersions.TOO_MANY_SCROLL_CONTEXTS_EXCEPTION_ADDED
),
INVALID_BUCKET_PATH_EXCEPTION(
AggregationExecutionException.InvalidPath.class,
AggregationExecutionException.InvalidPath::new,
174,
TransportVersions.INVALID_BUCKET_PATH_EXCEPTION_INTRODUCED
);

final Class<? extends ElasticsearchException> exceptionClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@
import java.util.TreeMap;
import java.util.TreeSet;

/**
* <p>Transport version is used to coordinate compatible wire protocol communication between nodes, at a fine-grained level. This replaces
* and supersedes the old Version constants.</p>
*
* <p>Before adding a new version constant, please read the block comment at the end of the list of constants.</p>
*/
public class TransportVersions {

/*
Expand Down Expand Up @@ -150,7 +156,8 @@ static TransportVersion def(int id) {
public static final TransportVersion PRIMARY_TERM_ADDED = def(8_525_00_0);
public static final TransportVersion CLUSTER_FEATURES_ADDED = def(8_526_00_0);
public static final TransportVersion DSL_ERROR_STORE_INFORMATION_ENHANCED = def(8_527_00_0);
public static final TransportVersion KNN_AS_QUERY_ADDED = def(8_528_00_0);
public static final TransportVersion INVALID_BUCKET_PATH_EXCEPTION_INTRODUCED = def(8_528_00_0);
public static final TransportVersion KNN_AS_QUERY_ADDED = def(8_529_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
* Collection of helper methods for what to throw in common aggregation error scenarios.
*/
public class AggregationErrors {
private static String parentType;

private AggregationErrors() {}

Expand Down Expand Up @@ -68,7 +67,7 @@ public static RuntimeException rateWithoutDateHistogram(String name) {
* @param position - optional, for multisource aggregations. Indicates the position of the field causing the problem.
* @return - an appropriate exception
*/
public static RuntimeException reduceTypeMissmatch(String aggregationName, Optional<Integer> position) {
public static RuntimeException reduceTypeMismatch(String aggregationName, Optional<Integer> position) {
String fieldString;
if (position.isPresent()) {
fieldString = "the field in position" + position.get().toString();
Expand Down Expand Up @@ -104,6 +103,15 @@ public static RuntimeException unsupportedMultivalue() {
);
}

/**
* Indicates the given values source is not suitable for use in a multivalued aggregation. This is not retryable.
* @param source a string describing the Values Source
* @return an appropriate exception
*/
public static RuntimeException unsupportedMultivalueValuesSource(String source) {
throw new IllegalArgumentException("ValuesSource type " + source + "is not supported for multi-valued aggregation");
}

/**
* Indicates an attempt to use date rounding on a non-date values source
* @param typeName - name of the type we're attempting to round
Expand All @@ -112,4 +120,41 @@ public static RuntimeException unsupportedMultivalue() {
public static RuntimeException unsupportedRounding(String typeName) {
return new IllegalArgumentException("can't round a [" + typeName + "]");
}

/**
* Indicates that an aggregation path (e.g. from a pipeline agg) references an aggregation of the wrong type, for example
* attempting to take a cumulative cardinality of something other than a cardinality aggregation.
*
* @param aggPath the path element found to be invalid
* @param expected
* @param got What we actually got; this may be null.
* @param currentAgg The name of the aggregation in question
* @return an appropriate exception
*/
public static RuntimeException incompatibleAggregationType(String aggPath, String expected, String got, String currentAgg) {

return new AggregationExecutionException.InvalidPath(
aggPath
+ " must reference a "
+ expected
+ " aggregation, got: ["
+ (got == null ? "null" : got)
+ "] at aggregation ["
+ currentAgg
+ "]"
);
}

/**
* This is a 500 class error indicating a programming error. Hopefully we never see this outside of tests.
* @param bucketOrds - the ords we are processing
* @param got - the ordinal we got
* @param expected - the ordinal we expected
* @return an appropriate exception
*/
public static RuntimeException iterationOrderChangedWithoutMutating(String bucketOrds, long got, long expected) {
return new AggregationExecutionException(
"Iteration order of [" + bucketOrds + "] changed without mutating. [" + got + "] should have been [" + expected + "]"
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;

Expand All @@ -28,4 +29,24 @@ public AggregationExecutionException(String msg, Throwable cause) {
public AggregationExecutionException(StreamInput in) throws IOException {
super(in);
}

public static class InvalidPath extends AggregationExecutionException {

public InvalidPath(String msg) {
super(msg);
}

public InvalidPath(String msg, Throwable cause) {
super(msg, cause);
}

public InvalidPath(StreamInput in) throws IOException {
super(in);
}

@Override
public RestStatus status() {
return RestStatus.BAD_REQUEST;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ private static void executeInSortOrder(SearchContext context, BucketCollector co
try {
searcher.search(context.rewrittenQuery(), collector);
} catch (IOException e) {
// Seems like this should be 400 (non-retryable), but we clearly intentionally throw a 500 here. Why?
throw new AggregationExecutionException("Could not perform time series aggregation", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public <T extends Bucket> Comparator<T> partiallyBuiltBucketComparator(ToLongFun
BucketComparator bucketComparator = path.bucketComparator(aggregator, order);
return (lhs, rhs) -> bucketComparator.compare(ordinalReader.applyAsLong(lhs), ordinalReader.applyAsLong(rhs));
} catch (IllegalArgumentException e) {
throw new AggregationExecutionException("Invalid aggregation order path [" + path + "]. " + e.getMessage(), e);
throw new AggregationExecutionException.InvalidPath("Invalid aggregation order path [" + path + "]. " + e.getMessage(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.AggregationErrors;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorBase;
import org.elasticsearch.search.aggregations.AggregatorFactories;
Expand Down Expand Up @@ -340,7 +340,9 @@ protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(
totalOrdsToCollect += bucketCount;
}
if (totalOrdsToCollect > Integer.MAX_VALUE) {
throw new AggregationExecutionException(
// TODO: We should instrument this error. While it is correct for it to be a 400 class IllegalArgumentException, there is not
// much the user can do about that. If this occurs with any frequency, we should do something about it.
throw new IllegalArgumentException(
"Can't collect more than [" + Integer.MAX_VALUE + "] buckets but attempted [" + totalOrdsToCollect + "]"
);
}
Expand All @@ -361,14 +363,11 @@ protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
while (ordsEnum.next()) {
if (bucketOrdsToCollect[b] != ordsEnum.ord()) {
throw new AggregationExecutionException(
"Iteration order of ["
+ bucketOrds
+ "] changed without mutating. ["
+ ordsEnum.ord()
+ "] should have been ["
+ bucketOrdsToCollect[b]
+ "]"
// If we hit this, something has gone horribly wrong and we need to investigate
throw AggregationErrors.iterationOrderChangedWithoutMutating(
bucketOrds.toString(),
ordsEnum.ord(),
bucketOrdsToCollect[b]
);
}
buckets.add(bucketBuilder.build(ordsEnum.value(), bucketDocCount(ordsEnum.ord()), subAggregationResults[b++]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public Aggregator createInternal(Aggregator parent, CardinalityUpperBound cardin
);
}
if (cardinality != CardinalityUpperBound.ONE) {
// Hitting this exception is a programmer error. Hopefully never seen in production.
throw new AggregationExecutionException("Aggregation [" + name() + "] must have cardinality 1 but was [" + cardinality + "]");
}
return new GlobalAggregator(name, factories, context, metadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.search.aggregations.AggregationErrors;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BucketOrder;
Expand Down Expand Up @@ -195,15 +195,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
while (ordsEnum.next()) {
long ordinal = ordsEnum.ord();
if (bucketOrdsToCollect[b] != ordinal) {
throw new AggregationExecutionException(
"Iteration order of ["
+ bucketOrds
+ "] changed without mutating. ["
+ ordinal
+ "] should have been ["
+ bucketOrdsToCollect[b]
+ "]"
);
throw AggregationErrors.iterationOrderChangedWithoutMutating(bucketOrds.toString(), ordinal, bucketOrdsToCollect[b]);
}
BytesRef ipAddress = new BytesRef();
ordsEnum.readValue(ipAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Aggreg
if (referenceTerms != null && referenceTerms.getClass().equals(terms.getClass()) == false && terms.canLeadReduction()) {
// control gets into this loop when the same field name against which the query is executed
// is of different types in different indices.
throw AggregationErrors.reduceTypeMissmatch(referenceTerms.getName(), Optional.empty());
throw AggregationErrors.reduceTypeMismatch(referenceTerms.getName(), Optional.empty());
}
otherDocCount[0] += terms.getSumOfOtherDocCounts();
final long thisAggDocCountError = getDocCountError(terms);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public GlobalOrdinalsStringTermsAggregator(
} else {
this.collectionStrategy = cardinality.map(estimate -> {
if (estimate > 1) {
// This is a 500 class error, because we should never be able to reach it.
throw new AggregationExecutionException("Dense ords don't know how to collect from many buckets");
}
return new DenseGlobalOrds();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Aggreg
&& terms.getClass().equals(UnmappedRareTerms.class) == false) {
// control gets into this loop when the same field name against which the query is executed
// is of different types in different indices.
throw AggregationErrors.reduceTypeMissmatch(referenceTerms.getName(), Optional.empty());
throw AggregationErrors.reduceTypeMismatch(referenceTerms.getName(), Optional.empty());
}
for (B bucket : terms.getBuckets()) {
List<B> bucketList = buckets.computeIfAbsent(bucket.getKey(), k -> new ArrayList<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public final InternalAggregation doReduce(Aggregations aggregations, Aggregation
Aggregation currentAgg = aggregation;
while (currElement < parsedPath.size() - 1) {
if (currentAgg == null) {
throw new IllegalArgumentException(
throw new AggregationExecutionException.InvalidPath(
"bucket_path ["
+ bucketsPaths()[0]
+ "] expected aggregation with name ["
Expand All @@ -73,7 +73,8 @@ public final InternalAggregation doReduce(Aggregations aggregations, Aggregation
parsedPath.get(currElement).key()
);
if (bucket == null) {
throw new AggregationExecutionException(
// seems not retryable, and therefore should be 400?
throw new AggregationExecutionException.InvalidPath(
"missing bucket ["
+ parsedPath.get(currElement).key()
+ "] for agg ["
Expand All @@ -84,13 +85,15 @@ public final InternalAggregation doReduce(Aggregations aggregations, Aggregation
);
}
if (currElement == parsedPath.size() - 1) {
// Seems not retryable, should be 400
throw new AggregationExecutionException(
"invalid bucket path ends at [" + parsedPath.get(currElement).key() + "]"
);
}
currentAgg = bucket.getAggregations().get(parsedPath.get(++currElement).name());
} else {
throw new AggregationExecutionException(
// Seems not retryable, should be 400
throw new AggregationExecutionException.InvalidPath(
"bucket_path ["
+ bucketsPaths()[0]
+ "] indicates bucket_key ["
Expand All @@ -107,10 +110,11 @@ public final InternalAggregation doReduce(Aggregations aggregations, Aggregation
}
}
if (currentAgg instanceof InternalMultiBucketAggregation == false) {
// Seems not retryable, should be 400
String msg = currentAgg == null
? "did not find multi-bucket aggregation for extraction."
: "did not find multi-bucket aggregation for extraction. Found [" + currentAgg.getName() + "]";
throw new AggregationExecutionException(msg);
throw new AggregationExecutionException.InvalidPath(msg);
}
List<String> sublistedPath = AggregationPath.pathElementsAsStringList(parsedPath.subList(currElement, parsedPath.size()));
// First element is the current agg, so we want the rest of the path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ public <T> T getAggregator(RegistryKey<T> registryKey, ValuesSourceConfig values
}
return supplier;
}
// This should be a startup error. Should never happen, probably indicates a bad plugin if it does. Should probably log and have
// actual docs on how to resolve.
throw new AggregationExecutionException(
"Unregistered Aggregation [" + (registryKey != null ? registryKey.getName() : "unknown aggregation") + "]"
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.elasticsearch.search.SearchException;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.TooManyScrollContextsException;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.aggregations.UnsupportedAggregationOnDownsampledIndex;
import org.elasticsearch.search.internal.ShardSearchContextId;
Expand Down Expand Up @@ -836,6 +837,7 @@ public void testIds() {
ids.put(171, ApiNotAvailableException.class);
ids.put(172, RecoveryCommitTooNewException.class);
ids.put(173, TooManyScrollContextsException.class);
ids.put(174, AggregationExecutionException.InvalidPath.class);

Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1730,8 +1730,8 @@ public void testOrderByPipelineAggregation() throws Exception {

MappedFieldType fieldType = new KeywordFieldMapper.KeywordFieldType("field");

AggregationExecutionException e = expectThrows(
AggregationExecutionException.class,
AggregationExecutionException.InvalidPath e = expectThrows(
AggregationExecutionException.InvalidPath.class,
() -> searchAndReduce(indexReader, new AggTestConfig(termsAgg, fieldType))
);
assertEquals(
Expand Down
Loading

0 comments on commit 348305f

Please sign in to comment.