@@ -149,6 +149,9 @@ public static PinotDataType getArgumentType(Class<?> clazz) {
if (Collection.class.isAssignableFrom(clazz)) {
return PinotDataType.COLLECTION;
}
if (Map.class.isAssignableFrom(clazz)) {
return PinotDataType.MAP;
}
return ARGUMENT_TYPE_MAP.get(clazz);
}
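For context, a minimal standalone sketch of what this branch changes: a Map argument to a scalar function now resolves to the MAP data type instead of falling through to the ARGUMENT_TYPE_MAP lookup. The resolve method below is a hypothetical stand-in for getArgumentType, using only JDK types; it is not the Pinot implementation.

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ArgumentTypeSketch {
  // Hypothetical stand-in for PinotDataType.getArgumentType(Class<?>); mirrors the order of
  // checks in the diff above: Collection first, then the new Map branch, then the lookup table.
  static String resolve(Class<?> clazz) {
    if (Collection.class.isAssignableFrom(clazz)) {
      return "COLLECTION";
    }
    if (Map.class.isAssignableFrom(clazz)) {
      return "MAP"; // new branch added by this change
    }
    return "LOOKUP"; // falls through to the ARGUMENT_TYPE_MAP table
  }

  public static void main(String[] args) {
    System.out.println(resolve(new HashMap<String, Object>().getClass())); // MAP
    System.out.println(resolve(List.of(1, 2, 3).getClass()));              // COLLECTION
    System.out.println(resolve(String.class));                             // LOOKUP
  }
}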

@@ -32,6 +32,7 @@
import org.apache.pinot.spi.utils.BooleanUtils;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.MapUtils;
import org.apache.pinot.spi.utils.TimestampUtils;


@@ -827,6 +828,8 @@ public Object convert(Object value, PinotDataType sourceType) {
} catch (Exception e) {
throw new RuntimeException("Unable to convert String to Map. Input value: " + value, e);
}
case BYTES:
return MapUtils.deserializeMap((byte[]) value);
case OBJECT:
case MAP:
if (value instanceof Map) {
@@ -840,6 +843,16 @@ public Object convert(Object value, PinotDataType sourceType) {
sourceType, value.getClass()));
}
}

@SuppressWarnings("unchecked")
@Override
public byte[] toBytes(Object value) {
if (!(value instanceof Map)) {
throw new UnsupportedOperationException("Cannot convert non-Map value to BYTES for MAP type: "
+ (value == null ? "null" : value.getClass()));
}
return MapUtils.serializeMap((Map<String, Object>) value);
}
},
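A hedged sketch of the round trip that the new toBytes override and the new BYTES case in convert enable together. It assumes MapUtils.serializeMap and MapUtils.deserializeMap (as imported above) are lossless inverses for a Map<String, Object>; only the two calls shown in the diff are used.

import java.util.Map;
import org.apache.pinot.common.utils.PinotDataType;

public class MapBytesRoundTripSketch {
  public static void main(String[] args) {
    Map<String, Object> source = Map.of("a", 1, "b", "x");

    // MAP -> BYTES through the new toBytes override (delegates to MapUtils.serializeMap)
    byte[] bytes = PinotDataType.MAP.toBytes(source);

    // BYTES -> MAP through the new BYTES case in convert (delegates to MapUtils.deserializeMap)
    @SuppressWarnings("unchecked")
    Map<String, Object> restored = (Map<String, Object>) PinotDataType.MAP.convert(bytes, PinotDataType.BYTES);

    // Expected to print true, assuming the serializer preserves keys and scalar value types
    System.out.println(source.equals(restored));
  }
}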

BYTE_ARRAY {
@@ -18,7 +18,9 @@
*/
package org.apache.pinot.segment.local.segment.creator.impl.stats;

import com.fasterxml.jackson.core.JsonProcessingException;
import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Map;
import org.apache.pinot.common.utils.PinotDataType;
@@ -28,20 +30,22 @@
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.ComplexFieldSpec;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.MapUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Extension of {@link AbstractColumnStatisticsCollector} for Map column type.
*
* The Map column type is different than other columns in that it is essentially recursive. It contains keys
* The Map column type is different from other columns in that it is essentially recursive. It contains keys
* and those keys are analogous to columns and, as such, have Key level statistics. So, this class keeps track of
* Map column level statistics _and_ Key level statistics. The Key Level statistics can then be used during
* Map column level statistics _and_ Key level statistics. The Key Level statistics can then be used during
* the creation of the Immutable Segment to make decisions about how keys will be stored or what Map data structure
* to use.
*
@@ -51,20 +55,19 @@
heterogeneous value types for a key are encountered while constructing the Map statistics, it can be raised as a fault.
*/
public class MapColumnPreIndexStatsCollector extends AbstractColumnStatisticsCollector {
private static final Logger LOGGER = LoggerFactory.getLogger(MapColumnPreIndexStatsCollector.class);
private final Object2ObjectOpenHashMap<String, AbstractColumnStatisticsCollector> _keyStats =
new Object2ObjectOpenHashMap<>(INITIAL_HASH_SET_SIZE);
private final Map<String, Integer> _keyFrequencies = new Object2ObjectOpenHashMap<>(INITIAL_HASH_SET_SIZE);
private String[] _sortedKeys;
private int _minLength = Integer.MAX_VALUE;
private int _maxLength = 0;
private boolean _sealed = false;
private ComplexFieldSpec _colFieldSpec;
private boolean _createNoDictCollectorsForKeys = false;

public MapColumnPreIndexStatsCollector(String column, StatsCollectorConfig statsCollectorConfig) {
super(column, statsCollectorConfig);
_sorted = false;
_colFieldSpec = (ComplexFieldSpec) statsCollectorConfig.getFieldSpecForColumn(column);
Map<String, FieldIndexConfigs> indexConfigsByCol = FieldIndexConfigsUtil.createIndexConfigsByColName(
statsCollectorConfig.getTableConfig(), statsCollectorConfig.getSchema());
boolean isDictionaryEnabled = indexConfigsByCol.get(column).getConfig(StandardIndexes.dictionary()).isEnabled();
@@ -96,6 +99,9 @@ public void collect(Object entry) {
for (Map.Entry<String, Object> mapValueEntry : mapValue.entrySet()) {
String key = mapValueEntry.getKey();
Object value = mapValueEntry.getValue();
if (value == null) {
continue;
}
_keyFrequencies.merge(key, 1, Integer::sum);
AbstractColumnStatisticsCollector keyStats = _keyStats.get(key);
if (keyStats == null) {
@@ -105,6 +111,67 @@ public void collect(Object entry) {
updatePartition(key);
}
}
if (keyStats instanceof NoDictColumnStatisticsCollector) {
keyStats.collect(value);
continue;
}
if (keyStats instanceof StringColumnPreIndexStatsCollector) {
if (value instanceof String || value instanceof Number || value instanceof Boolean) {
keyStats.collect(String.valueOf(value));
continue;
}
try {
keyStats.collect(JsonUtils.objectToString(value));
continue;
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize value for key '" + key + "': " + value, e);
}
}
if (keyStats instanceof BigDecimalColumnPreIndexStatsCollector) {
try {
keyStats.collect(new BigDecimal(value.toString()));
} catch (NumberFormatException e) {
LOGGER.error("Failed to parse BigDecimal for key '{}', value '{}': {}", key, value, e.getMessage());
// Skip collecting this value for statistics
}
continue;
}
if (value instanceof Number) {
Number valueNumber = (Number) value;
if (keyStats instanceof IntColumnPreIndexStatsCollector) {
keyStats.collect(valueNumber.intValue());
continue;
}
if (keyStats instanceof LongColumnPreIndexStatsCollector) {
keyStats.collect(valueNumber.longValue());
continue;
}
if (keyStats instanceof FloatColumnPreIndexStatsCollector) {
keyStats.collect(valueNumber.floatValue());
continue;
}
if (keyStats instanceof DoubleColumnPreIndexStatsCollector) {
keyStats.collect(valueNumber.doubleValue());
continue;
}
}
if (keyStats instanceof IntColumnPreIndexStatsCollector) {
keyStats.collect(Integer.parseInt(value.toString()));
continue;
}
if (keyStats instanceof LongColumnPreIndexStatsCollector) {
keyStats.collect(Long.parseLong(value.toString()));
continue;
}
if (keyStats instanceof FloatColumnPreIndexStatsCollector) {
keyStats.collect(Float.parseFloat(value.toString()));
continue;
}
if (keyStats instanceof DoubleColumnPreIndexStatsCollector) {
keyStats.collect(Double.parseDouble(value.toString()));
continue;
}
// Catch all
keyStats.collect(value);
}
_totalNumberOfEntries++;
@@ -161,7 +228,6 @@ public int getCardinality() {
public void seal() {
if (!_sealed) {
// For keys that appeared in fewer entries than the total doc count, insert the default null value into the unique values
FieldSpec valueFieldSpec = _colFieldSpec.getChildFieldSpec("value");
for (Map.Entry<String, Integer> entry : _keyFrequencies.entrySet()) {
if (entry.getValue() < _totalNumberOfEntries) {
_keyStats.get(entry.getKey()).collect(_keyStats.get(entry.getKey())._fieldSpec.getDefaultNullValue());
@@ -196,7 +262,6 @@ private AbstractColumnStatisticsCollector createKeyStatsCollector(String key, Ob
if (_createNoDictCollectorsForKeys) {
return new NoDictColumnStatisticsCollector(key, config);
}

switch (type) {
case INTEGER:
return new IntColumnPreIndexStatsCollector(key, config);
@@ -208,18 +273,23 @@ private AbstractColumnStatisticsCollector createKeyStatsCollector(String key, Ob
return new DoubleColumnPreIndexStatsCollector(key, config);
case BIG_DECIMAL:
return new BigDecimalColumnPreIndexStatsCollector(key, config);
case BOOLEAN:
case STRING:
case MAP:
case OBJECT:
return new StringColumnPreIndexStatsCollector(key, config);
default:
throw new UnsupportedOperationException(String.format("MAP column does not yet support '%s'", type));
LOGGER.warn("Unknown data type {} for key {} and value type {}", type, key, value.getClass().getName());
return new StringColumnPreIndexStatsCollector(key, config);
}
}

/**
* Convert Map value data type to stored field type.
* Note that all unknown types are automatically converted to String type.
*/
private static FieldSpec.DataType convertToDataType(PinotDataType ty) {
// TODO: I've been told that we already have a function to do this, so find that function and replace this
switch (ty) {
case BOOLEAN:
return FieldSpec.DataType.BOOLEAN;
case SHORT:
case INTEGER:
return FieldSpec.DataType.INT;
@@ -233,10 +303,12 @@ private static FieldSpec.DataType convertToDataType(PinotDataType ty) {
return FieldSpec.DataType.BIG_DECIMAL;
case TIMESTAMP:
return FieldSpec.DataType.TIMESTAMP;
case BOOLEAN:
case STRING:
return FieldSpec.DataType.STRING;
case OBJECT:
case MAP:
default:
throw new UnsupportedOperationException();
return FieldSpec.DataType.STRING;
}
}
}
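As the class Javadoc above describes, each key in the Map column gets its own per-key stats collector, and the collect() override routes every map value to its key's collector after coercing the value to the type that collector expects. Below is a standalone sketch of that coercion policy using only JDK and Jackson types; the coerce helper is hypothetical and mirrors the branching above rather than reusing the internal collector classes.

import com.fasterxml.jackson.databind.ObjectMapper;
import java.math.BigDecimal;
import java.util.Map;

public class MapValueCoercionSketch {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  public static void main(String[] args) throws Exception {
    System.out.println(coerce(42L, Integer.class));           // 42: Numbers are narrowed via intValue()
    System.out.println(coerce("3.5", Double.class));          // 3.5: non-Numbers are parsed from toString()
    System.out.println(coerce(Map.of("x", 1), String.class)); // {"x":1}: non-scalars become JSON strings
    System.out.println(coerce("1.25", BigDecimal.class));     // 1.25
  }

  // Hypothetical helper mirroring the per-collector branches in collect() above.
  static Object coerce(Object value, Class<?> target) throws Exception {
    if (target == String.class) {
      if (value instanceof String || value instanceof Number || value instanceof Boolean) {
        return String.valueOf(value);
      }
      return MAPPER.writeValueAsString(value); // JsonUtils.objectToString in the real code
    }
    if (target == BigDecimal.class) {
      return new BigDecimal(value.toString()); // the real code logs and skips on NumberFormatException
    }
    if (value instanceof Number) {
      Number number = (Number) value;
      if (target == Integer.class) {
        return number.intValue();
      }
      if (target == Long.class) {
        return number.longValue();
      }
      if (target == Float.class) {
        return number.floatValue();
      }
      if (target == Double.class) {
        return number.doubleValue();
      }
    }
    if (target == Integer.class) {
      return Integer.parseInt(value.toString());
    }
    if (target == Long.class) {
      return Long.parseLong(value.toString());
    }
    if (target == Float.class) {
      return Float.parseFloat(value.toString());
    }
    if (target == Double.class) {
      return Double.parseDouble(value.toString());
    }
    return value; // catch-all: the value is handed to the collector unchanged
  }
}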
@@ -47,11 +47,10 @@
import org.apache.pinot.segment.local.segment.creator.impl.stats.FloatColumnPreIndexStatsCollector;
import org.apache.pinot.segment.local.segment.creator.impl.stats.IntColumnPreIndexStatsCollector;
import org.apache.pinot.segment.local.segment.creator.impl.stats.LongColumnPreIndexStatsCollector;
import org.apache.pinot.segment.local.segment.creator.impl.stats.MapColumnPreIndexStatsCollector;
import org.apache.pinot.segment.local.segment.creator.impl.stats.NoDictColumnStatisticsCollector;
import org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
import org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexCreatorFactory;
import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexPlugin;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
@@ -414,8 +413,7 @@ protected boolean createColumnV1Indices(String column)
if (!_segmentWriter.hasIndexFor(argument, StandardIndexes.forward())) {
throw new UnsupportedOperationException(String.format("Operation not supported! Cannot create a derived "
+ "column %s because argument: %s does not have a forward index. Enable forward index and "
+ "refresh/backfill the segments to create a derived column from source column %s", column,
argument,
+ "refresh/backfill the segments to create a derived column from source column", column,
argument));
}
argumentsMetadata.add(columnMetadata);
@@ -809,6 +807,30 @@ private void createDerivedColumnV1Indices(String column, FunctionEvaluator funct
new ByteArray((byte[]) fieldSpec.getDefaultNullValue()));
break;
}
case MAP: {
// Ensure each value is non-null; default for MAP is an empty map
for (int i = 0; i < numDocs; i++) {
if (outputValues[i] == null) {
outputValues[i] = fieldSpec.getDefaultNullValue();
}
}

// Use MapColumnPreIndexStatsCollector for collecting MAP stats
AbstractColumnStatisticsCollector statsCollector =
new MapColumnPreIndexStatsCollector(column, statsCollectorConfig);
for (Object value : outputValues) {
statsCollector.collect(value);
}
statsCollector.seal();

// MAP does not use dictionary encoding
createDictionary = false;
indexCreationInfo =
new ColumnIndexCreationInfo(statsCollector, /* createDictionary */ false, false, true,
fieldSpec.getDefaultNullValue());
break;
}

default:
throw new IllegalStateException();
}
@@ -1166,8 +1188,11 @@ private void createDerivedColumnForwardIndexWithoutDictionary(String column, Fie
case BYTES:
forwardIndexCreator.putBytes((byte[]) outputValue);
break;
case MAP:
forwardIndexCreator.add(outputValue, -1);
break;
default:
throw new IllegalStateException();
throw new IllegalStateException("Unsupported data type: " + fieldSpec.getDataType());
}
}
} else {
@@ -1193,10 +1218,11 @@ private void createDerivedColumnForwardIndexWithoutDictionary(String column, Fie
forwardIndexCreator.putBytesMV((byte[][]) outputValue);
break;
default:
throw new IllegalStateException();
throw new IllegalStateException("Unsupported data type: " + fieldSpec.getDataType());
}
}
}
forwardIndexCreator.seal();
}

// Add the column metadata
@@ -1222,13 +1248,12 @@ private ForwardIndexCreator getForwardIndexCreator(FieldSpec fieldSpec, ColumnIn
ForwardIndexConfig forwardIndexConfig = null;
FieldIndexConfigs fieldIndexConfig = _indexLoadingConfig.getFieldIndexConfig(column);
if (fieldIndexConfig != null) {
forwardIndexConfig = fieldIndexConfig.getConfig(new ForwardIndexPlugin().getIndexType());
forwardIndexConfig = fieldIndexConfig.getConfig(StandardIndexes.forward());
}
if (forwardIndexConfig == null) {
forwardIndexConfig = new ForwardIndexConfig(false, null, null, null, null, null, null);
}

return ForwardIndexCreatorFactory.createIndexCreator(indexCreationContext, forwardIndexConfig);
return StandardIndexes.forward().createIndexCreator(indexCreationContext, forwardIndexConfig);
}

@SuppressWarnings("rawtypes")
@@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -393,11 +394,40 @@ public void testTransformFunctionContinueOnError() {
genericRow = new GenericRow();
genericRow.putValue("x", "abcd");
expressionTransformer.transform(genericRow);
Assert.assertEquals(genericRow.getValue("y"), null);
Assert.assertNull(genericRow.getValue("y"));
// Invalid case: x is null, y is int
genericRow = new GenericRow();
genericRow.putValue("x", null);
expressionTransformer.transform(genericRow);
Assert.assertEquals(genericRow.getValue("y"), null);
Assert.assertNull(genericRow.getValue("y"));
}

@Test
public void testJsonToMapIngestionTransform() {
Schema schema = new Schema.SchemaBuilder()
.addSingleValueDimension("columnJson", FieldSpec.DataType.STRING)
.addComplex("columnMap", FieldSpec.DataType.MAP, Map.of(
"a", new DimensionFieldSpec("a", FieldSpec.DataType.INT, true),
"b", new DimensionFieldSpec("b", FieldSpec.DataType.STRING, true)
))
.build();

IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setTransformConfigs(Collections.singletonList(
new TransformConfig("columnMap", "jsonStringToMap(columnJson)")));
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
.setTableName("testJsonToMapIngestionTransform")
.setIngestionConfig(ingestionConfig)
.build();

ExpressionTransformer expressionTransformer = new ExpressionTransformer(tableConfig, schema);

GenericRow row = new GenericRow();
row.putValue("columnJson", "{\"a\":1,\"b\":\"x\"}");

expressionTransformer.transform(row);
Map<String, Object> map = (Map<String, Object>) row.getValue("columnMap");
Assert.assertEquals(map.get("a"), 1);
Assert.assertEquals(map.get("b"), "x");
}
}