diff --git a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java index d323f6d55042..847d9e156812 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Random; import java.util.Set; +import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.core.common.BlockDocIdIterator; @@ -173,7 +174,7 @@ public void setUp() StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList(DIMENSION_D1, DIMENSION_D2), null, null, Collections.singletonList( new StarTreeAggregationConfig(METRIC, _valueAggregator.getAggregationType().getName(), - getCompressionCodec())), MAX_LEAF_RECORDS); + getCompressionCodec(), true, getIndexVersion(), null, null)), MAX_LEAF_RECORDS); File indexDir = new File(TEMP_DIR, SEGMENT_NAME); // Randomly build star-tree using on-heap or off-heap mode MultipleTreesBuilder.BuildMode buildMode = @@ -479,14 +480,21 @@ private Object getNextRawValue(int docId, ForwardIndexReader reader, ForwardInde /** * Can be overridden to force the compression codec. */ + @Nullable CompressionCodec getCompressionCodec() { CompressionCodec[] compressionCodecs = CompressionCodec.values(); - while (true) { - CompressionCodec compressionCodec = compressionCodecs[RANDOM.nextInt(compressionCodecs.length)]; - if (compressionCodec.isApplicableToRawIndex()) { - return compressionCodec; - } - } + CompressionCodec compressionCodec = compressionCodecs[RANDOM.nextInt(compressionCodecs.length)]; + return compressionCodec.isApplicableToRawIndex() ? compressionCodec : null; + } + + /** + * Can be overridden to force the index version. + */ + @Nullable + Integer getIndexVersion() { + // Allow 2, 3, 4 or null + int version = 1 + RANDOM.nextInt(4); + return version > 1 ? version : null; } abstract ValueAggregator getValueAggregator(); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java index 94b0ae9abb68..cc05e3131e52 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java @@ -29,7 +29,8 @@ import org.apache.pinot.integration.tests.startree.SegmentInfoProvider; import org.apache.pinot.integration.tests.startree.StarTreeQueryGenerator; import org.apache.pinot.segment.spi.AggregationFunctionType; -import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair; +import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec; +import org.apache.pinot.spi.config.table.StarTreeAggregationConfig; import org.apache.pinot.spi.config.table.StarTreeIndexConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; @@ -148,13 +149,19 @@ public void setUp() private static StarTreeIndexConfig getStarTreeIndexConfig(List dimensions, List metrics, int maxLeafRecords) { - List functionColumnPairs = new ArrayList<>(); + List aggregationConfigs = new ArrayList<>(); + // Use default setting for COUNT(*) and custom setting for other aggregations for better coverage + aggregationConfigs.add(new StarTreeAggregationConfig("*", "COUNT")); for (AggregationFunctionType functionType : AGGREGATION_FUNCTION_TYPES) { + if (functionType == AggregationFunctionType.COUNT) { + continue; + } for (String metric : metrics) { - functionColumnPairs.add(new AggregationFunctionColumnPair(functionType, metric).toColumnName()); + aggregationConfigs.add( + new StarTreeAggregationConfig(metric, functionType.name(), CompressionCodec.LZ4, false, 4, null, null)); } } - return new StarTreeIndexConfig(dimensions, null, functionColumnPairs, null, maxLeafRecords); + return new StarTreeIndexConfig(dimensions, null, null, aggregationConfigs, maxLeafRecords); } @Test(dataProvider = "useBothQueryEngines") @@ -184,17 +191,15 @@ public void testHardCodedFilteredAggQueries(boolean useMultiStageQueryEngine) throws Exception { setUseMultiStageQueryEngine(useMultiStageQueryEngine); String starQuery = "SELECT DepTimeBlk, COUNT(*), COUNT(*) FILTER (WHERE CRSDepTime = 35) FROM mytable " - + "WHERE CRSDepTime != 35 " - + "GROUP BY DepTimeBlk ORDER BY DepTimeBlk"; + + "WHERE CRSDepTime != 35 GROUP BY DepTimeBlk ORDER BY DepTimeBlk"; // Don't verify that the query plan uses StarTree index, as this query results in FILTER_EMPTY in the query plan. // This is still a valuable test, as it caught a bug where only the subFilterContext was being preserved through - // AggragationFunctionUtils#buildFilteredAggregateProjectOperators + // AggregationFunctionUtils#buildFilteredAggregationInfos testStarQuery(starQuery, false); // Ensure the filtered agg and unfiltered agg can co-exist in one query starQuery = "SELECT DepTimeBlk, COUNT(*), COUNT(*) FILTER (WHERE DivArrDelay > 20) FROM mytable " - + "WHERE CRSDepTime != 35 " - + "GROUP BY DepTimeBlk ORDER BY DepTimeBlk"; + + "WHERE CRSDepTime != 35 GROUP BY DepTimeBlk ORDER BY DepTimeBlk"; testStarQuery(starQuery, !useMultiStageQueryEngine); starQuery = "SELECT DepTimeBlk, COUNT(*) FILTER (WHERE CRSDepTime != 35) FROM mytable " @@ -211,8 +216,7 @@ private void testStarQuery(String starQuery, boolean verifyPlan) if (verifyPlan) { JsonNode starPlan = postQuery(explain + starQuery); JsonNode referencePlan = postQuery(disableStarTree + explain + starQuery); - assertTrue(starPlan.toString().contains(filterStartreeIndex) - || starPlan.toString().contains("FILTER_EMPTY") + assertTrue(starPlan.toString().contains(filterStartreeIndex) || starPlan.toString().contains("FILTER_EMPTY") || starPlan.toString().contains("ALL_SEGMENTS_PRUNED_ON_SERVER"), "StarTree query did not indicate use of StarTree index in query plan. Plan: " + starPlan); assertFalse(referencePlan.toString().contains(filterStartreeIndex), diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/BaseSingleTreeBuilder.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/BaseSingleTreeBuilder.java index c2e508d366d9..c0f9fba2279c 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/BaseSingleTreeBuilder.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/BaseSingleTreeBuilder.java @@ -79,7 +79,7 @@ abstract class BaseSingleTreeBuilder implements SingleTreeBuilder { final ValueAggregator[] _valueAggregators; // Readers and data types for column in function-column pair final PinotSegmentColumnReader[] _metricReaders; - final ChunkCompressionType[] _compressionType; + final AggregationSpec[] _aggregationSpecs; final int _maxLeafRecords; @@ -138,7 +138,7 @@ static class Record { _metrics = new String[_numMetrics]; _valueAggregators = new ValueAggregator[_numMetrics]; _metricReaders = new PinotSegmentColumnReader[_numMetrics]; - _compressionType = new ChunkCompressionType[_numMetrics]; + _aggregationSpecs = new AggregationSpec[_numMetrics]; int index = 0; for (Map.Entry entry : aggregationSpecs.entrySet()) { @@ -147,7 +147,7 @@ static class Record { // TODO: Allow extra arguments in star-tree (e.g. log2m, precision) _valueAggregators[index] = ValueAggregatorFactory.getValueAggregator(functionColumnPair.getFunctionType(), Collections.emptyList()); - _compressionType[index] = entry.getValue().getCompressionType(); + _aggregationSpecs[index] = entry.getValue(); // Ignore the column for COUNT aggregation function if (_valueAggregators[index].getAggregationType() != AggregationFunctionType.COUNT) { String column = functionColumnPair.getColumn(); @@ -474,14 +474,18 @@ private void createForwardIndexes() String metric = _metrics[i]; ValueAggregator valueAggregator = _valueAggregators[i]; DataType valueType = valueAggregator.getAggregatedValueType(); - ChunkCompressionType compressionType = _compressionType[i]; + AggregationSpec aggregationSpec = _aggregationSpecs[i]; + ChunkCompressionType compressionType = ChunkCompressionType.valueOf(aggregationSpec.getCompressionCodec().name()); if (valueType == BYTES) { metricIndexCreators[i] = new SingleValueVarByteRawIndexCreator(_outputDir, compressionType, metric, _numDocs, BYTES, - valueAggregator.getMaxAggregatedValueByteSize()); + valueAggregator.getMaxAggregatedValueByteSize(), aggregationSpec.isDeriveNumDocsPerChunk(), + aggregationSpec.getIndexVersion(), aggregationSpec.getTargetMaxChunkSizeBytes(), + aggregationSpec.getTargetDocsPerChunk()); } else { metricIndexCreators[i] = - new SingleValueFixedByteRawIndexCreator(_outputDir, compressionType, metric, _numDocs, valueType); + new SingleValueFixedByteRawIndexCreator(_outputDir, compressionType, metric, _numDocs, valueType, + aggregationSpec.getIndexVersion(), aggregationSpec.getTargetDocsPerChunk()); } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeV2BuilderConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeV2BuilderConfig.java index eca08f8787ab..d19b6e5d30c4 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeV2BuilderConfig.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeV2BuilderConfig.java @@ -35,10 +35,8 @@ import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.SegmentMetadata; -import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair; import org.apache.pinot.segment.spi.index.startree.AggregationSpec; -import org.apache.pinot.segment.spi.index.startree.StarTreeV2Constants.MetadataKey; import org.apache.pinot.segment.spi.index.startree.StarTreeV2Metadata; import org.apache.pinot.spi.config.table.StarTreeAggregationConfig; import org.apache.pinot.spi.config.table.StarTreeIndexConfig; @@ -91,11 +89,9 @@ public static StarTreeV2BuilderConfig fromIndexConfig(StarTreeIndexConfig indexC AggregationFunctionColumnPair.fromAggregationConfig(aggregationConfig); AggregationFunctionColumnPair storedType = AggregationFunctionColumnPair.resolveToStoredType(aggregationFunctionColumnPair); - ChunkCompressionType compressionType = - ChunkCompressionType.valueOf(aggregationConfig.getCompressionCodec().name()); // If there is already an equivalent functionColumnPair in the map, do not load another. // This prevents the duplication of the aggregation when the StarTree is constructed. - aggregationSpecs.putIfAbsent(storedType, new AggregationSpec(compressionType)); + aggregationSpecs.putIfAbsent(storedType, new AggregationSpec(aggregationConfig)); } } @@ -298,24 +294,8 @@ public int getMaxLeafRecords() { * Writes the metadata which is used to initialize the {@link StarTreeV2Metadata} when loading the segment. */ public void writeMetadata(Configuration metadataProperties, int totalDocs) { - metadataProperties.setProperty(MetadataKey.TOTAL_DOCS, totalDocs); - metadataProperties.setProperty(MetadataKey.DIMENSIONS_SPLIT_ORDER, _dimensionsSplitOrder); - metadataProperties.setProperty(MetadataKey.FUNCTION_COLUMN_PAIRS, _aggregationSpecs.keySet()); - metadataProperties.setProperty(MetadataKey.AGGREGATION_COUNT, _aggregationSpecs.size()); - int index = 0; - for (Map.Entry entry : _aggregationSpecs.entrySet()) { - AggregationFunctionColumnPair functionColumnPair = entry.getKey(); - AggregationSpec aggregationSpec = entry.getValue(); - String prefix = MetadataKey.AGGREGATION_PREFIX + index + '.'; - metadataProperties.setProperty(prefix + MetadataKey.FUNCTION_TYPE, - functionColumnPair.getFunctionType().getName()); - metadataProperties.setProperty(prefix + MetadataKey.COLUMN_NAME, functionColumnPair.getColumn()); - metadataProperties.setProperty(prefix + MetadataKey.COMPRESSION_CODEC, aggregationSpec.getCompressionType()); - index++; - } - metadataProperties.setProperty(MetadataKey.MAX_LEAF_RECORDS, _maxLeafRecords); - metadataProperties.setProperty(MetadataKey.SKIP_STAR_NODE_CREATION_FOR_DIMENSIONS, - _skipStarNodeCreationForDimensions); + StarTreeV2Metadata.writeMetadata(metadataProperties, totalDocs, _dimensionsSplitOrder, _aggregationSpecs, + _maxLeafRecords, _skipStarNodeCreationForDimensions); } @Override diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index aff68eb180cf..d05a896483f9 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -512,7 +512,6 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc } } - // Transform configs List transformConfigs = ingestionConfig.getTransformConfigs(); if (transformConfigs != null) { @@ -652,13 +651,13 @@ static void validateTaskConfigs(TableConfig tableConfig, Schema schema) { String.format("Task %s contains an invalid cron schedule: %s", taskTypeConfigName, cronExprStr), e); } } - boolean isAllowDownloadFromServer = - Boolean.parseBoolean(taskTypeConfig.getOrDefault(TableTaskConfig.MINION_ALLOW_DOWNLOAD_FROM_SERVER, - String.valueOf(TableTaskConfig.DEFAULT_MINION_ALLOW_DOWNLOAD_FROM_SERVER))); + boolean isAllowDownloadFromServer = Boolean.parseBoolean( + taskTypeConfig.getOrDefault(TableTaskConfig.MINION_ALLOW_DOWNLOAD_FROM_SERVER, + String.valueOf(TableTaskConfig.DEFAULT_MINION_ALLOW_DOWNLOAD_FROM_SERVER))); if (isAllowDownloadFromServer) { Preconditions.checkState(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme() != null, String.format("Table %s has task %s with allowDownloadFromServer set to true, but " - + "peerSegmentDownloadScheme is not set in the table config", tableConfig.getTableName(), + + "peerSegmentDownloadScheme is not set in the table config", tableConfig.getTableName(), taskTypeConfigName)); } // Task Specific validation for REALTIME_TO_OFFLINE_TASK_TYPE @@ -773,9 +772,9 @@ static void validateUpsertAndDedupConfig(TableConfig tableConfig, Schema schema) tableConfig.getRoutingConfig() != null && isRoutingStrategyAllowedForUpsert(tableConfig.getRoutingConfig()), "Upsert/Dedup table must use strict replica-group (i.e. strictReplicaGroup) based routing"); Preconditions.checkState(tableConfig.getTenantConfig().getTagOverrideConfig() == null || ( - tableConfig.getTenantConfig().getTagOverrideConfig().getRealtimeConsuming() == null - && tableConfig.getTenantConfig().getTagOverrideConfig().getRealtimeCompleted() - == null), "Invalid tenant tag override used for Upsert/Dedup table"); + tableConfig.getTenantConfig().getTagOverrideConfig().getRealtimeConsuming() == null + && tableConfig.getTenantConfig().getTagOverrideConfig().getRealtimeCompleted() == null), + "Invalid tenant tag override used for Upsert/Dedup table"); // specifically for upsert UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); @@ -802,15 +801,13 @@ static void validateUpsertAndDedupConfig(TableConfig tableConfig, Schema schema) Preconditions.checkState(fieldSpec.isSingleValueField(), String.format("The deleteRecordColumn - %s must be a single-valued column", deleteRecordColumn)); DataType dataType = fieldSpec.getDataType(); - Preconditions.checkState( - dataType == DataType.BOOLEAN || dataType == DataType.STRING || dataType.isNumeric(), + Preconditions.checkState(dataType == DataType.BOOLEAN || dataType == DataType.STRING || dataType.isNumeric(), String.format("The deleteRecordColumn - %s must be of type: String / Boolean / Numeric", deleteRecordColumn)); } String outOfOrderRecordColumn = upsertConfig.getOutOfOrderRecordColumn(); - Preconditions.checkState( - outOfOrderRecordColumn == null || !upsertConfig.isDropOutOfOrderRecord(), + Preconditions.checkState(outOfOrderRecordColumn == null || !upsertConfig.isDropOutOfOrderRecord(), "outOfOrderRecordColumn and dropOutOfOrderRecord shouldn't exist together for upsert table"); if (outOfOrderRecordColumn != null) { @@ -846,8 +843,8 @@ static void validateTTLForUpsertConfig(TableConfig tableConfig, Schema schema) { String comparisonColumn = comparisonColumns.get(0); DataType comparisonColumnDataType = schema.getFieldSpecFor(comparisonColumn).getDataType(); Preconditions.checkState(comparisonColumnDataType.isNumeric(), - "MetadataTTL / DeletedKeysTTL must have comparison column: %s in numeric type, found: %s", - comparisonColumn, comparisonColumnDataType); + "MetadataTTL / DeletedKeysTTL must have comparison column: %s in numeric type, found: %s", comparisonColumn, + comparisonColumnDataType); } if (upsertConfig.getMetadataTTL() > 0) { @@ -878,14 +875,11 @@ static void validateInstancePartitionsTypeMapConfig(TableConfig tableConfig) { tableConfig.getInstanceAssignmentConfigMap().get(instancePartitionsType.toString()); if (instanceAssignmentConfig.getPartitionSelector() == InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR) { - Preconditions.checkState( - tableConfig.getInstancePartitionsMap().containsKey(instancePartitionsType), + Preconditions.checkState(tableConfig.getInstancePartitionsMap().containsKey(instancePartitionsType), String.format("Both InstanceAssignmentConfigMap and InstancePartitionsMap needed for %s, as " - + "MIRROR_SERVER_SET_PARTITION_SELECTOR is used", - instancePartitionsType)); + + "MIRROR_SERVER_SET_PARTITION_SELECTOR is used", instancePartitionsType)); } else { - Preconditions.checkState( - !tableConfig.getInstancePartitionsMap().containsKey(instancePartitionsType), + Preconditions.checkState(!tableConfig.getInstancePartitionsMap().containsKey(instancePartitionsType), String.format("Both InstanceAssignmentConfigMap and InstancePartitionsMap set for %s", instancePartitionsType)); } @@ -1109,7 +1103,6 @@ private static void validateIndexingConfig(IndexingConfig indexingConfig, @Nulla } List starTreeIndexConfigList = indexingConfig.getStarTreeIndexConfigs(); - Set storedTypes = new HashSet<>(); if (starTreeIndexConfigList != null) { for (StarTreeIndexConfig starTreeIndexConfig : starTreeIndexConfigList) { // Dimension split order cannot be null @@ -1117,6 +1110,10 @@ private static void validateIndexingConfig(IndexingConfig indexingConfig, @Nulla columnNameToConfigMap.put(columnName, STAR_TREE_CONFIG_NAME); } List functionColumnPairs = starTreeIndexConfig.getFunctionColumnPairs(); + List aggregationConfigs = starTreeIndexConfig.getAggregationConfigs(); + Preconditions.checkState(functionColumnPairs == null || aggregationConfigs == null, + "Only one of 'functionColumnPairs' or 'aggregationConfigs' can be specified in StarTreeIndexConfig"); + Set storedTypes = new HashSet<>(); if (functionColumnPairs != null) { for (String functionColumnPair : functionColumnPairs) { AggregationFunctionColumnPair columnPair; @@ -1137,7 +1134,6 @@ private static void validateIndexingConfig(IndexingConfig indexingConfig, @Nulla } } } - List aggregationConfigs = starTreeIndexConfig.getAggregationConfigs(); if (aggregationConfigs != null) { for (StarTreeAggregationConfig aggregationConfig : aggregationConfigs) { AggregationFunctionColumnPair columnPair; @@ -1341,9 +1337,8 @@ private static void validateForwardIndexDisabledIndexCompatibility(String column } Preconditions.checkState(!indexingConfig.isOptimizeDictionaryForMetrics() && !indexingConfig.isOptimizeDictionary(), - String.format( - "Dictionary override optimization options (OptimizeDictionary, optimizeDictionaryForMetrics)" - + " not supported with forward index for column: %s, disabled", columnName)); + String.format("Dictionary override optimization options (OptimizeDictionary, optimizeDictionaryForMetrics)" + + " not supported with forward index for column: %s, disabled", columnName)); boolean hasDictionary = fieldConfig.getEncodingType() == EncodingType.DICTIONARY; boolean hasInvertedIndex = @@ -1421,9 +1416,9 @@ public static void ensureStorageQuotaConstraints(TableConfig tableConfig, String tableConfig.getTableName()); } else { if (quotaConfig.getStorageInBytes() > maxAllowedSizeInBytes) { - throw new IllegalStateException(String.format( - "Exceeded storage size for dimension table. Requested size: %d, Max allowed size: %d", - quotaConfig.getStorageInBytes(), maxAllowedSizeInBytes)); + throw new IllegalStateException( + String.format("Exceeded storage size for dimension table. Requested size: %d, Max allowed size: %d", + quotaConfig.getStorageInBytes(), maxAllowedSizeInBytes)); } } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeBuilderUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeBuilderUtilsTest.java index 71ee7c2251df..7728c684e746 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeBuilderUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeBuilderUtilsTest.java @@ -67,7 +67,7 @@ public void testAreStarTreeBuilderConfigListsEqual() { Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 200); // Create StartTreeAggregationConfigs with StarTreeAggregationConfig. - StarTreeAggregationConfig starTreeAggregationConfig1 = new StarTreeAggregationConfig("Distance", "MAX", null); + StarTreeAggregationConfig starTreeAggregationConfig1 = new StarTreeAggregationConfig("Distance", "MAX"); // Different AggregationConfig. StarTreeIndexConfig starTreeIndexConfig5 = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null, null, diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeV2BuilderConfigTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeV2BuilderConfigTest.java index 726ee3feeea9..cba984e5b188 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeV2BuilderConfigTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeV2BuilderConfigTest.java @@ -23,19 +23,16 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Set; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.Constants; -import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair; import org.apache.pinot.segment.spi.index.startree.AggregationSpec; -import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec; import org.apache.pinot.spi.config.table.StarTreeAggregationConfig; import org.apache.pinot.spi.config.table.StarTreeIndexConfig; import org.apache.pinot.spi.data.FieldSpec.DataType; @@ -136,33 +133,29 @@ public void testDefaultConfigFromJsonNodeSegmentMetadata() { @Test public void testBuildFromIndexConfig() { - List aggregationConfigs = - List.of(new StarTreeAggregationConfig("m1", "SUM", CompressionCodec.LZ4)); + List aggregationConfigs = List.of(new StarTreeAggregationConfig("m1", "SUM")); StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(List.of("d1"), null, null, aggregationConfigs, 1); StarTreeV2BuilderConfig builderConfig = StarTreeV2BuilderConfig.fromIndexConfig(starTreeIndexConfig); - assertEquals(builderConfig.getMaxLeafRecords(), 1); assertEquals(builderConfig.getDimensionsSplitOrder(), List.of("d1")); - assertEquals(builderConfig.getFunctionColumnPairs(), - Set.of(new AggregationFunctionColumnPair(AggregationFunctionType.SUM, "m1"))); assertTrue(builderConfig.getSkipStarNodeCreationForDimensions().isEmpty()); - assertEquals(builderConfig.getAggregationSpecs().values(), - Collections.singleton(new AggregationSpec(ChunkCompressionType.LZ4))); + assertEquals(builderConfig.getAggregationSpecs(), + Map.of(new AggregationFunctionColumnPair(AggregationFunctionType.SUM, "m1"), AggregationSpec.DEFAULT)); + assertEquals(builderConfig.getMaxLeafRecords(), 1); } @Test public void testAggregationSpecUniqueness() { List aggregationConfigs = - List.of(new StarTreeAggregationConfig("m1", "distinctCountThetaSketch", CompressionCodec.LZ4), - new StarTreeAggregationConfig("m1", "distinctCountRawThetaSketch", CompressionCodec.LZ4)); + List.of(new StarTreeAggregationConfig("m1", "distinctCountThetaSketch"), + new StarTreeAggregationConfig("m1", "distinctCountRawThetaSketch")); StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(List.of("d1"), null, null, aggregationConfigs, 1); StarTreeV2BuilderConfig builderConfig = StarTreeV2BuilderConfig.fromIndexConfig(starTreeIndexConfig); - assertEquals(builderConfig.getMaxLeafRecords(), 1); assertEquals(builderConfig.getDimensionsSplitOrder(), List.of("d1")); - assertEquals(builderConfig.getFunctionColumnPairs(), - Set.of(new AggregationFunctionColumnPair(AggregationFunctionType.DISTINCTCOUNTTHETASKETCH, "m1"))); assertTrue(builderConfig.getSkipStarNodeCreationForDimensions().isEmpty()); - assertEquals(builderConfig.getAggregationSpecs().values(), - Collections.singleton(new AggregationSpec(ChunkCompressionType.LZ4))); + assertEquals(builderConfig.getAggregationSpecs(), + Map.of(new AggregationFunctionColumnPair(AggregationFunctionType.DISTINCTCOUNTTHETASKETCH, "m1"), + AggregationSpec.DEFAULT)); + assertEquals(builderConfig.getMaxLeafRecords(), 1); } @Test @@ -171,12 +164,12 @@ public void testFunctionColumnPairUniqueness() { StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(List.of("d1"), null, functionColumnPairs, null, 1); StarTreeV2BuilderConfig builderConfig = StarTreeV2BuilderConfig.fromIndexConfig(starTreeIndexConfig); - assertEquals(builderConfig.getMaxLeafRecords(), 1); assertEquals(builderConfig.getDimensionsSplitOrder(), List.of("d1")); - assertEquals(builderConfig.getFunctionColumnPairs(), - Set.of(new AggregationFunctionColumnPair(AggregationFunctionType.DISTINCTCOUNTTHETASKETCH, "m1"))); assertTrue(builderConfig.getSkipStarNodeCreationForDimensions().isEmpty()); - assertEquals(builderConfig.getAggregationSpecs().values(), Collections.singleton(AggregationSpec.DEFAULT)); + assertEquals(builderConfig.getAggregationSpecs(), + Map.of(new AggregationFunctionColumnPair(AggregationFunctionType.DISTINCTCOUNTTHETASKETCH, "m1"), + AggregationSpec.DEFAULT)); + assertEquals(builderConfig.getMaxLeafRecords(), 1); } private ColumnMetadata getColumnMetadata(String column, boolean hasDictionary, int cardinality) { diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java index c8b458020e3a..55345a876d68 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java @@ -1411,19 +1411,18 @@ public void testValidateIndexingConfig() { // Although this config makes no sense, it should pass the validation phase StarTreeIndexConfig starTreeIndexConfig = - new StarTreeIndexConfig(Arrays.asList("myCol"), Arrays.asList("myCol"), Arrays.asList("SUM__myCol"), null, 1); + new StarTreeIndexConfig(List.of("myCol"), List.of("myCol"), List.of("SUM__myCol"), null, 1); tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) - .setStarTreeIndexConfigs(Arrays.asList(starTreeIndexConfig)).build(); + .setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); try { TableConfigUtils.validate(tableConfig, schema); } catch (Exception e) { Assert.fail("Should not fail for valid StarTreeIndex config column name"); } - starTreeIndexConfig = - new StarTreeIndexConfig(Arrays.asList("myCol2"), Arrays.asList("myCol"), Arrays.asList("SUM__myCol"), null, 1); + starTreeIndexConfig = new StarTreeIndexConfig(List.of("myCol2"), List.of("myCol"), List.of("SUM__myCol"), null, 1); tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) - .setStarTreeIndexConfigs(Arrays.asList(starTreeIndexConfig)).build(); + .setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); try { TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail for invalid StarTreeIndex config column name in dimension split order"); @@ -1431,10 +1430,9 @@ public void testValidateIndexingConfig() { // expected } - starTreeIndexConfig = - new StarTreeIndexConfig(Arrays.asList("myCol"), Arrays.asList("myCol2"), Arrays.asList("SUM__myCol"), null, 1); + starTreeIndexConfig = new StarTreeIndexConfig(List.of("myCol"), List.of("myCol2"), List.of("SUM__myCol"), null, 1); tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) - .setStarTreeIndexConfigs(Arrays.asList(starTreeIndexConfig)).build(); + .setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); try { TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail for invalid StarTreeIndex config column name in skip star node for dimension"); @@ -1442,10 +1440,9 @@ public void testValidateIndexingConfig() { // expected } - starTreeIndexConfig = - new StarTreeIndexConfig(Arrays.asList("myCol"), Arrays.asList("myCol"), Arrays.asList("SUM__myCol2"), null, 1); + starTreeIndexConfig = new StarTreeIndexConfig(List.of("myCol"), List.of("myCol"), List.of("SUM__myCol2"), null, 1); tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) - .setStarTreeIndexConfigs(Arrays.asList(starTreeIndexConfig)).build(); + .setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); try { TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail for invalid StarTreeIndex config column name in function column pair"); @@ -1453,21 +1450,33 @@ public void testValidateIndexingConfig() { // expected } - starTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("myCol"), null, null, - Arrays.asList(new StarTreeAggregationConfig("myCol2", "SUM", CompressionCodec.LZ4)), 1); + starTreeIndexConfig = + new StarTreeIndexConfig(List.of("myCol"), null, null, List.of(new StarTreeAggregationConfig("myCol2", "SUM")), + 1); tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) - .setStarTreeIndexConfigs(Arrays.asList(starTreeIndexConfig)).build(); + .setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); try { TableConfigUtils.validate(tableConfig, schema); - Assert.fail("Should fail for invalid StarTreeIndex config column name in function column pair"); + Assert.fail("Should fail for invalid StarTreeIndex config column name in aggregation config"); } catch (Exception e) { // expected } - starTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("multiValCol"), Arrays.asList("multiValCol"), - Arrays.asList("SUM__multiValCol"), null, 1); + starTreeIndexConfig = new StarTreeIndexConfig(List.of("myCol"), null, List.of("SUM__myCol"), + List.of(new StarTreeAggregationConfig("myCol", "SUM")), 1); + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) + .setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); + try { + TableConfigUtils.validate(tableConfig, schema); + Assert.fail("Should fail for invalid StarTreeIndex config with both function column pair and aggregation config"); + } catch (Exception e) { + // expected + } + + starTreeIndexConfig = + new StarTreeIndexConfig(List.of("multiValCol"), List.of("multiValCol"), List.of("SUM__multiValCol"), null, 1); tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) - .setStarTreeIndexConfigs(Arrays.asList(starTreeIndexConfig)).build(); + .setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); try { TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail for multi-value column name in StarTreeIndex config"); diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/startree/AggregationSpec.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/startree/AggregationSpec.java index f5b7770061e2..21f5a6c45b75 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/startree/AggregationSpec.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/startree/AggregationSpec.java @@ -18,22 +18,60 @@ */ package org.apache.pinot.segment.spi.index.startree; +import java.util.Objects; +import javax.annotation.Nullable; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; -import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.segment.spi.index.ForwardIndexConfig; +import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec; +import org.apache.pinot.spi.config.table.StarTreeAggregationConfig; public class AggregationSpec { - public static final AggregationSpec DEFAULT = new AggregationSpec(ChunkCompressionType.PASS_THROUGH); + public static final CompressionCodec DEFAULT_COMPRESSION_CODEC = CompressionCodec.PASS_THROUGH; + public static final AggregationSpec DEFAULT = new AggregationSpec(null, null, null, null, null); - private final ChunkCompressionType _compressionType; + private final CompressionCodec _compressionCodec; + private final boolean _deriveNumDocsPerChunk; + private final int _indexVersion; + private final int _targetMaxChunkSizeBytes; + private final int _targetDocsPerChunk; - public AggregationSpec(ChunkCompressionType compressionType) { - _compressionType = compressionType; + public AggregationSpec(StarTreeAggregationConfig aggregationConfig) { + this(aggregationConfig.getCompressionCodec(), aggregationConfig.getDeriveNumDocsPerChunk(), + aggregationConfig.getIndexVersion(), aggregationConfig.getTargetMaxChunkSizeBytes(), + aggregationConfig.getTargetDocsPerChunk()); } - public ChunkCompressionType getCompressionType() { - return _compressionType; + public AggregationSpec(@Nullable CompressionCodec compressionCodec, @Nullable Boolean deriveNumDocsPerChunk, + @Nullable Integer indexVersion, @Nullable Integer targetMaxChunkSizeBytes, @Nullable Integer targetDocsPerChunk) { + _indexVersion = indexVersion != null ? indexVersion : ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION; + _compressionCodec = compressionCodec != null ? compressionCodec : DEFAULT_COMPRESSION_CODEC; + _deriveNumDocsPerChunk = deriveNumDocsPerChunk != null ? deriveNumDocsPerChunk : false; + _targetMaxChunkSizeBytes = targetMaxChunkSizeBytes != null ? targetMaxChunkSizeBytes + : ForwardIndexConfig.DEFAULT_TARGET_MAX_CHUNK_SIZE_BYTES; + _targetDocsPerChunk = + targetDocsPerChunk != null ? targetDocsPerChunk : ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK; + } + + public CompressionCodec getCompressionCodec() { + return _compressionCodec; + } + + public boolean isDeriveNumDocsPerChunk() { + return _deriveNumDocsPerChunk; + } + + public int getIndexVersion() { + return _indexVersion; + } + + public int getTargetMaxChunkSizeBytes() { + return _targetMaxChunkSizeBytes; + } + + public int getTargetDocsPerChunk() { + return _targetDocsPerChunk; } @Override @@ -45,17 +83,27 @@ public boolean equals(Object o) { return false; } AggregationSpec that = (AggregationSpec) o; - return _compressionType == that._compressionType; + return _deriveNumDocsPerChunk == that._deriveNumDocsPerChunk && _indexVersion == that._indexVersion + && _targetMaxChunkSizeBytes == that._targetMaxChunkSizeBytes && _targetDocsPerChunk == that._targetDocsPerChunk + && _compressionCodec == that._compressionCodec; } @Override public int hashCode() { - return _compressionType.hashCode(); + return Objects.hash(_compressionCodec, _deriveNumDocsPerChunk, _indexVersion, _targetMaxChunkSizeBytes, + _targetDocsPerChunk); } @Override public String toString() { - return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("compressionType", _compressionType) + //@formatter:off + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("compressionCodec", _compressionCodec) + .append("deriveNumDocsPerChunk", _deriveNumDocsPerChunk) + .append("indexVersion", _indexVersion) + .append("targetMaxChunkSizeBytes", _targetMaxChunkSizeBytes) + .append("targetDocsPerChunk", _targetDocsPerChunk) .toString(); + //@formatter:on } } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/startree/StarTreeV2Constants.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/startree/StarTreeV2Constants.java index 3268f8d9ece0..e9e43d1bd170 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/startree/StarTreeV2Constants.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/startree/StarTreeV2Constants.java @@ -53,7 +53,11 @@ public static class MetadataKey { public static final String AGGREGATION_PREFIX = "aggregation."; public static final String FUNCTION_TYPE = "function.type"; public static final String COLUMN_NAME = "column.name"; + public static final String INDEX_VERSION = "index.version"; public static final String COMPRESSION_CODEC = "compression.codec"; + public static final String DERIVE_NUM_DOCS_PER_CHUNK = "derive.num.docs.per.chunk"; + public static final String TARGET_MAX_CHUNK_SIZE_BYTES = "target.max.chunk.size.bytes"; + public static final String TARGET_DOCS_PER_CHUNK = "target.docs.per.chunk"; public static String getStarTreePrefix(int index) { return STAR_TREE_PREFIX + index; diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/startree/StarTreeV2Metadata.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/startree/StarTreeV2Metadata.java index 38cd7f2b9391..79437438bb31 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/startree/StarTreeV2Metadata.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/startree/StarTreeV2Metadata.java @@ -21,12 +21,13 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.TreeMap; import org.apache.commons.configuration2.Configuration; import org.apache.pinot.segment.spi.AggregationFunctionType; -import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.segment.spi.index.startree.StarTreeV2Constants.MetadataKey; +import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec; /** @@ -56,10 +57,14 @@ public StarTreeV2Metadata(Configuration metadataProperties) { // Lookup the stored aggregation type AggregationFunctionColumnPair storedType = AggregationFunctionColumnPair.resolveToStoredType(functionColumnPair); - ChunkCompressionType compressionType = - ChunkCompressionType.valueOf(aggregationConfig.getString(MetadataKey.COMPRESSION_CODEC)); + AggregationSpec aggregationSpec = + new AggregationSpec(aggregationConfig.getEnum(MetadataKey.COMPRESSION_CODEC, CompressionCodec.class, null), + aggregationConfig.getBoolean(MetadataKey.DERIVE_NUM_DOCS_PER_CHUNK, null), + aggregationConfig.getInteger(MetadataKey.INDEX_VERSION, null), + aggregationConfig.getInteger(MetadataKey.TARGET_MAX_CHUNK_SIZE_BYTES, null), + aggregationConfig.getInteger(MetadataKey.TARGET_DOCS_PER_CHUNK, null)); // If there is already an equivalent functionColumnPair in the map for the stored type, do not load another. - _aggregationSpecs.putIfAbsent(storedType, new AggregationSpec(compressionType)); + _aggregationSpecs.putIfAbsent(storedType, aggregationSpec); } } else { // Backward compatibility with columnName format @@ -105,4 +110,34 @@ public int getMaxLeafRecords() { public Set getSkipStarNodeCreationForDimensions() { return _skipStarNodeCreationForDimensions; } + + public static void writeMetadata(Configuration metadataProperties, int totalDocs, List dimensionsSplitOrder, + TreeMap aggregationSpecs, int maxLeafRecords, + Set skipStarNodeCreationForDimensions) { + metadataProperties.setProperty(MetadataKey.TOTAL_DOCS, totalDocs); + metadataProperties.setProperty(MetadataKey.DIMENSIONS_SPLIT_ORDER, dimensionsSplitOrder); + metadataProperties.setProperty(MetadataKey.FUNCTION_COLUMN_PAIRS, aggregationSpecs.keySet()); + metadataProperties.setProperty(MetadataKey.AGGREGATION_COUNT, aggregationSpecs.size()); + int index = 0; + for (Map.Entry entry : aggregationSpecs.entrySet()) { + AggregationFunctionColumnPair functionColumnPair = entry.getKey(); + AggregationSpec aggregationSpec = entry.getValue(); + String prefix = MetadataKey.AGGREGATION_PREFIX + index + '.'; + metadataProperties.setProperty(prefix + MetadataKey.FUNCTION_TYPE, + functionColumnPair.getFunctionType().getName()); + metadataProperties.setProperty(prefix + MetadataKey.COLUMN_NAME, functionColumnPair.getColumn()); + metadataProperties.setProperty(prefix + MetadataKey.COMPRESSION_CODEC, aggregationSpec.getCompressionCodec()); + metadataProperties.setProperty(prefix + MetadataKey.DERIVE_NUM_DOCS_PER_CHUNK, + aggregationSpec.isDeriveNumDocsPerChunk()); + metadataProperties.setProperty(prefix + MetadataKey.INDEX_VERSION, aggregationSpec.getIndexVersion()); + metadataProperties.setProperty(prefix + MetadataKey.TARGET_MAX_CHUNK_SIZE_BYTES, + aggregationSpec.getTargetMaxChunkSizeBytes()); + metadataProperties.setProperty(prefix + MetadataKey.TARGET_DOCS_PER_CHUNK, + aggregationSpec.getTargetDocsPerChunk()); + index++; + } + metadataProperties.setProperty(MetadataKey.MAX_LEAF_RECORDS, maxLeafRecords); + metadataProperties.setProperty(MetadataKey.SKIP_STAR_NODE_CREATION_FOR_DIMENSIONS, + skipStarNodeCreationForDimensions); + } } diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/index/startree/StarTreeV2MetadataTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/index/startree/StarTreeV2MetadataTest.java index 270c68b8808d..26f9b74532ed 100644 --- a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/index/startree/StarTreeV2MetadataTest.java +++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/index/startree/StarTreeV2MetadataTest.java @@ -21,7 +21,6 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.TreeMap; import org.apache.commons.configuration2.Configuration; @@ -40,8 +39,8 @@ public void testUniqueAggregationSpecs() { expected.put(AggregationFunctionColumnPair.fromColumnName("count__*"), AggregationSpec.DEFAULT); expected.put(AggregationFunctionColumnPair.fromColumnName("sum__dimX"), AggregationSpec.DEFAULT); - Configuration configuration = createConfiguration(Collections.singletonList("dimX"), null, expected); - StarTreeV2Metadata starTreeV2Metadata = new StarTreeV2Metadata(configuration); + Configuration metadataProperties = createMetadata(List.of("dimX"), expected); + StarTreeV2Metadata starTreeV2Metadata = new StarTreeV2Metadata(metadataProperties); TreeMap actual = starTreeV2Metadata.getAggregationSpecs(); assertEquals(expected, actual); } @@ -57,8 +56,8 @@ public void testDuplicateAggregationSpecs() { expected.put(thetaColumnPair, AggregationSpec.DEFAULT); expected.put(rawThetaColumnPair, AggregationSpec.DEFAULT); - Configuration configuration = createConfiguration(Collections.singletonList("dimX"), null, expected); - StarTreeV2Metadata starTreeV2Metadata = new StarTreeV2Metadata(configuration); + Configuration metadataProperties = createMetadata(List.of("dimX"), expected); + StarTreeV2Metadata starTreeV2Metadata = new StarTreeV2Metadata(metadataProperties); TreeMap actual = starTreeV2Metadata.getAggregationSpecs(); expected.remove(rawThetaColumnPair); assertEquals(expected, actual); @@ -71,8 +70,8 @@ public void testUniqueFunctionColumnPairs() { expected.add(AggregationFunctionColumnPair.fromColumnName("count__*")); expected.add(AggregationFunctionColumnPair.fromColumnName("sum__dimX")); - Configuration configuration = createConfiguration(Collections.singletonList("dimX"), expected, null); - StarTreeV2Metadata starTreeV2Metadata = new StarTreeV2Metadata(configuration); + Configuration metadataProperties = createMetadata(List.of("dimX"), expected); + StarTreeV2Metadata starTreeV2Metadata = new StarTreeV2Metadata(metadataProperties); Set actual = starTreeV2Metadata.getFunctionColumnPairs(); assertEquals(expected, actual); } @@ -88,8 +87,8 @@ public void testDuplicateFunctionColumnPairs() { expected.add(thetaColumnPair); expected.add(rawThetaColumnPair); - Configuration configuration = createConfiguration(Collections.singletonList("dimX"), expected, null); - StarTreeV2Metadata starTreeV2Metadata = new StarTreeV2Metadata(configuration); + Configuration metadataProperties = createMetadata(Collections.singletonList("dimX"), expected); + StarTreeV2Metadata starTreeV2Metadata = new StarTreeV2Metadata(metadataProperties); Set actual = starTreeV2Metadata.getFunctionColumnPairs(); expected.remove(rawThetaColumnPair); @@ -97,34 +96,22 @@ public void testDuplicateFunctionColumnPairs() { assertTrue(starTreeV2Metadata.containsFunctionColumnPair(thetaColumnPair)); } - private static Configuration createConfiguration(List dimensionsSplitOrder, - Set functionColumnPairs, + private static Configuration createMetadata(List dimensionsSplitOrder, TreeMap aggregationSpecs) { Configuration metadataProperties = new PropertiesConfiguration(); + StarTreeV2Metadata.writeMetadata(metadataProperties, 1, dimensionsSplitOrder, aggregationSpecs, 10000, Set.of()); + return metadataProperties; + } + + // This is the old star-tree metadata format + private static Configuration createMetadata(List dimensionsSplitOrder, + Set functionColumnPairs) { + Configuration metadataProperties = new PropertiesConfiguration(); metadataProperties.setProperty(StarTreeV2Constants.MetadataKey.TOTAL_DOCS, 1); metadataProperties.setProperty(StarTreeV2Constants.MetadataKey.DIMENSIONS_SPLIT_ORDER, dimensionsSplitOrder); - if (functionColumnPairs != null) { - metadataProperties.setProperty(StarTreeV2Constants.MetadataKey.FUNCTION_COLUMN_PAIRS, functionColumnPairs); - metadataProperties.setProperty(StarTreeV2Constants.MetadataKey.AGGREGATION_COUNT, 0); - } else { - metadataProperties.setProperty(StarTreeV2Constants.MetadataKey.AGGREGATION_COUNT, aggregationSpecs.size()); - int index = 0; - for (Map.Entry entry : aggregationSpecs.entrySet()) { - AggregationFunctionColumnPair functionColumnPair = entry.getKey(); - AggregationSpec aggregationSpec = entry.getValue(); - String prefix = StarTreeV2Constants.MetadataKey.AGGREGATION_PREFIX + index + '.'; - metadataProperties.setProperty(prefix + StarTreeV2Constants.MetadataKey.FUNCTION_TYPE, - functionColumnPair.getFunctionType().getName()); - metadataProperties.setProperty(prefix + StarTreeV2Constants.MetadataKey.COLUMN_NAME, - functionColumnPair.getColumn()); - metadataProperties.setProperty(prefix + StarTreeV2Constants.MetadataKey.COMPRESSION_CODEC, - aggregationSpec.getCompressionType()); - index++; - } - } + metadataProperties.setProperty(StarTreeV2Constants.MetadataKey.FUNCTION_COLUMN_PAIRS, functionColumnPairs); metadataProperties.setProperty(StarTreeV2Constants.MetadataKey.MAX_LEAF_RECORDS, 10000); - metadataProperties.setProperty(StarTreeV2Constants.MetadataKey.SKIP_STAR_NODE_CREATION_FOR_DIMENSIONS, - new HashSet<>()); + metadataProperties.setProperty(StarTreeV2Constants.MetadataKey.SKIP_STAR_NODE_CREATION_FOR_DIMENSIONS, Set.of()); return metadataProperties; } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java index 8a01646da991..9f6f6b88f8d7 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java @@ -124,17 +124,20 @@ public enum IndexType { } public enum CompressionCodec { + //@formatter:off PASS_THROUGH(true, false), SNAPPY(true, false), ZSTANDARD(true, false), LZ4(true, false), - // CLP is a special type of compression codec that isn't generally applicable to all RAW columns and has a - // special handling for log lines (see {@link CLPForwardIndexCreatorV1}) - CLP(false, false), GZIP(true, false), // For MV dictionary encoded forward index, add a second level dictionary encoding for the multi-value entries - MV_ENTRY_DICT(false, true); + MV_ENTRY_DICT(false, true), + + // CLP is a special type of compression codec that isn't generally applicable to all RAW columns and has a special + // handling for log lines (see {@link CLPForwardIndexCreatorV1}) + CLP(false, false); + //@formatter:on private final boolean _applicableToRawIndex; private final boolean _applicableToDictEncodedIndex; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/StarTreeAggregationConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/StarTreeAggregationConfig.java index 0c8ea7a3b3a9..71ea76a53963 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/StarTreeAggregationConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/StarTreeAggregationConfig.java @@ -19,24 +19,46 @@ package org.apache.pinot.spi.config.table; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import javax.annotation.Nullable; import org.apache.pinot.spi.config.BaseJsonConfig; import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec; +import org.apache.pinot.spi.utils.DataSizeUtils; public class StarTreeAggregationConfig extends BaseJsonConfig { private final String _columnName; private final String _aggregationFunction; private final CompressionCodec _compressionCodec; + private final Boolean _deriveNumDocsPerChunk; + private final Integer _indexVersion; + private final String _targetMaxChunkSize; + private final Integer _targetMaxChunkSizeBytes; + private final Integer _targetDocsPerChunk; + + @VisibleForTesting + public StarTreeAggregationConfig(String columnName, String aggregationFunction) { + this(columnName, aggregationFunction, null, null, null, null, null); + } @JsonCreator public StarTreeAggregationConfig(@JsonProperty(value = "columnName", required = true) String columnName, @JsonProperty(value = "aggregationFunction", required = true) String aggregationFunction, - @JsonProperty(value = "compressionCodec") @Nullable CompressionCodec compressionCodec) { + @JsonProperty(value = "compressionCodec") @Nullable CompressionCodec compressionCodec, + @JsonProperty(value = "deriveNumDocsPerChunk") @Nullable Boolean deriveNumDocsPerChunk, + @JsonProperty(value = "indexVersion") @Nullable Integer indexVersion, + @JsonProperty(value = "targetMaxChunkSize") @Nullable String targetMaxChunkSize, + @JsonProperty(value = "targetDocsPerChunk") @Nullable Integer targetDocsPerChunk) { _columnName = columnName; _aggregationFunction = aggregationFunction; - _compressionCodec = compressionCodec != null ? compressionCodec : CompressionCodec.PASS_THROUGH; + _compressionCodec = compressionCodec; + _deriveNumDocsPerChunk = deriveNumDocsPerChunk; + _indexVersion = indexVersion; + _targetMaxChunkSize = targetMaxChunkSize; + _targetMaxChunkSizeBytes = targetMaxChunkSize != null ? (int) DataSizeUtils.toBytes(targetMaxChunkSize) : null; + _targetDocsPerChunk = targetDocsPerChunk; } public String getColumnName() { @@ -47,7 +69,34 @@ public String getAggregationFunction() { return _aggregationFunction; } + @Nullable public CompressionCodec getCompressionCodec() { return _compressionCodec; } + + @Nullable + public Boolean getDeriveNumDocsPerChunk() { + return _deriveNumDocsPerChunk; + } + + @Nullable + public Integer getIndexVersion() { + return _indexVersion; + } + + @Nullable + public String getTargetMaxChunkSize() { + return _targetMaxChunkSize; + } + + @JsonIgnore + @Nullable + public Integer getTargetMaxChunkSizeBytes() { + return _targetMaxChunkSizeBytes; + } + + @Nullable + public Integer getTargetDocsPerChunk() { + return _targetDocsPerChunk; + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/StarTreeIndexConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/StarTreeIndexConfig.java index b4f1ea230c7c..5b088ae13278 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/StarTreeIndexConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/StarTreeIndexConfig.java @@ -34,7 +34,7 @@ public class StarTreeIndexConfig extends BaseJsonConfig { private final List _skipStarNodeCreationForDimensions; // Function column pairs with delimiter "__", e.g. SUM__col1, MAX__col2, COUNT__* private final List _functionColumnPairs; - // Function column pairs config, currently only handling compression. + // Function column pairs config private final List _aggregationConfigs; // The upper bound of records to be scanned at the leaf node private final int _maxLeafRecords;