Skip to content

Commit

Permalink
[Star Tree] [Search] Resolve Date histogram with metric aggregation u…
Browse files Browse the repository at this point in the history
…sing star-tree (opensearch-project#16674)


---------

Signed-off-by: Sandesh Kumar <[email protected]>
Co-authored-by: Sandesh Kumar <[email protected]>
  • Loading branch information
sandeshkr419 and Sandesh Kumar authored Jan 27, 2025
1 parent e6fc600 commit b5234a5
Show file tree
Hide file tree
Showing 19 changed files with 1,144 additions and 78 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added new Setting property UnmodifiableOnRestore to prevent updating settings on restore snapshot ([#16957](https://github.com/opensearch-project/OpenSearch/pull/16957))
- Introduce Template query ([#16818](https://github.com/opensearch-project/OpenSearch/pull/16818))
- Propagate the sourceIncludes and excludes fields from fetchSourceContext to FieldsVisitor. ([#17080](https://github.com/opensearch-project/OpenSearch/pull/17080))
- [Star Tree] [Search] Resolving Date histogram with metric aggregation using star-tree ([#16674](https://github.com/opensearch-project/OpenSearch/pull/16674))

### Dependencies
- Bump `com.google.cloud:google-cloud-core-http` from 2.23.0 to 2.47.0 ([#16504](https://github.com/opensearch-project/OpenSearch/pull/16504))
Expand Down
9 changes: 9 additions & 0 deletions server/src/main/java/org/opensearch/common/Rounding.java
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,10 @@ public void writeTo(StreamOutput out) throws IOException {

public abstract byte id();

/**
* Returns the {@link DateTimeUnit} associated with this rounding.
* <p>
* This base implementation always returns {@code null}. The method is not abstract,
* so only subclasses that actually have a unit to report need to override it;
* callers must therefore be prepared to receive {@code null}.
*
* @return the date-time unit, or {@code null} when this rounding does not expose one
*/
public DateTimeUnit unit() {
return null;
}

/**
* A strategy for rounding milliseconds since epoch.
*
Expand Down Expand Up @@ -517,6 +521,11 @@ public byte id() {
return ID;
}

/**
* Returns the {@code unit} held by this rounding, i.e. the same value that
* {@code truncateLocalDateTime} switches on.
*
* @return this rounding's {@link DateTimeUnit}
*/
@Override
public DateTimeUnit unit() {
return unit;
}

private LocalDateTime truncateLocalDateTime(LocalDateTime localDateTime) {
switch (unit) {
case SECOND_OF_MINUTE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,24 @@ public int compare(DateTimeUnitRounding unit1, DateTimeUnitRounding unit2) {
}
}

/**
 * Picks, from {@code sortedCalendarIntervals}, the interval that should serve a request
 * for {@code searchInterval}: the last entry that the {@link DateTimeUnitComparator}
 * ranks as less than or equal to the requested one.
 * <p>
 * The scan stops at the first entry ranked greater than {@code searchInterval}, so the
 * result is only the largest eligible interval if {@code sortedCalendarIntervals} is in
 * ascending comparator order (as its name suggests).
 *
 * @param searchInterval the interval requested by the search
 * @return the chosen interval, or {@code null} when the first configured interval is
 *         already greater than {@code searchInterval} (or none are configured)
 */
public DateTimeUnitRounding findClosestValidInterval(DateTimeUnitRounding searchInterval) {
    final DateTimeUnitComparator unitComparator = new DateTimeUnitComparator();
    DateTimeUnitRounding bestMatch = null;

    for (DateTimeUnitRounding candidate : sortedCalendarIntervals) {
        // First candidate coarser than the request ends the scan; keep what we have so far
        if (unitComparator.compare(candidate, searchInterval) > 0) {
            break;
        }
        bestMatch = candidate;
    }
    return bestMatch;
}

/**
* Returns a sorted list of dateTimeUnits based on the DateTimeUnitComparator
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@
import org.opensearch.common.lucene.Lucene;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
import org.opensearch.index.codec.composite.CompositeIndexReader;
import org.opensearch.index.compositeindex.datacube.DateDimension;
import org.opensearch.index.compositeindex.datacube.Dimension;
import org.opensearch.index.compositeindex.datacube.Metric;
import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
import org.opensearch.index.compositeindex.datacube.startree.utils.date.DateTimeUnitAdapter;
import org.opensearch.index.compositeindex.datacube.startree.utils.date.DateTimeUnitRounding;
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
import org.opensearch.index.mapper.CompositeDataCubeFieldType;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.search.aggregations.AggregatorFactory;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.StarTreeBucketCollector;
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregatorFactory;
import org.opensearch.search.aggregations.metrics.MetricAggregatorFactory;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.builder.SearchSourceBuilder;
Expand All @@ -37,9 +41,10 @@
import org.opensearch.search.startree.StarTreeQueryContext;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -74,10 +79,16 @@ public static StarTreeQueryContext getStarTreeQueryContext(SearchContext context
);

for (AggregatorFactory aggregatorFactory : context.aggregations().factories().getFactories()) {
MetricStat metricStat = validateStarTreeMetricSupport(compositeMappedFieldType, aggregatorFactory);
if (metricStat == null) {
return null;
// first check for aggregation is a metric aggregation
if (validateStarTreeMetricSupport(compositeMappedFieldType, aggregatorFactory)) {
continue;
}

// if not a metric aggregation, check for applicable date histogram shape
if (validateDateHistogramSupport(compositeMappedFieldType, aggregatorFactory)) {
continue;
}
return null;
}

// need to cache star tree values only for multiple aggregations
Expand All @@ -99,64 +110,85 @@ private static StarTreeQueryContext tryCreateStarTreeQueryContext(
Map<String, Long> queryMap;
if (queryBuilder == null || queryBuilder instanceof MatchAllQueryBuilder) {
queryMap = null;
} else if (queryBuilder instanceof TermQueryBuilder) {
} else if (queryBuilder instanceof TermQueryBuilder termQueryBuilder) {
// TODO: Add support for keyword fields
if (compositeFieldType.getDimensions().stream().anyMatch(d -> d.getDocValuesType() != DocValuesType.SORTED_NUMERIC)) {
// return null for non-numeric fields
return null;
}

List<String> supportedDimensions = compositeFieldType.getDimensions()
Dimension matchedDimension = compositeFieldType.getDimensions()
.stream()
.map(Dimension::getField)
.collect(Collectors.toList());
queryMap = getStarTreePredicates(queryBuilder, supportedDimensions);
if (queryMap == null) {
.filter(d -> (d.getField().equals(termQueryBuilder.fieldName()) && d.getDocValuesType() == DocValuesType.SORTED_NUMERIC))
.findFirst()
.orElse(null);
if (matchedDimension == null) {
return null;
}
queryMap = Map.of(termQueryBuilder.fieldName(), Long.parseLong(termQueryBuilder.value().toString()));
} else {
return null;
}
return new StarTreeQueryContext(compositeIndexFieldInfo, queryMap, cacheStarTreeValuesSize);
}

/**
 * Translates a term query into the single star-tree predicate it represents.
 * <p>
 * The query builder is cast to {@link TermQueryBuilder} unconditionally, so callers must
 * only pass term queries. The term value is rendered with {@code toString()} and parsed
 * as a {@code long}; a value that is not a valid long makes {@link Long#parseLong(String)}
 * throw {@link NumberFormatException}.
 *
 * @param queryBuilder        term query to convert into a star-tree predicate
 * @param supportedDimensions names of the dimension fields the star-tree can filter on
 * @return a mutable one-entry map from the queried field to its long value, or
 *         {@code null} when the queried field is not one of {@code supportedDimensions}
 */
private static Map<String, Long> getStarTreePredicates(QueryBuilder queryBuilder, List<String> supportedDimensions) {
    final TermQueryBuilder termQuery = (TermQueryBuilder) queryBuilder;
    final String dimensionName = termQuery.fieldName();

    if (supportedDimensions.contains(dimensionName)) {
        // Value is parsed only once the field is known to be a supported dimension
        Map<String, Long> predicates = new HashMap<>();
        predicates.put(dimensionName, Long.parseLong(termQuery.value().toString()));
        return predicates;
    }
    return null;
}

private static MetricStat validateStarTreeMetricSupport(
private static boolean validateStarTreeMetricSupport(
CompositeDataCubeFieldType compositeIndexFieldInfo,
AggregatorFactory aggregatorFactory
) {
if (aggregatorFactory instanceof MetricAggregatorFactory && aggregatorFactory.getSubFactories().getFactories().length == 0) {
if (aggregatorFactory instanceof MetricAggregatorFactory metricAggregatorFactory
&& metricAggregatorFactory.getSubFactories().getFactories().length == 0) {
String field;
Map<String, List<MetricStat>> supportedMetrics = compositeIndexFieldInfo.getMetrics()
.stream()
.collect(Collectors.toMap(Metric::getField, Metric::getMetrics));

MetricStat metricStat = ((MetricAggregatorFactory) aggregatorFactory).getMetricStat();
field = ((MetricAggregatorFactory) aggregatorFactory).getField();
MetricStat metricStat = metricAggregatorFactory.getMetricStat();
field = metricAggregatorFactory.getField();

return supportedMetrics.containsKey(field) && supportedMetrics.get(field).contains(metricStat);
}
return false;
}

private static boolean validateDateHistogramSupport(
CompositeDataCubeFieldType compositeIndexFieldInfo,
AggregatorFactory aggregatorFactory
) {
if (!(aggregatorFactory instanceof DateHistogramAggregatorFactory dateHistogramAggregatorFactory)
|| aggregatorFactory.getSubFactories().getFactories().length < 1) {
return false;
}

// Find the DateDimension in the dimensions list
DateDimension starTreeDateDimension = null;
for (Dimension dimension : compositeIndexFieldInfo.getDimensions()) {
if (dimension instanceof DateDimension) {
starTreeDateDimension = (DateDimension) dimension;
break;
}
}

// If no DateDimension is found, validation fails
if (starTreeDateDimension == null) {
return false;
}

// Ensure the rounding is not null
if (dateHistogramAggregatorFactory.getRounding() == null) {
return false;
}

// Find the closest valid interval in the DateTimeUnitRounding class associated with star tree
DateTimeUnitRounding rounding = starTreeDateDimension.findClosestValidInterval(
new DateTimeUnitAdapter(dateHistogramAggregatorFactory.getRounding())
);
if (rounding == null) {
return false;
}

if (field != null && supportedMetrics.containsKey(field) && supportedMetrics.get(field).contains(metricStat)) {
return metricStat;
// Validate all sub-factories
for (AggregatorFactory subFactory : aggregatorFactory.getSubFactories().getFactories()) {
if (!validateStarTreeMetricSupport(compositeIndexFieldInfo, subFactory)) {
return false;
}
}
return null;
return true;
}

public static CompositeIndexFieldInfo getSupportedStarTree(SearchContext context) {
Expand Down Expand Up @@ -222,11 +254,37 @@ public static LeafBucketCollector getStarTreeLeafCollector(
// Call the final consumer after processing all entries
finalConsumer.run();

// Return a LeafBucketCollector that terminates collection
return new LeafBucketCollectorBase(sub, valuesSource.doubleValues(ctx)) {
// Terminate after pre-computing aggregation
throw new CollectionTerminatedException();
}

public static StarTreeBucketCollector getStarTreeBucketMetricCollector(
CompositeIndexFieldInfo starTree,
String metric,
ValuesSource.Numeric valuesSource,
StarTreeBucketCollector parentCollector,
Consumer<Long> growArrays,
BiConsumer<Long, Long> updateBucket
) throws IOException {
assert parentCollector != null;
return new StarTreeBucketCollector(parentCollector) {
String metricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues(
starTree.getField(),
((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName(),
metric
);
SortedNumericStarTreeValuesIterator metricValuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues
.getMetricValuesIterator(metricName);

@Override
public void collect(int doc, long bucket) {
throw new CollectionTerminatedException();
public void collectStarTreeEntry(int starTreeEntryBit, long bucket) throws IOException {
growArrays.accept(bucket);
// Advance the valuesIterator to the current bit
if (!metricValuesIterator.advanceExact(starTreeEntryBit)) {
return; // Skip if no entries for this document
}
long metricValue = metricValuesIterator.nextValue();
updateBucket.accept(bucket, metricValue);
}
};
}
Expand All @@ -240,7 +298,7 @@ public static FixedBitSet getStarTreeFilteredValues(SearchContext context, LeafR
throws IOException {
FixedBitSet result = context.getStarTreeQueryContext().getStarTreeValues(ctx);
if (result == null) {
result = StarTreeFilter.getStarTreeResult(starTreeValues, context.getStarTreeQueryContext().getQueryMap());
result = StarTreeFilter.getStarTreeResult(starTreeValues, context.getStarTreeQueryContext().getQueryMap(), Set.of());
context.getStarTreeQueryContext().setStarTreeValues(ctx, result);
}
return result;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations;

import org.apache.lucene.util.FixedBitSet;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Collector for star tree aggregation.
 * This abstract class exposes utilities to help avoid traversing star-tree multiple times and
 * collect relevant metrics across nested aggregations in a single traversal.
 * <p>
 * A collector holds the star-tree values and the bitset of matching entries it operates on,
 * plus a list of sub-collectors for nested aggregators. A child collector created through
 * {@link #StarTreeBucketCollector(StarTreeBucketCollector)} reuses the values and bitset
 * returned by its parent's getters.
 *
 * @opensearch.internal
 */
@ExperimentalApi
public abstract class StarTreeBucketCollector {

    protected final StarTreeValues starTreeValues;
    protected final FixedBitSet matchingDocsBitSet;
    protected final List<StarTreeBucketCollector> subCollectors = new ArrayList<>();

    /**
     * Creates a collector over the given star-tree values and matching-docs bitset, then
     * invokes {@link #setSubCollectors()}.
     *
     * @param starTreeValues     the star-tree values to iterate
     * @param matchingDocsBitSet the bitset of matching star-tree entries
     * @throws IOException if {@link #setSubCollectors()} throws
     */
    public StarTreeBucketCollector(StarTreeValues starTreeValues, FixedBitSet matchingDocsBitSet) throws IOException {
        this.starTreeValues = starTreeValues;
        this.matchingDocsBitSet = matchingDocsBitSet;
        this.setSubCollectors();
    }

    /**
     * Creates a collector that shares the star-tree values and matching-docs bitset of
     * {@code parent}, then invokes {@link #setSubCollectors()}.
     *
     * @param parent the collector whose values and bitset are reused; must not be {@code null}
     * @throws IOException if {@link #setSubCollectors()} throws
     */
    public StarTreeBucketCollector(StarTreeBucketCollector parent) throws IOException {
        // Delegate so the field assignment and the setSubCollectors() call live in one place
        this(parent.getStarTreeValues(), parent.getMatchingDocsBitSet());
    }

    /**
     * Sets the sub-collectors to track nested aggregators. The default implementation does nothing.
     * <p>
     * This method is called from the constructors, i.e. before a subclass's own field
     * initializers and constructor body have run. Overrides should therefore rely only on
     * state owned by this base class ({@link #starTreeValues}, {@link #matchingDocsBitSet},
     * {@link #subCollectors}) and not on fields declared in the subclass.
     *
     * @throws IOException if an override fails while creating its sub-collectors
     */
    public void setSubCollectors() throws IOException {}

    /**
     * Returns a list of sub-collectors to track nested aggregators.
     * The returned list is the live internal list, not a copy.
     */
    public List<StarTreeBucketCollector> getSubCollectors() {
        return subCollectors;
    }

    /**
     * Returns the tree values to iterate
     */
    public StarTreeValues getStarTreeValues() {
        return starTreeValues;
    }

    /**
     * Returns the matching docs bitset to iterate upon the star-tree values based on search query
     */
    public FixedBitSet getMatchingDocsBitSet() {
        return matchingDocsBitSet;
    }

    /**
     * Collects the star tree entry and bucket ordinal to update.
     * The method implementation should identify the metrics to collect from that star-tree entry to the specified bucket.
     *
     * @param starTreeEntry the star-tree entry to collect
     * @param bucket        the bucket ordinal to update
     * @throws IOException if reading the star-tree entry fails
     */
    public abstract void collectStarTreeEntry(int starTreeEntry, long bucket) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations;

import org.apache.lucene.index.LeafReaderContext;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;

import java.io.IOException;

/**
* This interface is used to pre-compute the star tree bucket collector for each segment/leaf.
* It is utilized by parent aggregation to retrieve a StarTreeBucketCollector which can be used to
* pre-compute the associated aggregation along with its parent pre-computation using star-tree
*
* @opensearch.internal
*/
public interface StarTreePreComputeCollector {
/**
* Get the star tree bucket collector for the specified segment/leaf
*
* @param ctx             the reader context of the segment/leaf to build the collector for
* @param starTree        the composite index field info identifying the star tree to use
* @param parentCollector the collector handed down by the caller; presumably the parent
*                        aggregation's collector (confirm against implementations)
* @return the star tree bucket collector for that segment/leaf
* @throws IOException declared by the signature; the failure conditions depend on the implementation
*/
StarTreeBucketCollector getStarTreeBucketCollector(
LeafReaderContext ctx,
CompositeIndexFieldInfo starTree,
StarTreeBucketCollector parentCollector
) throws IOException;
}
Loading

0 comments on commit b5234a5

Please sign in to comment.