Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix configuration handling during MergeRollupTask execution #14856

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,28 +137,6 @@ public static Map<String, AggregationFunctionType> getAggregationTypes(Map<Strin
return aggregationTypes;
}

/**
* Returns a map from column name to the aggregation function parameters associated with it based on the task config.
*/
public static Map<String, Map<String, String>> getAggregationFunctionParameters(Map<String, String> taskConfig) {
Map<String, Map<String, String>> aggregationFunctionParameters = new HashMap<>();
String prefix = MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX;

for (Map.Entry<String, String> entry : taskConfig.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
if (key.startsWith(prefix)) {
String[] parts = key.substring(prefix.length()).split("\\.", 2);
if (parts.length == 2) {
String metricColumn = parts[0];
String paramName = parts[1];
aggregationFunctionParameters.computeIfAbsent(metricColumn, k -> new HashMap<>()).put(paramName, value);
}
}
}
return aggregationFunctionParameters;
}

swaminathanmanish marked this conversation as resolved.
Show resolved Hide resolved
/**
* Returns the segment config based on the task config.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,

// Aggregation function parameters
segmentProcessorConfigBuilder.setAggregationFunctionParameters(
MergeTaskUtils.getAggregationFunctionParameters(configs));
MergeRollupTaskUtils.getAggregationFunctionParameters(configs));

// Segment config
segmentProcessorConfigBuilder.setSegmentConfig(MergeTaskUtils.getSegmentConfig(configs));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.plugin.minion.tasks.mergerollup;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -78,16 +79,59 @@ public static Map<String, Map<String, String>> getLevelToConfigMap(Map<String, S
return levelToConfigMap;
}

/**
* Returns a lookup key composed of the current merge level / key combination
* @param key the key of the value within the task configuration.
* @param taskConfig the current merge rollup task configuration used for sourcing the merge level.
* @return composite lookup key if the merge level is configured. Otherwise, return original key.
*/
public static String buildMergeLevelKeyPrefix(String key, Map<String, String> taskConfig) {
swaminathanmanish marked this conversation as resolved.
Show resolved Hide resolved
String mergeLevel = taskConfig.get(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY);
if (mergeLevel == null) {
return key;
} else {
return mergeLevel + "." + key;
}
}

/**
* Extracts an array of dimensions to reduce/erase from the task config.
* <p>The config for the dimensions to erase should be a comma-separated string value.
*/
public static Set<String> getDimensionsToErase(Map<String, String> taskConfig) {
if (taskConfig == null || taskConfig.get(MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY) == null) {
if (taskConfig == null) {
return new HashSet<>();
}
return Arrays.stream(taskConfig.get(MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY).split(","))
String key = buildMergeLevelKeyPrefix(MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY, taskConfig);
String dimensionsToErase = taskConfig.get(key);

if (dimensionsToErase == null) {
return new HashSet<>();
}
return Arrays.stream(dimensionsToErase.split(","))
.map(String::trim)
.collect(Collectors.toSet());
}

/**
* Returns a map from column name to the aggregation function parameters associated with it based on the task config.
*/
public static Map<String, Map<String, String>> getAggregationFunctionParameters(Map<String, String> taskConfig) {
Map<String, Map<String, String>> aggregationFunctionParameters = new HashMap<>();
String prefix = buildMergeLevelKeyPrefix(MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX, taskConfig);
swaminathanmanish marked this conversation as resolved.
Show resolved Hide resolved

for (Map.Entry<String, String> entry : taskConfig.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
if (key.startsWith(prefix)) {
String[] parts = key.substring(prefix.length()).split("\\.", 2);
if (parts.length == 2) {
String metricColumn = parts[0];
String paramName = parts[1];
aggregationFunctionParameters.computeIfAbsent(metricColumn, k -> new HashMap<>()).put(paramName, value);
}
}
}
return aggregationFunctionParameters;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
import org.apache.pinot.plugin.minion.tasks.mergerollup.MergeRollupTaskUtils;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
Expand Down Expand Up @@ -151,7 +152,7 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,

// Aggregation function parameters
segmentProcessorConfigBuilder.setAggregationFunctionParameters(
MergeTaskUtils.getAggregationFunctionParameters(configs));
MergeRollupTaskUtils.getAggregationFunctionParameters(configs));

// Segment config
segmentProcessorConfigBuilder.setSegmentConfig(MergeTaskUtils.getSegmentConfig(configs));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,23 +172,6 @@ public void testGetAggregationTypes() {
}
}

@Test
public void testGetAggregationFunctionParameters() {
Map<String, String> taskConfig = new HashMap<>();
taskConfig.put(MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX + "metricColumnA.param1", "value1");
taskConfig.put(MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX + "metricColumnA.param2", "value2");
taskConfig.put(MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX + "metricColumnB.param1", "value3");
taskConfig.put("otherPrefix.metricColumnC.param1", "value1");
taskConfig.put("aggregationFunction.metricColumnD.param2", "value2");
Map<String, Map<String, String>> result = MergeTaskUtils.getAggregationFunctionParameters(taskConfig);
assertEquals(result.size(), 2);
assertTrue(result.containsKey("metricColumnA"));
assertTrue(result.containsKey("metricColumnB"));
assertEquals(result.get("metricColumnA").get("param1"), "value1");
assertEquals(result.get("metricColumnA").get("param2"), "value2");
assertEquals(result.get("metricColumnB").get("param1"), "value3");
}

@Test
public void testGetSegmentConfig() {
Map<String, String> taskConfig = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void testDimensionErasure()
configs.put(MinionConstants.TABLE_NAME_KEY, "testTable_OFFLINE");
configs.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, "daily");
configs.put(MinionConstants.MergeTask.MERGE_TYPE_KEY, "rollup");
configs.put(MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY, D1);
configs.put("daily." + MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY, D1);

PinotTaskConfig pinotTaskConfig = new PinotTaskConfig(MinionConstants.MergeRollupTask.TASK_TYPE, configs);
List<SegmentConversionResult> conversionResults =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@ public void testGetLevelToConfigMap() {
assertEquals(monthlyConfig.get(MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY), "a,b,c,d");
}

@Test
public void testBuildMergeLevelKeyPrefix() {
Map<String, String> taskConfig = new HashMap<>();
taskConfig.put("key", "value");
String key1 = MergeRollupTaskUtils.buildMergeLevelKeyPrefix("key", taskConfig);
assertEquals(key1, "key", "Expected key to remain unchanged.");

taskConfig.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, "hourly");
String key2 = MergeRollupTaskUtils.buildMergeLevelKeyPrefix("key", taskConfig);
assertEquals(key2, "hourly.key", "Expected merge level prepended to key.");
}

@Test
public void testEraseDimensionValuesAbsent() {
Set<String> result1 = MergeRollupTaskUtils.getDimensionsToErase(null);
Expand All @@ -81,7 +93,10 @@ public void testEraseDimensionValuesAbsent() {
@Test
public void testEraseSingleDimensionValue() {
Map<String, String> taskConfig = new HashMap<>();
taskConfig.put(MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY, "dimension1");
String mergeLevel = "daily";
String key = mergeLevel + "." + MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY;
taskConfig.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, mergeLevel);
taskConfig.put(key, "dimension1");
Set<String> result = MergeRollupTaskUtils.getDimensionsToErase(taskConfig);
assertEquals(result.size(), 1, "Expected one dimension in the result set");
assertTrue(result.contains("dimension1"), "Expected set to contain 'dimension1'");
Expand All @@ -90,8 +105,10 @@ public void testEraseSingleDimensionValue() {
@Test
public void testEraseMultipleDimensionValues() {
Map<String, String> taskConfig = new HashMap<>();
taskConfig.put(MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY,
" dimension1 , dimension2 , dimension3 ");
String mergeLevel = "hourly";
String key = mergeLevel + "." + MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY;
taskConfig.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, mergeLevel);
taskConfig.put(key, " dimension1 , dimension2 , dimension3 ");
Set<String> result = MergeRollupTaskUtils.getDimensionsToErase(taskConfig);
assertEquals(result.size(), 3, "Expected three dimensions in the result set with whitespace trimmed");
assertTrue(result.contains("dimension1"), "Expected set to contain 'dimension1'");
Expand All @@ -100,7 +117,27 @@ public void testEraseMultipleDimensionValues() {
}

@Test
public void testAggregationFunctionParameters() {
public void testGetAggregationFunctionParameters() {
Map<String, String> taskConfig = new HashMap<>();
String mergeLevel = "hourly";
String prefix = mergeLevel + "." + MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX;
taskConfig.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, mergeLevel);
taskConfig.put(prefix + "metricColumnA.param1", "value1");
taskConfig.put(prefix + "metricColumnA.param2", "value2");
taskConfig.put(prefix + "metricColumnB.param1", "value3");
taskConfig.put("otherPrefix.metricColumnC.param1", "value1");
taskConfig.put("aggregationFunction.metricColumnD.param2", "value2");
Map<String, Map<String, String>> result = MergeRollupTaskUtils.getAggregationFunctionParameters(taskConfig);
assertEquals(result.size(), 2);
assertTrue(result.containsKey("metricColumnA"));
assertTrue(result.containsKey("metricColumnB"));
assertEquals(result.get("metricColumnA").get("param1"), "value1");
assertEquals(result.get("metricColumnA").get("param2"), "value2");
assertEquals(result.get("metricColumnB").get("param1"), "value3");
}

@Test
public void testLevelToAggregationFunctionParameters() {
Map<String, String> taskConfig = new HashMap<>();
taskConfig.put("hourly.aggregationFunctionParameters.metricColumnA.nominalEntries", "16384");
taskConfig.put("hourly.aggregationFunctionParameters.metricColumnB.nominalEntries", "8192");
Expand Down
Loading