Skip to content

Commit c7f323f

Browse files
committed
Support MAP type in derived column creation during segment reload
1 parent 62ec58c commit c7f323f

File tree

6 files changed

+299
-21
lines changed

6 files changed

+299
-21
lines changed

pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@ public static PinotDataType getArgumentType(Class<?> clazz) {
149149
if (Collection.class.isAssignableFrom(clazz)) {
150150
return PinotDataType.COLLECTION;
151151
}
152+
if (Map.class.isAssignableFrom(clazz)) {
153+
return PinotDataType.MAP;
154+
}
152155
return ARGUMENT_TYPE_MAP.get(clazz);
153156
}
154157

pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.pinot.spi.utils.BooleanUtils;
3333
import org.apache.pinot.spi.utils.BytesUtils;
3434
import org.apache.pinot.spi.utils.JsonUtils;
35+
import org.apache.pinot.spi.utils.MapUtils;
3536
import org.apache.pinot.spi.utils.TimestampUtils;
3637

3738

@@ -827,6 +828,8 @@ public Object convert(Object value, PinotDataType sourceType) {
827828
} catch (Exception e) {
828829
throw new RuntimeException("Unable to convert String to Map. Input value: " + value, e);
829830
}
831+
case BYTES:
832+
return MapUtils.deserializeMap((byte[]) value);
830833
case OBJECT:
831834
case MAP:
832835
if (value instanceof Map) {
@@ -840,6 +843,16 @@ public Object convert(Object value, PinotDataType sourceType) {
840843
sourceType, value.getClass()));
841844
}
842845
}
846+
847+
@SuppressWarnings("unchecked")
848+
@Override
849+
public byte[] toBytes(Object value) {
850+
if (!(value instanceof Map)) {
851+
throw new UnsupportedOperationException("Cannot convert non-Map value to BYTES for MAP type: "
852+
+ (value == null ? "null" : value.getClass()));
853+
}
854+
return MapUtils.serializeMap((Map<String, Object>) value);
855+
}
843856
},
844857

845858
BYTE_ARRAY {

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java

Lines changed: 78 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pinot.segment.local.segment.creator.impl.stats;
2020

21+
import com.fasterxml.jackson.core.JsonProcessingException;
2122
import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
2223
import java.util.Arrays;
2324
import java.util.Map;
@@ -28,18 +29,20 @@
2829
import org.apache.pinot.segment.spi.index.StandardIndexes;
2930
import org.apache.pinot.spi.config.table.TableConfig;
3031
import org.apache.pinot.spi.config.table.TableType;
31-
import org.apache.pinot.spi.data.ComplexFieldSpec;
3232
import org.apache.pinot.spi.data.DimensionFieldSpec;
3333
import org.apache.pinot.spi.data.FieldSpec;
3434
import org.apache.pinot.spi.data.Schema;
35+
import org.apache.pinot.spi.utils.JsonUtils;
3536
import org.apache.pinot.spi.utils.MapUtils;
3637
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
38+
import org.slf4j.Logger;
39+
import org.slf4j.LoggerFactory;
3740

3841

3942
/**
4043
* Extension of {@link AbstractColumnStatisticsCollector} for Map column type.
4144
*
42-
* The Map column type is different than other columns in that it is essentially recursive. It contains keys
45+
* The Map column type is different from other columns in that it is essentially recursive. It contains keys
4346
* and those keys are analogous to columns and, as such, have Key level statistics. So, this class keeps track of
4447
* Map column level statistics _and_ Key level statistics. The Key Level statistics can then be used during
4548
* the creation of the Immutable Segment to make decisions about how keys will be stored or what Map data structure
@@ -51,20 +54,19 @@
5154
* heterogeneous value types for a key are encountered will construct the Map statistics it can be raised as a fault.
5255
*/
5356
public class MapColumnPreIndexStatsCollector extends AbstractColumnStatisticsCollector {
57+
private static final Logger LOGGER = LoggerFactory.getLogger(MapColumnPreIndexStatsCollector.class);
5458
private final Object2ObjectOpenHashMap<String, AbstractColumnStatisticsCollector> _keyStats =
5559
new Object2ObjectOpenHashMap<>(INITIAL_HASH_SET_SIZE);
5660
private final Map<String, Integer> _keyFrequencies = new Object2ObjectOpenHashMap<>(INITIAL_HASH_SET_SIZE);
5761
private String[] _sortedKeys;
5862
private int _minLength = Integer.MAX_VALUE;
5963
private int _maxLength = 0;
6064
private boolean _sealed = false;
61-
private ComplexFieldSpec _colFieldSpec;
6265
private boolean _createNoDictCollectorsForKeys = false;
6366

6467
public MapColumnPreIndexStatsCollector(String column, StatsCollectorConfig statsCollectorConfig) {
6568
super(column, statsCollectorConfig);
6669
_sorted = false;
67-
_colFieldSpec = (ComplexFieldSpec) statsCollectorConfig.getFieldSpecForColumn(column);
6870
Map<String, FieldIndexConfigs> indexConfigsByCol = FieldIndexConfigsUtil.createIndexConfigsByColName(
6971
statsCollectorConfig.getTableConfig(), statsCollectorConfig.getSchema());
7072
boolean isDictionaryEnabled = indexConfigsByCol.get(column).getConfig(StandardIndexes.dictionary()).isEnabled();
@@ -96,6 +98,9 @@ public void collect(Object entry) {
9698
for (Map.Entry<String, Object> mapValueEntry : mapValue.entrySet()) {
9799
String key = mapValueEntry.getKey();
98100
Object value = mapValueEntry.getValue();
101+
if (value == null) {
102+
continue;
103+
}
99104
_keyFrequencies.merge(key, 1, Integer::sum);
100105
AbstractColumnStatisticsCollector keyStats = _keyStats.get(key);
101106
if (keyStats == null) {
@@ -105,6 +110,62 @@ public void collect(Object entry) {
105110
updatePartition(key);
106111
}
107112
}
113+
if (keyStats instanceof NoDictColumnStatisticsCollector) {
114+
keyStats.collect(value);
115+
continue;
116+
}
117+
if (keyStats instanceof StringColumnPreIndexStatsCollector) {
118+
if (value instanceof String || value instanceof Number || value instanceof Boolean) {
119+
keyStats.collect(String.valueOf(value));
120+
continue;
121+
}
122+
try {
123+
keyStats.collect(JsonUtils.objectToString(value));
124+
continue;
125+
} catch (JsonProcessingException e) {
126+
throw new RuntimeException("Failed to serialize value for key '" + key + "': " + value, e);
127+
}
128+
}
129+
if (keyStats instanceof BigDecimalColumnPreIndexStatsCollector) {
130+
keyStats.collect(PinotDataType.STRING.toBigDecimal(value.toString()));
131+
continue;
132+
}
133+
if (value instanceof Number) {
134+
Number valueNumber = (Number) value;
135+
if (keyStats instanceof IntColumnPreIndexStatsCollector) {
136+
keyStats.collect(valueNumber.intValue());
137+
continue;
138+
}
139+
if (keyStats instanceof LongColumnPreIndexStatsCollector) {
140+
keyStats.collect(valueNumber.longValue());
141+
continue;
142+
}
143+
if (keyStats instanceof FloatColumnPreIndexStatsCollector) {
144+
keyStats.collect(valueNumber.floatValue());
145+
continue;
146+
}
147+
if (keyStats instanceof DoubleColumnPreIndexStatsCollector) {
148+
keyStats.collect(valueNumber.doubleValue());
149+
continue;
150+
}
151+
}
152+
if (keyStats instanceof IntColumnPreIndexStatsCollector) {
153+
keyStats.collect(PinotDataType.STRING.toInt(value.toString()));
154+
continue;
155+
}
156+
if (keyStats instanceof LongColumnPreIndexStatsCollector) {
157+
keyStats.collect(PinotDataType.STRING.toLong(value.toString()));
158+
continue;
159+
}
160+
if (keyStats instanceof FloatColumnPreIndexStatsCollector) {
161+
keyStats.collect(PinotDataType.STRING.toFloat(value.toString()));
162+
continue;
163+
}
164+
if (keyStats instanceof DoubleColumnPreIndexStatsCollector) {
165+
keyStats.collect(PinotDataType.STRING.toDouble(value.toString()));
166+
continue;
167+
}
168+
// Catch all
108169
keyStats.collect(value);
109170
}
110171
_totalNumberOfEntries++;
@@ -161,7 +222,6 @@ public int getCardinality() {
161222
public void seal() {
162223
if (!_sealed) {
163224
//All the keys which have appeared less than total docs insert default null Value in unique values
164-
FieldSpec valueFieldSpec = _colFieldSpec.getChildFieldSpec("value");
165225
for (Map.Entry<String, Integer> entry : _keyFrequencies.entrySet()) {
166226
if (entry.getValue() < _totalNumberOfEntries) {
167227
_keyStats.get(entry.getKey()).collect(_keyStats.get(entry.getKey())._fieldSpec.getDefaultNullValue());
@@ -196,7 +256,6 @@ private AbstractColumnStatisticsCollector createKeyStatsCollector(String key, Ob
196256
if (_createNoDictCollectorsForKeys) {
197257
return new NoDictColumnStatisticsCollector(key, config);
198258
}
199-
200259
switch (type) {
201260
case INTEGER:
202261
return new IntColumnPreIndexStatsCollector(key, config);
@@ -208,18 +267,23 @@ private AbstractColumnStatisticsCollector createKeyStatsCollector(String key, Ob
208267
return new DoubleColumnPreIndexStatsCollector(key, config);
209268
case BIG_DECIMAL:
210269
return new BigDecimalColumnPreIndexStatsCollector(key, config);
270+
case BOOLEAN:
211271
case STRING:
272+
case MAP:
273+
case OBJECT:
212274
return new StringColumnPreIndexStatsCollector(key, config);
213275
default:
214-
throw new UnsupportedOperationException(String.format("MAP column does not yet support '%s'", type));
276+
LOGGER.warn("Unknown data type {} for key {} and value {}", type, key, value);
277+
return new StringColumnPreIndexStatsCollector(key, config);
215278
}
216279
}
217280

281+
/**
282+
* Convert Map value data type to stored field type.
283+
* Note that all unknown types are automatically converted to String type.
284+
*/
218285
private static FieldSpec.DataType convertToDataType(PinotDataType ty) {
219-
// TODO: I've been told that we already have a function to do this, so find that function and replace this
220286
switch (ty) {
221-
case BOOLEAN:
222-
return FieldSpec.DataType.BOOLEAN;
223287
case SHORT:
224288
case INTEGER:
225289
return FieldSpec.DataType.INT;
@@ -233,10 +297,12 @@ private static FieldSpec.DataType convertToDataType(PinotDataType ty) {
233297
return FieldSpec.DataType.BIG_DECIMAL;
234298
case TIMESTAMP:
235299
return FieldSpec.DataType.TIMESTAMP;
300+
case BOOLEAN:
236301
case STRING:
237-
return FieldSpec.DataType.STRING;
302+
case OBJECT:
303+
case MAP:
238304
default:
239-
throw new UnsupportedOperationException();
305+
return FieldSpec.DataType.STRING;
240306
}
241307
}
242308
}

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,10 @@
4747
import org.apache.pinot.segment.local.segment.creator.impl.stats.FloatColumnPreIndexStatsCollector;
4848
import org.apache.pinot.segment.local.segment.creator.impl.stats.IntColumnPreIndexStatsCollector;
4949
import org.apache.pinot.segment.local.segment.creator.impl.stats.LongColumnPreIndexStatsCollector;
50+
import org.apache.pinot.segment.local.segment.creator.impl.stats.MapColumnPreIndexStatsCollector;
5051
import org.apache.pinot.segment.local.segment.creator.impl.stats.NoDictColumnStatisticsCollector;
5152
import org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
5253
import org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
53-
import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexCreatorFactory;
54-
import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexPlugin;
5554
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
5655
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
5756
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
@@ -809,6 +808,30 @@ private void createDerivedColumnV1Indices(String column, FunctionEvaluator funct
809808
new ByteArray((byte[]) fieldSpec.getDefaultNullValue()));
810809
break;
811810
}
811+
case MAP: {
812+
// Ensure each value is non-null; default for MAP is an empty map
813+
for (int i = 0; i < numDocs; i++) {
814+
if (outputValues[i] == null) {
815+
outputValues[i] = fieldSpec.getDefaultNullValue();
816+
}
817+
}
818+
819+
// Use MapColumnPreIndexStatsCollector for collecting MAP stats
820+
AbstractColumnStatisticsCollector statsCollector =
821+
new MapColumnPreIndexStatsCollector(column, statsCollectorConfig);
822+
for (Object value : outputValues) {
823+
statsCollector.collect(value);
824+
}
825+
statsCollector.seal();
826+
827+
// MAP does not use dictionary encoding
828+
createDictionary = false;
829+
indexCreationInfo =
830+
new ColumnIndexCreationInfo(statsCollector, /* createDictionary */ false, false, true,
831+
fieldSpec.getDefaultNullValue());
832+
break;
833+
}
834+
812835
default:
813836
throw new IllegalStateException();
814837
}
@@ -1166,8 +1189,12 @@ private void createDerivedColumnForwardIndexWithoutDictionary(String column, Fie
11661189
case BYTES:
11671190
forwardIndexCreator.putBytes((byte[]) outputValue);
11681191
break;
1192+
case MAP:
1193+
forwardIndexCreator.add(outputValue, -1);
1194+
break;
11691195
default:
1170-
throw new IllegalStateException();
1196+
throw new IllegalStateException(
1197+
"Unsupported data type: " + fieldSpec.getDataType() + ", for value: " + outputValue);
11711198
}
11721199
}
11731200
} else {
@@ -1193,10 +1220,12 @@ private void createDerivedColumnForwardIndexWithoutDictionary(String column, Fie
11931220
forwardIndexCreator.putBytesMV((byte[][]) outputValue);
11941221
break;
11951222
default:
1196-
throw new IllegalStateException();
1223+
throw new IllegalStateException(
1224+
"Unsupported data type: " + fieldSpec.getDataType() + ", for value: " + outputValue);
11971225
}
11981226
}
11991227
}
1228+
forwardIndexCreator.seal();
12001229
}
12011230

12021231
// Add the column metadata
@@ -1222,13 +1251,12 @@ private ForwardIndexCreator getForwardIndexCreator(FieldSpec fieldSpec, ColumnIn
12221251
ForwardIndexConfig forwardIndexConfig = null;
12231252
FieldIndexConfigs fieldIndexConfig = _indexLoadingConfig.getFieldIndexConfig(column);
12241253
if (fieldIndexConfig != null) {
1225-
forwardIndexConfig = fieldIndexConfig.getConfig(new ForwardIndexPlugin().getIndexType());
1254+
forwardIndexConfig = fieldIndexConfig.getConfig(StandardIndexes.forward());
12261255
}
12271256
if (forwardIndexConfig == null) {
12281257
forwardIndexConfig = new ForwardIndexConfig(false, null, null, null, null, null, null);
12291258
}
1230-
1231-
return ForwardIndexCreatorFactory.createIndexCreator(indexCreationContext, forwardIndexConfig);
1259+
return StandardIndexes.forward().createIndexCreator(indexCreationContext, forwardIndexConfig);
12321260
}
12331261

12341262
@SuppressWarnings("rawtypes")

pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.HashMap;
2525
import java.util.Iterator;
2626
import java.util.List;
27+
import java.util.Map;
2728
import java.util.concurrent.TimeUnit;
2829
import org.apache.pinot.spi.config.table.TableConfig;
2930
import org.apache.pinot.spi.config.table.TableType;
@@ -393,11 +394,40 @@ public void testTransformFunctionContinueOnError() {
393394
genericRow = new GenericRow();
394395
genericRow.putValue("x", "abcd");
395396
expressionTransformer.transform(genericRow);
396-
Assert.assertEquals(genericRow.getValue("y"), null);
397+
Assert.assertNull(genericRow.getValue("y"));
397398
// Invalid case: x is null, y is int
398399
genericRow = new GenericRow();
399400
genericRow.putValue("x", null);
400401
expressionTransformer.transform(genericRow);
401-
Assert.assertEquals(genericRow.getValue("y"), null);
402+
Assert.assertNull(genericRow.getValue("y"));
403+
}
404+
405+
@Test
406+
public void testJsonToMapIngestionTransform() {
407+
Schema schema = new Schema.SchemaBuilder()
408+
.addSingleValueDimension("columnJson", FieldSpec.DataType.STRING)
409+
.addComplex("columnMap", FieldSpec.DataType.MAP, Map.of(
410+
"a", new DimensionFieldSpec("a", FieldSpec.DataType.INT, true),
411+
"b", new DimensionFieldSpec("b", FieldSpec.DataType.STRING, true)
412+
))
413+
.build();
414+
415+
IngestionConfig ingestionConfig = new IngestionConfig();
416+
ingestionConfig.setTransformConfigs(Collections.singletonList(
417+
new TransformConfig("columnMap", "jsonStringToMap(columnJson)")));
418+
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
419+
.setTableName("testJsonToMapIngestionTransform")
420+
.setIngestionConfig(ingestionConfig)
421+
.build();
422+
423+
ExpressionTransformer expressionTransformer = new ExpressionTransformer(tableConfig, schema);
424+
425+
GenericRow row = new GenericRow();
426+
row.putValue("columnJson", "{\"a\":1,\"b\":\"x\"}");
427+
428+
expressionTransformer.transform(row);
429+
Map<String, Object> map = (Map<String, Object>) row.getValue("columnMap");
430+
Assert.assertEquals(map.get("a"), 1);
431+
Assert.assertEquals(map.get("b"), "x");
402432
}
403433
}

0 commit comments

Comments
 (0)