Allow all raw index config in star-tree index (apache#13225)
Jackie-Jiang authored May 25, 2024
1 parent e84f3f5 commit 1950323
Showing 15 changed files with 288 additions and 169 deletions.
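
This commit widens the per-aggregation star-tree configuration from a single compression-codec knob to the full set of raw forward index settings: compression codec, deriveNumDocsPerChunk, index version, target max chunk size, and target docs per chunk. A minimal sketch of the expanded surface, pieced together from the call sites in this diff (the constructor argument order is inferred from those call sites, and the column and function names are illustrative, not part of this commit):

import java.util.List;
import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
import org.apache.pinot.spi.config.table.StarTreeAggregationConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;

public class StarTreeRawIndexConfigSketch {
  static StarTreeIndexConfig example() {
    // COUNT(*) with all-default raw index settings (two-arg constructor, as used in this diff)
    StarTreeAggregationConfig count = new StarTreeAggregationConfig("*", "COUNT");
    // A metric with the raw index knobs set: codec, deriveNumDocsPerChunk, indexVersion,
    // targetMaxChunkSizeBytes, targetDocsPerChunk (null means "use the default")
    StarTreeAggregationConfig sum =
        new StarTreeAggregationConfig("DivArrDelay", "SUM", CompressionCodec.LZ4, false, 4, null, null);
    return new StarTreeIndexConfig(List.of("DepTimeBlk"), null, null, List.of(count, sum), 10000);
  }
}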
[Changed file 1 of 15; file name not captured]

@@ -29,6 +29,7 @@
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.core.common.BlockDocIdIterator;
@@ -173,7 +174,7 @@ public void setUp()
     StarTreeIndexConfig starTreeIndexConfig =
         new StarTreeIndexConfig(Arrays.asList(DIMENSION_D1, DIMENSION_D2), null, null, Collections.singletonList(
             new StarTreeAggregationConfig(METRIC, _valueAggregator.getAggregationType().getName(),
-                getCompressionCodec())), MAX_LEAF_RECORDS);
+                getCompressionCodec(), true, getIndexVersion(), null, null)), MAX_LEAF_RECORDS);
     File indexDir = new File(TEMP_DIR, SEGMENT_NAME);
     // Randomly build star-tree using on-heap or off-heap mode
     MultipleTreesBuilder.BuildMode buildMode =
@@ -479,14 +480,21 @@ private Object getNextRawValue(int docId, ForwardIndexReader reader, ForwardInde
   /**
    * Can be overridden to force the compression codec.
    */
+  @Nullable
   CompressionCodec getCompressionCodec() {
     CompressionCodec[] compressionCodecs = CompressionCodec.values();
-    while (true) {
-      CompressionCodec compressionCodec = compressionCodecs[RANDOM.nextInt(compressionCodecs.length)];
-      if (compressionCodec.isApplicableToRawIndex()) {
-        return compressionCodec;
-      }
-    }
+    CompressionCodec compressionCodec = compressionCodecs[RANDOM.nextInt(compressionCodecs.length)];
+    return compressionCodec.isApplicableToRawIndex() ? compressionCodec : null;
   }

+  /**
+   * Can be overridden to force the index version.
+   */
+  @Nullable
+  Integer getIndexVersion() {
+    // Allow 2, 3, 4 or null
+    int version = 1 + RANDOM.nextInt(4);
+    return version > 1 ? version : null;
+  }
+
   abstract ValueAggregator<R, A> getValueAggregator();
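
A null from either hook now means "fall back to the index default", which lets the base test randomize the codec and version while a concrete subclass can still pin them. A hypothetical override in a test subclass (the forced values are illustrative):

  @Override
  @Nullable
  CompressionCodec getCompressionCodec() {
    return CompressionCodec.PASS_THROUGH; // always exercise the uncompressed raw index path
  }

  @Override
  @Nullable
  Integer getIndexVersion() {
    return 4; // always build the v4 raw forward index
  }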
[Changed file 2 of 15; file name not captured]

@@ -29,7 +29,8 @@
 import org.apache.pinot.integration.tests.startree.SegmentInfoProvider;
 import org.apache.pinot.integration.tests.startree.StarTreeQueryGenerator;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
-import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
+import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
+import org.apache.pinot.spi.config.table.StarTreeAggregationConfig;
 import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
@@ -148,13 +149,19 @@ public void setUp()

   private static StarTreeIndexConfig getStarTreeIndexConfig(List<String> dimensions, List<String> metrics,
       int maxLeafRecords) {
-    List<String> functionColumnPairs = new ArrayList<>();
+    List<StarTreeAggregationConfig> aggregationConfigs = new ArrayList<>();
+    // Use default setting for COUNT(*) and custom setting for other aggregations for better coverage
+    aggregationConfigs.add(new StarTreeAggregationConfig("*", "COUNT"));
     for (AggregationFunctionType functionType : AGGREGATION_FUNCTION_TYPES) {
+      if (functionType == AggregationFunctionType.COUNT) {
+        continue;
+      }
       for (String metric : metrics) {
-        functionColumnPairs.add(new AggregationFunctionColumnPair(functionType, metric).toColumnName());
+        aggregationConfigs.add(
+            new StarTreeAggregationConfig(metric, functionType.name(), CompressionCodec.LZ4, false, 4, null, null));
       }
     }
-    return new StarTreeIndexConfig(dimensions, null, functionColumnPairs, null, maxLeafRecords);
+    return new StarTreeIndexConfig(dimensions, null, null, aggregationConfigs, maxLeafRecords);
   }

   @Test(dataProvider = "useBothQueryEngines")
@@ -184,17 +191,15 @@ public void testHardCodedFilteredAggQueries(boolean useMultiStageQueryEngine)
       throws Exception {
     setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     String starQuery = "SELECT DepTimeBlk, COUNT(*), COUNT(*) FILTER (WHERE CRSDepTime = 35) FROM mytable "
-        + "WHERE CRSDepTime != 35 "
-        + "GROUP BY DepTimeBlk ORDER BY DepTimeBlk";
+        + "WHERE CRSDepTime != 35 GROUP BY DepTimeBlk ORDER BY DepTimeBlk";
     // Don't verify that the query plan uses StarTree index, as this query results in FILTER_EMPTY in the query plan.
     // This is still a valuable test, as it caught a bug where only the subFilterContext was being preserved through
-    // AggragationFunctionUtils#buildFilteredAggregateProjectOperators
+    // AggregationFunctionUtils#buildFilteredAggregationInfos
     testStarQuery(starQuery, false);

     // Ensure the filtered agg and unfiltered agg can co-exist in one query
     starQuery = "SELECT DepTimeBlk, COUNT(*), COUNT(*) FILTER (WHERE DivArrDelay > 20) FROM mytable "
-        + "WHERE CRSDepTime != 35 "
-        + "GROUP BY DepTimeBlk ORDER BY DepTimeBlk";
+        + "WHERE CRSDepTime != 35 GROUP BY DepTimeBlk ORDER BY DepTimeBlk";
     testStarQuery(starQuery, !useMultiStageQueryEngine);

     starQuery = "SELECT DepTimeBlk, COUNT(*) FILTER (WHERE CRSDepTime != 35) FROM mytable "
@@ -211,8 +216,7 @@ private void testStarQuery(String starQuery, boolean verifyPlan)
     if (verifyPlan) {
       JsonNode starPlan = postQuery(explain + starQuery);
       JsonNode referencePlan = postQuery(disableStarTree + explain + starQuery);
-      assertTrue(starPlan.toString().contains(filterStartreeIndex)
-          || starPlan.toString().contains("FILTER_EMPTY")
+      assertTrue(starPlan.toString().contains(filterStartreeIndex) || starPlan.toString().contains("FILTER_EMPTY")
           || starPlan.toString().contains("ALL_SEGMENTS_PRUNED_ON_SERVER"),
           "StarTree query did not indicate use of StarTree index in query plan. Plan: " + starPlan);
       assertFalse(referencePlan.toString().contains(filterStartreeIndex),
[Changed file 3 of 15; file name not captured]

@@ -79,7 +79,7 @@ abstract class BaseSingleTreeBuilder implements SingleTreeBuilder {
   final ValueAggregator[] _valueAggregators;
   // Readers and data types for column in function-column pair
   final PinotSegmentColumnReader[] _metricReaders;
-  final ChunkCompressionType[] _compressionType;
+  final AggregationSpec[] _aggregationSpecs;

   final int _maxLeafRecords;

@@ -138,7 +138,7 @@ static class Record
     _metrics = new String[_numMetrics];
     _valueAggregators = new ValueAggregator[_numMetrics];
     _metricReaders = new PinotSegmentColumnReader[_numMetrics];
-    _compressionType = new ChunkCompressionType[_numMetrics];
+    _aggregationSpecs = new AggregationSpec[_numMetrics];

     int index = 0;
     for (Map.Entry<AggregationFunctionColumnPair, AggregationSpec> entry : aggregationSpecs.entrySet()) {
@@ -147,7 +147,7 @@ static class Record
       // TODO: Allow extra arguments in star-tree (e.g. log2m, precision)
       _valueAggregators[index] =
           ValueAggregatorFactory.getValueAggregator(functionColumnPair.getFunctionType(), Collections.emptyList());
-      _compressionType[index] = entry.getValue().getCompressionType();
+      _aggregationSpecs[index] = entry.getValue();
       // Ignore the column for COUNT aggregation function
       if (_valueAggregators[index].getAggregationType() != AggregationFunctionType.COUNT) {
         String column = functionColumnPair.getColumn();
@@ -474,14 +474,18 @@ private void createForwardIndexes()
       String metric = _metrics[i];
       ValueAggregator valueAggregator = _valueAggregators[i];
       DataType valueType = valueAggregator.getAggregatedValueType();
-      ChunkCompressionType compressionType = _compressionType[i];
+      AggregationSpec aggregationSpec = _aggregationSpecs[i];
+      ChunkCompressionType compressionType = ChunkCompressionType.valueOf(aggregationSpec.getCompressionCodec().name());
       if (valueType == BYTES) {
         metricIndexCreators[i] =
             new SingleValueVarByteRawIndexCreator(_outputDir, compressionType, metric, _numDocs, BYTES,
-                valueAggregator.getMaxAggregatedValueByteSize());
+                valueAggregator.getMaxAggregatedValueByteSize(), aggregationSpec.isDeriveNumDocsPerChunk(),
+                aggregationSpec.getIndexVersion(), aggregationSpec.getTargetMaxChunkSizeBytes(),
+                aggregationSpec.getTargetDocsPerChunk());
       } else {
         metricIndexCreators[i] =
-            new SingleValueFixedByteRawIndexCreator(_outputDir, compressionType, metric, _numDocs, valueType);
+            new SingleValueFixedByteRawIndexCreator(_outputDir, compressionType, metric, _numDocs, valueType,
+                aggregationSpec.getIndexVersion(), aggregationSpec.getTargetDocsPerChunk());
       }
     }

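Note the bridge between the two enums here: the config-level FieldConfig.CompressionCodec is mapped onto the segment-level ChunkCompressionType by constant name, which only holds together while the raw-index-applicable constants stay identically named in both enums. A one-line illustration (LZ4 appears in both enums elsewhere in this diff):

ChunkCompressionType chunkType = ChunkCompressionType.valueOf(CompressionCodec.LZ4.name()); // "LZ4" -> ChunkCompressionType.LZ4
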
[Changed file 4 of 15; file name not captured]

@@ -35,10 +35,8 @@
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
 import org.apache.pinot.segment.spi.index.startree.AggregationSpec;
-import org.apache.pinot.segment.spi.index.startree.StarTreeV2Constants.MetadataKey;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2Metadata;
 import org.apache.pinot.spi.config.table.StarTreeAggregationConfig;
 import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
@@ -91,11 +89,9 @@ public static StarTreeV2BuilderConfig fromIndexConfig(StarTreeIndexConfig indexC
             AggregationFunctionColumnPair.fromAggregationConfig(aggregationConfig);
         AggregationFunctionColumnPair storedType =
             AggregationFunctionColumnPair.resolveToStoredType(aggregationFunctionColumnPair);
-        ChunkCompressionType compressionType =
-            ChunkCompressionType.valueOf(aggregationConfig.getCompressionCodec().name());
         // If there is already an equivalent functionColumnPair in the map, do not load another.
         // This prevents the duplication of the aggregation when the StarTree is constructed.
-        aggregationSpecs.putIfAbsent(storedType, new AggregationSpec(compressionType));
+        aggregationSpecs.putIfAbsent(storedType, new AggregationSpec(aggregationConfig));
       }
     }

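With the single-argument AggregationSpec(aggregationConfig) constructor, every raw index setting now travels from the table config into the builder, and putIfAbsent still deduplicates configs that resolve to the same stored function-column pair. A hypothetical illustration of that dedup (pair, configA, and configB are invented names for the sketch):

// Two aggregation configs that resolve to the same stored function-column pair:
aggregationSpecs.putIfAbsent(pair, new AggregationSpec(configA)); // kept: first mapping wins
aggregationSpecs.putIfAbsent(pair, new AggregationSpec(configB)); // no-op: pair is already mapped
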
@@ -298,24 +294,8 @@ public int getMaxLeafRecords() {
    * Writes the metadata which is used to initialize the {@link StarTreeV2Metadata} when loading the segment.
    */
   public void writeMetadata(Configuration metadataProperties, int totalDocs) {
-    metadataProperties.setProperty(MetadataKey.TOTAL_DOCS, totalDocs);
-    metadataProperties.setProperty(MetadataKey.DIMENSIONS_SPLIT_ORDER, _dimensionsSplitOrder);
-    metadataProperties.setProperty(MetadataKey.FUNCTION_COLUMN_PAIRS, _aggregationSpecs.keySet());
-    metadataProperties.setProperty(MetadataKey.AGGREGATION_COUNT, _aggregationSpecs.size());
-    int index = 0;
-    for (Map.Entry<AggregationFunctionColumnPair, AggregationSpec> entry : _aggregationSpecs.entrySet()) {
-      AggregationFunctionColumnPair functionColumnPair = entry.getKey();
-      AggregationSpec aggregationSpec = entry.getValue();
-      String prefix = MetadataKey.AGGREGATION_PREFIX + index + '.';
-      metadataProperties.setProperty(prefix + MetadataKey.FUNCTION_TYPE,
-          functionColumnPair.getFunctionType().getName());
-      metadataProperties.setProperty(prefix + MetadataKey.COLUMN_NAME, functionColumnPair.getColumn());
-      metadataProperties.setProperty(prefix + MetadataKey.COMPRESSION_CODEC, aggregationSpec.getCompressionType());
-      index++;
-    }
-    metadataProperties.setProperty(MetadataKey.MAX_LEAF_RECORDS, _maxLeafRecords);
-    metadataProperties.setProperty(MetadataKey.SKIP_STAR_NODE_CREATION_FOR_DIMENSIONS,
-        _skipStarNodeCreationForDimensions);
+    StarTreeV2Metadata.writeMetadata(metadataProperties, totalDocs, _dimensionsSplitOrder, _aggregationSpecs,
+        _maxLeafRecords, _skipStarNodeCreationForDimensions);
   }

   @Override
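
Centralizing serialization in StarTreeV2Metadata.writeMetadata puts the writer next to the reader, so the growing set of per-aggregation settings is persisted and parsed by a single class. Judging from the removed lines, the delegated method presumably writes at least the following; the per-aggregation keys beyond COMPRESSION_CODEC are an assumption, since their constants are not visible in this excerpt:

// Tree-level keys: MetadataKey.TOTAL_DOCS, DIMENSIONS_SPLIT_ORDER, FUNCTION_COLUMN_PAIRS,
//   AGGREGATION_COUNT, MAX_LEAF_RECORDS, SKIP_STAR_NODE_CREATION_FOR_DIMENSIONS
// Per-aggregation keys (under AGGREGATION_PREFIX + index): FUNCTION_TYPE, COLUMN_NAME,
//   COMPRESSION_CODEC, plus (assumed) the new index-version and chunk-sizing settings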
(The remaining changed files in this commit were not captured in this view.)
