Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix configuration handling during MergeRollupTask execution #14856

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,28 +137,6 @@ public static Map<String, AggregationFunctionType> getAggregationTypes(Map<Strin
return aggregationTypes;
}

/**
 * Builds a per-column map of aggregation function parameters from the task config.
 *
 * <p>Keys of the form {@code <prefix><metricColumn>.<paramName>} are grouped by metric column;
 * all other entries are ignored.
 */
public static Map<String, Map<String, String>> getAggregationFunctionParameters(Map<String, String> taskConfig) {
  Map<String, Map<String, String>> parametersByColumn = new HashMap<>();
  String prefix = MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX;

  taskConfig.forEach((configKey, configValue) -> {
    if (!configKey.startsWith(prefix)) {
      return;
    }
    // Remaining key is "<metricColumn>.<paramName>"; split on the first '.' only.
    String[] columnAndParam = configKey.substring(prefix.length()).split("\\.", 2);
    if (columnAndParam.length == 2) {
      parametersByColumn.computeIfAbsent(columnAndParam[0], column -> new HashMap<>())
          .put(columnAndParam[1], configValue);
    }
  });
  return parametersByColumn;
}

swaminathanmanish marked this conversation as resolved.
Show resolved Hide resolved
/**
* Returns the segment config based on the task config.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,

// Aggregation function parameters
segmentProcessorConfigBuilder.setAggregationFunctionParameters(
MergeTaskUtils.getAggregationFunctionParameters(configs));
MergeRollupTaskUtils.getAggregationFunctionParameters(configs));

// Segment config
segmentProcessorConfigBuilder.setSegmentConfig(MergeTaskUtils.getSegmentConfig(configs));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pinot.plugin.minion.tasks.mergerollup;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
Expand Down Expand Up @@ -50,11 +52,13 @@
import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;
import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
import org.apache.pinot.plugin.minion.tasks.mergerollup.segmentgroupmananger.MergeRollupTaskSegmentGroupManagerProvider;
import org.apache.pinot.segment.spi.Constants;
import org.apache.pinot.spi.annotations.minion.TaskGenerator;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.TimeUtils;
Expand Down Expand Up @@ -486,6 +490,58 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
return pinotTaskConfigs;
}

@Override
public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map<String, String> taskConfigs) {
swaminathanmanish marked this conversation as resolved.
Show resolved Hide resolved
Set<String> columnNames = schema.getColumnNames();
// check no mis-configured columns when erasing dimensions
Set<String> dimensionsToErase = MergeRollupTaskUtils.getDimensionsToErase(taskConfigs);
for (String dimension : dimensionsToErase) {
Preconditions.checkState(columnNames.contains(dimension),
String.format("Column dimension to erase \"%s\" not found in schema!", dimension));
}
// check no mis-configured aggregation function parameters
Set<String> allowedFunctionParameterNames = ImmutableSet.of(Constants.CPCSKETCH_LGK_KEY.toLowerCase(),
Constants.THETA_TUPLE_SKETCH_SAMPLING_PROBABILITY.toLowerCase(),
Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES.toLowerCase());
Map<String, Map<String, String>> aggregationFunctionParameters =
MergeRollupTaskUtils.getAggregationFunctionParameters(taskConfigs);
for (String fieldName : aggregationFunctionParameters.keySet()) {
// check that function parameter field name exists
Preconditions.checkState(columnNames.contains(fieldName),
String.format("Metric column \"%s\" for aggregation function parameter not found in schema!", fieldName));
Map<String, String> functionParameters = aggregationFunctionParameters.get(fieldName);
for (String functionParameterName : functionParameters.keySet()) {
// check that function parameter name is valid
Preconditions.checkState(allowedFunctionParameterNames.contains(functionParameterName.toLowerCase()),
"Aggregation function parameter name must be one of [lgK, samplingProbability, nominalEntries]!");
// check that function parameter value is valid for nominal entries
if (functionParameterName.equalsIgnoreCase(Constants.CPCSKETCH_LGK_KEY)
|| functionParameterName.equalsIgnoreCase(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES)) {
String value = functionParameters.get(functionParameterName);
String err = String.format("Aggregation function parameter \"%s\" on column \"%s\" has invalid value: %s",
functionParameterName, fieldName, value);
try {
Preconditions.checkState(Integer.parseInt(value) > 0, err);
} catch (NumberFormatException e) {
throw new IllegalStateException(err);
}
}
// check that function parameter value is valid for sampling probability
if (functionParameterName.equalsIgnoreCase(Constants.THETA_TUPLE_SKETCH_SAMPLING_PROBABILITY)) {
String value = functionParameters.get(functionParameterName);
String err = String.format("Aggregation function parameter \"%s\" on column \"%s\" has invalid value: %s",
functionParameterName, fieldName, value);
try {
float p = Float.parseFloat(value);
Preconditions.checkState(p >= 0.0f && p <= 1.0f, err);
} catch (NumberFormatException e) {
throw new IllegalStateException(err);
}
}
}
}
}

@VisibleForTesting
static List<SegmentZKMetadata> filterSegmentsBasedOnStatus(TableType tableType, List<SegmentZKMetadata> allSegments) {
if (tableType == TableType.REALTIME) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.plugin.minion.tasks.mergerollup;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -78,16 +79,59 @@ public static Map<String, Map<String, String>> getLevelToConfigMap(Map<String, S
return levelToConfigMap;
}

/**
* Returns a lookup key composed of the current merge level / key combination
* @param key the key of the value within the task configuration.
* @param taskConfig the current merge rollup task configuration used for sourcing the merge level.
* @return composite lookup key if the merge level is configured. Otherwise, return original key.
*/
public static String buildMergeLevelKeyPrefix(String key, Map<String, String> taskConfig) {
swaminathanmanish marked this conversation as resolved.
Show resolved Hide resolved
String mergeLevel = taskConfig.get(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY);
if (mergeLevel == null) {
return key;
} else {
return mergeLevel + "." + key;
}
}

/**
* Extracts an array of dimensions to reduce/erase from the task config.
* <p>The config for the dimensions to erase should be a comma-separated string value.
*/
public static Set<String> getDimensionsToErase(Map<String, String> taskConfig) {
if (taskConfig == null || taskConfig.get(MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY) == null) {
if (taskConfig == null) {
return new HashSet<>();
}
return Arrays.stream(taskConfig.get(MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY).split(","))
String key = buildMergeLevelKeyPrefix(MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY, taskConfig);
String dimensionsToErase = taskConfig.get(key);

if (dimensionsToErase == null) {
return new HashSet<>();
}
return Arrays.stream(dimensionsToErase.split(","))
.map(String::trim)
.collect(Collectors.toSet());
}

/**
* Returns a map from column name to the aggregation function parameters associated with it based on the task config.
*/
public static Map<String, Map<String, String>> getAggregationFunctionParameters(Map<String, String> taskConfig) {
Map<String, Map<String, String>> aggregationFunctionParameters = new HashMap<>();
String prefix = buildMergeLevelKeyPrefix(MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX, taskConfig);
swaminathanmanish marked this conversation as resolved.
Show resolved Hide resolved

for (Map.Entry<String, String> entry : taskConfig.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
if (key.startsWith(prefix)) {
String[] parts = key.substring(prefix.length()).split("\\.", 2);
if (parts.length == 2) {
String metricColumn = parts[0];
String paramName = parts[1];
aggregationFunctionParameters.computeIfAbsent(metricColumn, k -> new HashMap<>()).put(paramName, value);
}
}
}
return aggregationFunctionParameters;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
import org.apache.pinot.plugin.minion.tasks.mergerollup.MergeRollupTaskUtils;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
Expand Down Expand Up @@ -151,7 +152,7 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,

// Aggregation function parameters
segmentProcessorConfigBuilder.setAggregationFunctionParameters(
MergeTaskUtils.getAggregationFunctionParameters(configs));
MergeRollupTaskUtils.getAggregationFunctionParameters(configs));

// Segment config
segmentProcessorConfigBuilder.setSegmentConfig(MergeTaskUtils.getSegmentConfig(configs));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,23 +172,6 @@ public void testGetAggregationTypes() {
}
}

@Test
public void testGetAggregationFunctionParameters() {
  // Three entries under the recognized prefix, two under unrelated prefixes.
  String prefix = MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX;
  Map<String, String> taskConfig = new HashMap<>();
  taskConfig.put(prefix + "metricColumnA.param1", "value1");
  taskConfig.put(prefix + "metricColumnA.param2", "value2");
  taskConfig.put(prefix + "metricColumnB.param1", "value3");
  taskConfig.put("otherPrefix.metricColumnC.param1", "value1");
  taskConfig.put("aggregationFunction.metricColumnD.param2", "value2");

  Map<String, Map<String, String>> result = MergeTaskUtils.getAggregationFunctionParameters(taskConfig);

  // Only the prefixed columns survive, grouped by metric column name.
  assertEquals(result.size(), 2);
  assertTrue(result.containsKey("metricColumnA"));
  assertTrue(result.containsKey("metricColumnB"));
  assertEquals(result.get("metricColumnA").get("param1"), "value1");
  assertEquals(result.get("metricColumnA").get("param2"), "value2");
  assertEquals(result.get("metricColumnB").get("param1"), "value3");
}

@Test
public void testGetSegmentConfig() {
Map<String, String> taskConfig = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void testDimensionErasure()
configs.put(MinionConstants.TABLE_NAME_KEY, "testTable_OFFLINE");
configs.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, "daily");
configs.put(MinionConstants.MergeTask.MERGE_TYPE_KEY, "rollup");
configs.put(MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY, D1);
configs.put("daily." + MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY, D1);

PinotTaskConfig pinotTaskConfig = new PinotTaskConfig(MinionConstants.MergeRollupTask.TASK_TYPE, configs);
List<SegmentConversionResult> conversionResults =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.MetricFieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.Test;
Expand All @@ -61,10 +65,7 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.*;


/**
Expand Down Expand Up @@ -113,6 +114,127 @@ public void testValidateIfMergeRollupCanBeEnabledOrNot() {
assertFalse(MergeRollupTaskGenerator.validate(tableConfig, MinionConstants.MergeRollupTask.TASK_TYPE));
}

@Test
public void testValidMergeLevelTaskConfig() {
  // Schema with two dimensions and one metric column matching the configured parameters.
  Schema schema = new Schema();
  schema.addField(new DimensionFieldSpec("a", FieldSpec.DataType.STRING, false));
  schema.addField(new DimensionFieldSpec("b", FieldSpec.DataType.STRING, false));
  schema.addField(new MetricFieldSpec("c", FieldSpec.DataType.BYTES));

  String prefix = "hourly." + MinionConstants.MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX;
  Map<String, String> validConfig = new HashMap<>();
  validConfig.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, "hourly");
  validConfig.put("hourly.eraseDimensionValues", "a,b");
  validConfig.put(prefix + "c.nominalEntries", "8092");
  validConfig.put(prefix + "c.samplingProbability", "0.9");
  TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
      .setTaskConfig(new TableTaskConfig(ImmutableMap.of(MinionConstants.MergeRollupTask.TASK_TYPE, validConfig)))
      .build();

  // A well-formed config must validate without throwing.
  new MergeRollupTaskGenerator().validateTaskConfigs(offlineTableConfig, schema, validConfig);
}

@Test
public void testInvalidDimensionsToErase() {
  // Schema contains only column "a"; erasing unknown column "b" must be rejected.
  Schema schema = new Schema();
  schema.addField(new DimensionFieldSpec("a", FieldSpec.DataType.STRING, false));

  Map<String, String> invalidConfig = new HashMap<>();
  invalidConfig.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, "hourly");
  invalidConfig.put("hourly.eraseDimensionValues", "b");
  TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
      .setTaskConfig(new TableTaskConfig(ImmutableMap.of(MinionConstants.MergeRollupTask.TASK_TYPE, invalidConfig)))
      .build();

  MergeRollupTaskGenerator taskGenerator = new MergeRollupTaskGenerator();
  assertThrows(IllegalStateException.class,
      () -> taskGenerator.validateTaskConfigs(offlineTableConfig, schema, invalidConfig));
}

@Test
public void testInvalidAggregationFunctionFieldName() {
  // Parameter targets column "b", which the schema does not declare.
  Schema schema = new Schema();
  schema.addField(new MetricFieldSpec("a", FieldSpec.DataType.BYTES));

  String prefix = "hourly." + MinionConstants.MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX;
  Map<String, String> invalidConfig = new HashMap<>();
  invalidConfig.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, "hourly");
  invalidConfig.put(prefix + "b.nominalEntries", "8092");
  TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
      .setTaskConfig(new TableTaskConfig(ImmutableMap.of(MinionConstants.MergeRollupTask.TASK_TYPE, invalidConfig)))
      .build();

  MergeRollupTaskGenerator taskGenerator = new MergeRollupTaskGenerator();
  assertThrows(IllegalStateException.class,
      () -> taskGenerator.validateTaskConfigs(offlineTableConfig, schema, invalidConfig));
}

@Test
public void testInvalidSamplingProbability() {
  // samplingProbability below 0.0 is out of the legal [0.0, 1.0] range.
  Schema schema = new Schema();
  schema.addField(new MetricFieldSpec("a", FieldSpec.DataType.BYTES));

  String prefix = "hourly." + MinionConstants.MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX;
  Map<String, String> invalidConfig = new HashMap<>();
  invalidConfig.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, "hourly");
  invalidConfig.put(prefix + "a.samplingProbability", "-1.01");
  TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
      .setTaskConfig(new TableTaskConfig(ImmutableMap.of(MinionConstants.MergeRollupTask.TASK_TYPE, invalidConfig)))
      .build();

  MergeRollupTaskGenerator taskGenerator = new MergeRollupTaskGenerator();
  assertThrows(IllegalStateException.class,
      () -> taskGenerator.validateTaskConfigs(offlineTableConfig, schema, invalidConfig));
}

@Test
public void testInvalidNominalEntries() {
  // nominalEntries must be a positive integer; zero is rejected.
  Schema schema = new Schema();
  schema.addField(new MetricFieldSpec("a", FieldSpec.DataType.BYTES));

  String prefix = "hourly." + MinionConstants.MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX;
  Map<String, String> invalidConfig = new HashMap<>();
  invalidConfig.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, "hourly");
  invalidConfig.put(prefix + "a.nominalEntries", "0");
  TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
      .setTaskConfig(new TableTaskConfig(ImmutableMap.of(MinionConstants.MergeRollupTask.TASK_TYPE, invalidConfig)))
      .build();

  MergeRollupTaskGenerator taskGenerator = new MergeRollupTaskGenerator();
  assertThrows(IllegalStateException.class,
      () -> taskGenerator.validateTaskConfigs(offlineTableConfig, schema, invalidConfig));
}

@Test
public void testInvalidLgK() {
  // lgK must be a positive integer; zero is rejected.
  Schema schema = new Schema();
  schema.addField(new MetricFieldSpec("a", FieldSpec.DataType.BYTES));

  String prefix = "hourly." + MinionConstants.MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX;
  Map<String, String> invalidConfig = new HashMap<>();
  invalidConfig.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, "hourly");
  invalidConfig.put(prefix + "a.lgK", "0");
  TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
      .setTaskConfig(new TableTaskConfig(ImmutableMap.of(MinionConstants.MergeRollupTask.TASK_TYPE, invalidConfig)))
      .build();

  MergeRollupTaskGenerator taskGenerator = new MergeRollupTaskGenerator();
  assertThrows(IllegalStateException.class,
      () -> taskGenerator.validateTaskConfigs(offlineTableConfig, schema, invalidConfig));
}

/**
* Tests for some config checks
*/
Expand Down
Loading
Loading