Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply the date histogram rewrite optimization to range aggregation #13865

Merged
merged 42 commits into from
Jun 19, 2024
Merged
Changes from 1 commit
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
53cb70f
Refactor the ranges representation
bowenlan-amzn May 21, 2024
69730b1
Refactor try fast filter
bowenlan-amzn May 22, 2024
1e2d7f4
Main work finished; left the handling of different numeric data types
bowenlan-amzn May 23, 2024
95b04dd
buildRanges accepts field type
bowenlan-amzn May 28, 2024
8dd1dda
first working draft probably
bowenlan-amzn May 29, 2024
c5d2175
Merge branch 'main' into 13531-range-agg
bowenlan-amzn May 29, 2024
ed79e02
add change log
bowenlan-amzn May 29, 2024
c7043e4
accommodate geo distance agg
bowenlan-amzn May 29, 2024
90d6790
Fix test
bowenlan-amzn May 29, 2024
67c281c
Merge branch 'main' into 13531-range-agg
bowenlan-amzn May 29, 2024
c10c775
[Refactor] range is lower inclusive, right exclusive
bowenlan-amzn May 31, 2024
783b14a
Merge branch 'main' into 13531-range-agg
bowenlan-amzn Jun 2, 2024
06b3372
adding test
bowenlan-amzn Jun 5, 2024
c6b5a9c
Adding test and refactor
bowenlan-amzn Jun 5, 2024
d590081
Merge branch 'main' into 13531-range-agg
bowenlan-amzn Jun 5, 2024
58e5281
refactor
bowenlan-amzn Jun 5, 2024
37c6d84
add test
bowenlan-amzn Jun 6, 2024
e0ba84b
add test and update the compare logic in tree traversal
bowenlan-amzn Jun 6, 2024
4603ec0
fix test, add random test
bowenlan-amzn Jun 6, 2024
afbce0c
Merge branch 'main' into 13531-range-agg
bowenlan-amzn Jun 6, 2024
9359fc2
Merge branch 'main' into 13531-range-agg
bowenlan-amzn Jun 6, 2024
54bfe92
refactor to address comments
bowenlan-amzn Jun 6, 2024
6ae1a9b
Merge branch 'main' into 13531-range-agg
bowenlan-amzn Jun 7, 2024
cc92c44
Merge branch 'main' into 13531-range-agg
bowenlan-amzn Jun 7, 2024
6546736
small potential performance update
bowenlan-amzn Jun 8, 2024
328006b
fix precommit
bowenlan-amzn Jun 9, 2024
a290e1d
Merge branch 'main' into 13531-range-agg
bowenlan-amzn Jun 9, 2024
23bbcbb
Merge branch 'main' into 13531-range-agg
bowenlan-amzn Jun 10, 2024
1b586bb
Merge branch 'main' into 13531-range-agg
bowenlan-amzn Jun 10, 2024
65de090
refactor
bowenlan-amzn Jun 11, 2024
f3c07c7
refactor
bowenlan-amzn Jun 11, 2024
fc0aff5
Merge branch 'main' into 13531-range-agg
bowenlan-amzn Jun 11, 2024
bab28e6
set refresh_interval to -1
bowenlan-amzn Jun 11, 2024
e545b90
Merge branch 'main' into 13531-range-agg
bowenlan-amzn Jun 11, 2024
78b4d9d
address comment
bowenlan-amzn Jun 11, 2024
185ed4e
address comment
bowenlan-amzn Jun 12, 2024
910b66a
Merge branch 'main' into 13531-range-agg
bowenlan-amzn Jun 12, 2024
a2d50ce
address comment
bowenlan-amzn Jun 13, 2024
fe85ad3
Fix test
bowenlan-amzn Jun 13, 2024
48a03a4
Merge branch 'main' into 13531-range-agg
bowenlan-amzn Jun 18, 2024
07a5293
Merge branch 'main' into 13531-range-agg
bowenlan-amzn Jun 18, 2024
9764b23
Merge branch 'main' into 13531-range-agg
bowenlan-amzn Jun 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Adding test and refactor
Signed-off-by: bowenlan-amzn <[email protected]>
bowenlan-amzn committed Jun 5, 2024
commit c6b5a9c92840db8400a38c3f551a51460c393905
Original file line number Diff line number Diff line change
@@ -188,6 +188,7 @@ public ScaledFloatFieldType(String name, double scalingFactor) {
this(name, true, false, true, Collections.emptyMap(), scalingFactor, null);
}

// Overrides the MappedFieldType default (which throws) with the scaling
// factor this scaled_float field was configured with; the range-agg
// fast-filter rewrite reads this to rescale range bounds.
@Override
public double getScalingFactor() {
    return scalingFactor;
}
Original file line number Diff line number Diff line change
@@ -536,4 +536,8 @@ public Map<String, String> meta() {
// Returns the {@code textSearchInfo} configured for this field type.
public TextSearchInfo getTextSearchInfo() {
    return textSearchInfo;
}

/**
 * Returns the scaling factor for field types that store scaled values
 * (e.g. {@code scaled_float}, which overrides this). The base implementation
 * always throws because most field types have no scaling factor.
 *
 * @throws UnsupportedOperationException if this field type does not support a scaling factor
 */
public double getScalingFactor() {
    // UnsupportedOperationException is the conventional exception for "this
    // type does not support the requested operation"; IllegalCallerException
    // is reserved for caller-sensitive access violations. Call sites probing
    // for support catch Exception, so this remains compatible.
    throw new UnsupportedOperationException("Field [" + name() + "] of type [" + typeName() + "] does not support scaling factor");
}
}
Original file line number Diff line number Diff line change
@@ -45,10 +45,10 @@
import org.opensearch.search.aggregations.bucket.histogram.LongBounds;
import org.opensearch.search.aggregations.bucket.range.RangeAggregator.Range;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
import org.opensearch.search.internal.SearchContext;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.HashMap;
@@ -456,20 +456,32 @@ public DebugInfo tryFastFilterAggregation(
/**
* For range aggregation
*/
public static class RangeAggregationType implements FastFilterRewriteHelper.AggregationType {
public static class RangeAggregationType implements AggregationType {

private final ValuesSource.Numeric source;
private final ValuesSourceConfig config;
private final Range[] ranges;
private FieldTypeEnum fieldTypeEnum;

public RangeAggregationType(ValuesSource.Numeric source, Range[] ranges) {
this.source = source;
public RangeAggregationType(ValuesSourceConfig config, Range[] ranges) {
this.source = (ValuesSource.Numeric) config.getValuesSource();
this.config = config;
this.ranges = ranges;
}

@Override
public boolean isRewriteable(Object parent, int subAggLength) {
if (parent == null && subAggLength == 0) {
// don't accept values source with scripts
if (parent == null && subAggLength == 0 && config.script() == null && config.missing() == null) {
try {
fieldTypeEnum = FieldTypeEnum.fromTypeName(config.fieldType().typeName());
if (fieldTypeEnum.isScaled()) {
// make sure we can safely get scaling factor later on
config.fieldType().getScalingFactor();
}
} catch (Exception e) {
return false;
}

if (source instanceof ValuesSource.Numeric.FieldData) {
// ranges are already sorted by from and then to
// we want ranges not overlapping with each other
@@ -488,57 +500,16 @@ public boolean isRewriteable(Object parent, int subAggLength) {

@Override
public Ranges buildRanges(SearchContext ctx, MappedFieldType fieldType) throws IOException {
int byteLen = -1;
boolean scaled = false;
String pointType = "";
switch (fieldType.typeName()) {
case "half_float":
byteLen = 2;
pointType = "half_float";
break;
case "integer":
case "short":
case "byte":
byteLen = 4;
pointType = "int";
break;
case "float":
byteLen = 4;
pointType = "float";
break;
case "long":
case "date":
case "date_nanos":
byteLen = 8;
pointType = "long";
break;
case "double":
byteLen = 8;
pointType = "double";
break;
case "unsigned_long":
byteLen = 16;
pointType = "big_integer";
break;
case "scaled_float":
byteLen = 8;
scaled = true;
pointType = "long";
break;
}
if (byteLen == -1) {
throw new IllegalArgumentException("Field type " + fieldType.name() + " is not supported");
}
int byteLen = this.fieldTypeEnum.getByteLen();
String pointType = this.fieldTypeEnum.getPointType();

byte[][] mins = new byte[ranges.length][];
byte[][] maxs = new byte[ranges.length][];
for (int i = 0; i < ranges.length; i++) {
double rangeMin = ranges[i].getFrom();
double rangeMax = ranges[i].getTo();

byte[] min = new byte[byteLen];
byte[] max = new byte[byteLen];
// TODO any heavy operation that shouldn't be in the loop?
switch (pointType) {
case "half_float":
HalfFloatPoint.encodeDimension((float) rangeMin, min, 0);
@@ -557,18 +528,8 @@ public Ranges buildRanges(SearchContext ctx, MappedFieldType fieldType) throws I
IntPoint.encodeDimension((int) rangeMax, max, 0);
break;
case "long":
if (scaled) {
// use reflection to see if fieldType has a method called getScalingFactor
// if it does, use that to scale the rangeMin and rangeMax
// if it doesn't, use the default scaling factor of 1000
double scalingFactor = 100;
try {
// Method scalingFactorMethod = fieldType.getClass().getMethod("getScalingFactor");
scalingFactor = (double) fieldType.getClass().getMethod("getScalingFactor").invoke(fieldType);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
logger.debug("Failed to get scaling factor from fieldType", e);
// TODO fall back to default aggregation method
}
if (this.fieldTypeEnum.isScaled()) {
double scalingFactor = fieldType.getScalingFactor();
rangeMin = scalingFactor * rangeMin;
rangeMax = scalingFactor * rangeMax;
}
@@ -588,6 +549,61 @@ public Ranges buildRanges(SearchContext ctx, MappedFieldType fieldType) throws I
return new Ranges(mins, maxs, byteLen);
}

/**
 * Numeric field types supported by the range-agg fast-filter rewrite, with
 * the point-encoding metadata needed to build BKD range bounds.
 */
private enum FieldTypeEnum {
    HALF_FLOAT("half_float", 2, "half_float"),
    INTEGER("integer", 4, "int"),
    SHORT("short", 4, "int"),
    BYTE("byte", 4, "int"),
    FLOAT("float", 4, "float"),
    LONG("long", 8, "long"),
    DATE("date", 8, "long"),
    DATE_NANOS("date_nanos", 8, "long"),
    DOUBLE("double", 8, "double"),
    UNSIGNED_LONG("unsigned_long", 16, "big_integer"),
    SCALED_FLOAT("scaled_float", 8, "long", true);

    // O(1) reverse lookup by type name, replacing a linear scan of values()
    // on every rewriteability check. Enum constants are fully initialized
    // before this static initializer runs, so values() is safe to use here.
    private static final HashMap<String, FieldTypeEnum> BY_TYPE_NAME = new HashMap<>();
    static {
        for (FieldTypeEnum fieldTypeEnum : values()) {
            BY_TYPE_NAME.put(fieldTypeEnum.typeName, fieldTypeEnum);
        }
    }

    private final String typeName;   // mapped field type name, e.g. "scaled_float"
    private final int byteLen;       // encoded point length in bytes
    private final String pointType;  // which point-encoding branch buildRanges uses
    private final boolean scaled;    // true when range bounds must be multiplied by a scaling factor

    FieldTypeEnum(String typeName, int byteLen, String pointType) {
        this(typeName, byteLen, pointType, false);
    }

    FieldTypeEnum(String typeName, int byteLen, String pointType, boolean scaled) {
        this.typeName = typeName;
        this.byteLen = byteLen;
        this.pointType = pointType;
        this.scaled = scaled;
    }

    String getTypeName() {
        return typeName;
    }

    int getByteLen() {
        return byteLen;
    }

    String getPointType() {
        return pointType;
    }

    boolean isScaled() {
        return scaled;
    }

    /**
     * Resolves a mapped field type name to its enum constant.
     *
     * @throws IllegalArgumentException if the type name is not supported by the rewrite
     */
    static FieldTypeEnum fromTypeName(String typeName) {
        FieldTypeEnum fieldTypeEnum = BY_TYPE_NAME.get(typeName);
        if (fieldTypeEnum == null) {
            throw new IllegalArgumentException("Unknown field type: " + typeName);
        }
        return fieldTypeEnum;
    }
}

public static BigInteger convertDoubleToBigInteger(double value) {
// we use big integer to represent unsigned long
BigInteger maxUnsignedLong = BigInteger.valueOf(2).pow(64).subtract(BigInteger.ONE);
@@ -608,7 +624,7 @@ public static BigInteger convertDoubleToBigInteger(double value) {

@Override
public Ranges buildRanges(LeafReaderContext leaf, SearchContext ctx, MappedFieldType fieldType) throws IOException {
return buildRanges(ctx, fieldType);
throw new UnsupportedOperationException("Range aggregation should not build ranges at segment level");
}

@Override
@@ -833,7 +849,7 @@ public void visit(int docID, byte[] packedValue) throws IOException {
@Override
public void visit(DocIdSetIterator iterator, byte[] packedValue) throws IOException {
visitPoints(packedValue, () -> {
for (int doc = iterator.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = iterator.nextDoc()) {
for (int doc = iterator.nextDoc(); doc != NO_MORE_DOCS; doc = iterator.nextDoc()) {
collector.count();
}
});
Original file line number Diff line number Diff line change
@@ -271,11 +271,10 @@ public RangeAggregator(
this.format = format;
this.keyed = keyed;
this.rangeFactory = rangeFactory;

this.ranges = ranges; // already sorted by the range.from and range.to

fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context);
fastFilterContext.setAggregationType(new FastFilterRewriteHelper.RangeAggregationType(valuesSource, ranges));
fastFilterContext.setAggregationType(new FastFilterRewriteHelper.RangeAggregationType(config, ranges));
if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
fastFilterContext.buildRanges(Objects.requireNonNull(config.fieldType()));
}
Original file line number Diff line number Diff line change
@@ -32,16 +32,22 @@

package org.opensearch.search.aggregations.bucket.range;

import org.apache.lucene.document.DoubleField;
import org.apache.lucene.document.DoublePoint;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.BytesRef;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.index.mapper.DateFieldMapper;
@@ -67,6 +73,8 @@ public class RangeAggregatorTests extends AggregatorTestCase {
private static final String NUMBER_FIELD_NAME = "number";
private static final String DATE_FIELD_NAME = "date";

private static final String DOUBLE_FIELD_NAME = "double";

public void testNoMatchingField() throws IOException {
testCase(new MatchAllDocsQuery(), iw -> {
iw.addDocument(singleton(new SortedNumericDocValuesField("bogus_field_name", 7)));
@@ -305,6 +313,26 @@ public void testSubAggCollectsFromManyBucketsIfManyRanges() throws IOException {
});
}

public void testDoubleType() throws IOException {
    // Two adjacent, non-overlapping ranges over a double field; one of the
    // three indexed values falls outside the first range's lower bound.
    RangeAggregationBuilder aggregationBuilder = new RangeAggregationBuilder("range").field(DOUBLE_FIELD_NAME)
        .addRange(1, 2)
        .addRange(2, 3);

    Query query = DoublePoint.newRangeQuery(DOUBLE_FIELD_NAME, 0, 5);
    MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(DOUBLE_FIELD_NAME, NumberFieldMapper.NumberType.DOUBLE);

    testRewriteOptimizationCase(aggregationBuilder, query, indexWriter -> {
        // 0.1 matches neither range; 1.1 and 2.1 each match exactly one.
        for (double value : new double[] { 0.1, 1.1, 2.1 }) {
            indexWriter.addDocument(singleton(new DoubleField(DOUBLE_FIELD_NAME, value, Field.Store.NO)));
        }
    }, range -> {
        List<? extends InternalRange.Bucket> buckets = range.getBuckets();
        assertEquals(2, buckets.size());
        assertEquals("1.0-2.0", buckets.get(0).getKeyAsString());
        assertEquals(1, buckets.get(0).getDocCount());
        assertEquals("2.0-3.0", buckets.get(1).getKeyAsString());
        assertEquals(1, buckets.get(1).getDocCount());
        assertTrue(AggregationInspectionHelper.hasValue(range));
    }, fieldType);
}

private void testCase(
Query query,
CheckedConsumer<RandomIndexWriter, IOException> buildIndex,
@@ -325,7 +353,7 @@ private void simpleTestCase(
) throws IOException {
MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(NUMBER_FIELD_NAME, NumberFieldMapper.NumberType.INTEGER);

testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> {
testCase(aggregationBuilder, query, iw -> {
iw.addDocument(singleton(new SortedNumericDocValuesField(NUMBER_FIELD_NAME, 7)));
iw.addDocument(singleton(new SortedNumericDocValuesField(NUMBER_FIELD_NAME, 2)));
iw.addDocument(singleton(new SortedNumericDocValuesField(NUMBER_FIELD_NAME, 3)));
@@ -354,7 +382,32 @@ private void testCase(
fieldType
);
verify.accept(agg);
}
}
}

// Indexes documents with a plain IndexWriter using the default codec (rather
// than RandomIndexWriter), runs the aggregation over the given query, and
// hands the reduced result to the verifier.
private void testRewriteOptimizationCase(
    RangeAggregationBuilder aggregationBuilder,
    Query query,
    CheckedConsumer<IndexWriter, IOException> buildIndex,
    Consumer<InternalRange<? extends InternalRange.Bucket, ? extends InternalRange>> verify,
    MappedFieldType fieldType
) throws IOException {
    try (Directory directory = newDirectory()) {
        IndexWriterConfig writerConfig = new IndexWriterConfig().setCodec(TestUtil.getDefaultCodec());
        // Writer is closed before opening the reader so all documents are committed.
        try (IndexWriter indexWriter = new IndexWriter(directory, writerConfig)) {
            buildIndex.accept(indexWriter);
        }

        try (IndexReader indexReader = DirectoryReader.open(directory)) {
            IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
            InternalRange<? extends InternalRange.Bucket, ? extends InternalRange> result = searchAndReduce(
                indexSearcher,
                query,
                aggregationBuilder,
                fieldType
            );
            verify.accept(result);
        }
    }
}