Skip to content

Commit 001491b

Browse files
committed
Support MAP type in derived column creation during segment reload
1 parent ceb883a commit 001491b

File tree

5 files changed

+171
-15
lines changed

5 files changed

+171
-15
lines changed

pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@ public static PinotDataType getArgumentType(Class<?> clazz) {
149149
if (Collection.class.isAssignableFrom(clazz)) {
150150
return PinotDataType.COLLECTION;
151151
}
152+
if (Map.class.isAssignableFrom(clazz)) {
153+
return PinotDataType.MAP;
154+
}
152155
return ARGUMENT_TYPE_MAP.get(clazz);
153156
}
154157

pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.pinot.spi.utils.BooleanUtils;
3333
import org.apache.pinot.spi.utils.BytesUtils;
3434
import org.apache.pinot.spi.utils.JsonUtils;
35+
import org.apache.pinot.spi.utils.MapUtils;
3536
import org.apache.pinot.spi.utils.TimestampUtils;
3637

3738

@@ -827,6 +828,8 @@ public Object convert(Object value, PinotDataType sourceType) {
827828
} catch (Exception e) {
828829
throw new RuntimeException("Unable to convert String to Map. Input value: " + value, e);
829830
}
831+
case BYTES:
832+
return MapUtils.deserializeMap((byte[]) value);
830833
case OBJECT:
831834
case MAP:
832835
if (value instanceof Map) {
@@ -840,6 +843,16 @@ public Object convert(Object value, PinotDataType sourceType) {
840843
sourceType, value.getClass()));
841844
}
842845
}
846+
847+
@SuppressWarnings("unchecked")
848+
@Override
849+
public byte[] toBytes(Object value) {
850+
if (!(value instanceof Map)) {
851+
throw new UnsupportedOperationException("Cannot convert non-Map value to BYTES for MAP type: "
852+
+ (value == null ? "null" : value.getClass()));
853+
}
854+
return MapUtils.serializeMap((Map<String, Object>) value);
855+
}
843856
},
844857

845858
BYTE_ARRAY {

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java

Lines changed: 89 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,12 @@
1818
*/
1919
package org.apache.pinot.segment.local.segment.creator.impl.stats;
2020

21+
import com.fasterxml.jackson.core.JsonProcessingException;
2122
import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
23+
import java.math.BigDecimal;
24+
import java.text.NumberFormat;
2225
import java.util.Arrays;
26+
import java.util.Locale;
2327
import java.util.Map;
2428
import org.apache.pinot.common.utils.PinotDataType;
2529
import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
@@ -32,8 +36,11 @@
3236
import org.apache.pinot.spi.data.DimensionFieldSpec;
3337
import org.apache.pinot.spi.data.FieldSpec;
3438
import org.apache.pinot.spi.data.Schema;
39+
import org.apache.pinot.spi.utils.JsonUtils;
3540
import org.apache.pinot.spi.utils.MapUtils;
3641
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
42+
import org.slf4j.Logger;
43+
import org.slf4j.LoggerFactory;
3744

3845

3946
/**
@@ -51,14 +58,15 @@
5158
* heterogeneous value types for a key are encountered will construct the Map statistics it can be raised as a fault.
5259
*/
5360
public class MapColumnPreIndexStatsCollector extends AbstractColumnStatisticsCollector {
61+
private static final Logger LOGGER = LoggerFactory.getLogger(MapColumnPreIndexStatsCollector.class);
5462
private final Object2ObjectOpenHashMap<String, AbstractColumnStatisticsCollector> _keyStats =
5563
new Object2ObjectOpenHashMap<>(INITIAL_HASH_SET_SIZE);
5664
private final Map<String, Integer> _keyFrequencies = new Object2ObjectOpenHashMap<>(INITIAL_HASH_SET_SIZE);
5765
private String[] _sortedKeys;
5866
private int _minLength = Integer.MAX_VALUE;
5967
private int _maxLength = 0;
6068
private boolean _sealed = false;
61-
private ComplexFieldSpec _colFieldSpec;
69+
private final ComplexFieldSpec _colFieldSpec;
6270
private boolean _createNoDictCollectorsForKeys = false;
6371

6472
public MapColumnPreIndexStatsCollector(String column, StatsCollectorConfig statsCollectorConfig) {
@@ -96,6 +104,9 @@ public void collect(Object entry) {
96104
for (Map.Entry<String, Object> mapValueEntry : mapValue.entrySet()) {
97105
String key = mapValueEntry.getKey();
98106
Object value = mapValueEntry.getValue();
107+
if (value == null) {
108+
continue;
109+
}
99110
_keyFrequencies.merge(key, 1, Integer::sum);
100111
AbstractColumnStatisticsCollector keyStats = _keyStats.get(key);
101112
if (keyStats == null) {
@@ -105,6 +116,48 @@ public void collect(Object entry) {
105116
updatePartition(key);
106117
}
107118
}
119+
if (keyStats instanceof StringColumnPreIndexStatsCollector) {
120+
if (value instanceof String || value instanceof Number || value instanceof Boolean) {
121+
keyStats.collect(String.valueOf(value));
122+
continue;
123+
}
124+
try {
125+
keyStats.collect(JsonUtils.objectToString(value));
126+
continue;
127+
} catch (JsonProcessingException e) {
128+
throw new RuntimeException("Failed to serialize value for key '" + key + "': " + value, e);
129+
}
130+
}
131+
132+
Number valueNumber;
133+
if (value instanceof Number) {
134+
valueNumber = (Number) value;
135+
} else {
136+
valueNumber = parseFlexibleNumber(value.toString());
137+
}
138+
if (valueNumber == null) {
139+
continue;
140+
}
141+
if (keyStats instanceof IntColumnPreIndexStatsCollector) {
142+
keyStats.collect(valueNumber.intValue());
143+
continue;
144+
}
145+
if (keyStats instanceof LongColumnPreIndexStatsCollector) {
146+
keyStats.collect(valueNumber.longValue());
147+
continue;
148+
}
149+
if (keyStats instanceof FloatColumnPreIndexStatsCollector) {
150+
keyStats.collect(valueNumber.floatValue());
151+
continue;
152+
}
153+
if (keyStats instanceof DoubleColumnPreIndexStatsCollector) {
154+
keyStats.collect(valueNumber.doubleValue());
155+
continue;
156+
}
157+
if (keyStats instanceof BigDecimalColumnPreIndexStatsCollector) {
158+
keyStats.collect(new BigDecimal(valueNumber.toString()));
159+
continue;
160+
}
108161
keyStats.collect(value);
109162
}
110163
_totalNumberOfEntries++;
@@ -113,6 +166,28 @@ public void collect(Object entry) {
113166
}
114167
}
115168

169+
private Number parseFlexibleNumber(String input) {
170+
if (input == null) {
171+
return null;
172+
}
173+
String s = input.trim();
174+
if (s.isEmpty()) {
175+
return null;
176+
}
177+
try {
178+
// Try BigDecimal first — it supports everything cleanly
179+
return new BigDecimal(s);
180+
} catch (NumberFormatException e) {
181+
try {
182+
// Try locale parsing fallback
183+
NumberFormat nf = NumberFormat.getInstance(Locale.US);
184+
return nf.parse(s);
185+
} catch (Exception ignored) {
186+
return null;
187+
}
188+
}
189+
}
190+
116191
@Override
117192
public String getMinValue() {
118193
if (_sealed) {
@@ -196,7 +271,6 @@ private AbstractColumnStatisticsCollector createKeyStatsCollector(String key, Ob
196271
if (_createNoDictCollectorsForKeys) {
197272
return new NoDictColumnStatisticsCollector(key, config);
198273
}
199-
200274
switch (type) {
201275
case INTEGER:
202276
return new IntColumnPreIndexStatsCollector(key, config);
@@ -208,18 +282,23 @@ private AbstractColumnStatisticsCollector createKeyStatsCollector(String key, Ob
208282
return new DoubleColumnPreIndexStatsCollector(key, config);
209283
case BIG_DECIMAL:
210284
return new BigDecimalColumnPreIndexStatsCollector(key, config);
285+
case BOOLEAN:
211286
case STRING:
287+
case MAP:
288+
case OBJECT:
212289
return new StringColumnPreIndexStatsCollector(key, config);
213290
default:
214-
throw new UnsupportedOperationException(String.format("MAP column does not yet support '%s'", type));
291+
LOGGER.warn("Unknown data type {} for key {} and value {}", type, key, value);
292+
return new StringColumnPreIndexStatsCollector(key, config);
215293
}
216294
}
217295

296+
/**
297+
* Convert Map value data type to stored field type.
298+
* Note that all unknown types are automatically converted to String type.
299+
*/
218300
private static FieldSpec.DataType convertToDataType(PinotDataType ty) {
219-
// TODO: I've been told that we already have a function to do this, so find that function and replace this
220301
switch (ty) {
221-
case BOOLEAN:
222-
return FieldSpec.DataType.BOOLEAN;
223302
case SHORT:
224303
case INTEGER:
225304
return FieldSpec.DataType.INT;
@@ -233,10 +312,12 @@ private static FieldSpec.DataType convertToDataType(PinotDataType ty) {
233312
return FieldSpec.DataType.BIG_DECIMAL;
234313
case TIMESTAMP:
235314
return FieldSpec.DataType.TIMESTAMP;
315+
case BOOLEAN:
236316
case STRING:
237-
return FieldSpec.DataType.STRING;
317+
case OBJECT:
318+
case MAP:
238319
default:
239-
throw new UnsupportedOperationException();
320+
return FieldSpec.DataType.STRING;
240321
}
241322
}
242323
}

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,10 @@
4747
import org.apache.pinot.segment.local.segment.creator.impl.stats.FloatColumnPreIndexStatsCollector;
4848
import org.apache.pinot.segment.local.segment.creator.impl.stats.IntColumnPreIndexStatsCollector;
4949
import org.apache.pinot.segment.local.segment.creator.impl.stats.LongColumnPreIndexStatsCollector;
50+
import org.apache.pinot.segment.local.segment.creator.impl.stats.MapColumnPreIndexStatsCollector;
5051
import org.apache.pinot.segment.local.segment.creator.impl.stats.NoDictColumnStatisticsCollector;
5152
import org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
5253
import org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
53-
import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexCreatorFactory;
5454
import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexPlugin;
5555
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
5656
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
@@ -809,6 +809,30 @@ private void createDerivedColumnV1Indices(String column, FunctionEvaluator funct
809809
new ByteArray((byte[]) fieldSpec.getDefaultNullValue()));
810810
break;
811811
}
812+
case MAP: {
813+
// Ensure each value is non-null; default for MAP is an empty map
814+
for (int i = 0; i < numDocs; i++) {
815+
if (outputValues[i] == null) {
816+
outputValues[i] = fieldSpec.getDefaultNullValue();
817+
}
818+
}
819+
820+
// Use MapColumnPreIndexStatsCollector for collecting MAP stats
821+
AbstractColumnStatisticsCollector statsCollector =
822+
new MapColumnPreIndexStatsCollector(column, statsCollectorConfig);
823+
for (Object value : outputValues) {
824+
statsCollector.collect(value);
825+
}
826+
statsCollector.seal();
827+
828+
// MAP does not use dictionary encoding
829+
createDictionary = false;
830+
indexCreationInfo =
831+
new ColumnIndexCreationInfo(statsCollector, /* createDictionary */ false, false, true,
832+
fieldSpec.getDefaultNullValue());
833+
break;
834+
}
835+
812836
default:
813837
throw new IllegalStateException();
814838
}
@@ -1166,8 +1190,12 @@ private void createDerivedColumnForwardIndexWithoutDictionary(String column, Fie
11661190
case BYTES:
11671191
forwardIndexCreator.putBytes((byte[]) outputValue);
11681192
break;
1193+
case MAP:
1194+
forwardIndexCreator.add(outputValue, -1);
1195+
break;
11691196
default:
1170-
throw new IllegalStateException();
1197+
throw new IllegalStateException(
1198+
"Unsupported data type: " + fieldSpec.getDataType() + ", for value: " + outputValue);
11711199
}
11721200
}
11731201
} else {
@@ -1193,10 +1221,12 @@ private void createDerivedColumnForwardIndexWithoutDictionary(String column, Fie
11931221
forwardIndexCreator.putBytesMV((byte[][]) outputValue);
11941222
break;
11951223
default:
1196-
throw new IllegalStateException();
1224+
throw new IllegalStateException(
1225+
"Unsupported data type: " + fieldSpec.getDataType() + ", for value: " + outputValue);
11971226
}
11981227
}
11991228
}
1229+
forwardIndexCreator.seal();
12001230
}
12011231

12021232
// Add the column metadata
@@ -1227,8 +1257,7 @@ private ForwardIndexCreator getForwardIndexCreator(FieldSpec fieldSpec, ColumnIn
12271257
if (forwardIndexConfig == null) {
12281258
forwardIndexConfig = new ForwardIndexConfig(false, null, null, null, null, null, null);
12291259
}
1230-
1231-
return ForwardIndexCreatorFactory.createIndexCreator(indexCreationContext, forwardIndexConfig);
1260+
return StandardIndexes.forward().createIndexCreator(indexCreationContext, forwardIndexConfig);
12321261
}
12331262

12341263
@SuppressWarnings("rawtypes")

pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.HashMap;
2525
import java.util.Iterator;
2626
import java.util.List;
27+
import java.util.Map;
2728
import java.util.concurrent.TimeUnit;
2829
import org.apache.pinot.spi.config.table.TableConfig;
2930
import org.apache.pinot.spi.config.table.TableType;
@@ -393,11 +394,40 @@ public void testTransformFunctionContinueOnError() {
393394
genericRow = new GenericRow();
394395
genericRow.putValue("x", "abcd");
395396
expressionTransformer.transform(genericRow);
396-
Assert.assertEquals(genericRow.getValue("y"), null);
397+
Assert.assertNull(genericRow.getValue("y"));
397398
// Invalid case: x is null, y is int
398399
genericRow = new GenericRow();
399400
genericRow.putValue("x", null);
400401
expressionTransformer.transform(genericRow);
401-
Assert.assertEquals(genericRow.getValue("y"), null);
402+
Assert.assertNull(genericRow.getValue("y"));
403+
}
404+
405+
@Test
406+
public void testJsonToMapIngestionTransform() {
407+
Schema schema = new Schema.SchemaBuilder()
408+
.addSingleValueDimension("columnJson", FieldSpec.DataType.STRING)
409+
.addComplex("columnMap", FieldSpec.DataType.MAP, Map.of(
410+
"a", new DimensionFieldSpec("a", FieldSpec.DataType.INT, true),
411+
"b", new DimensionFieldSpec("b", FieldSpec.DataType.STRING, true)
412+
))
413+
.build();
414+
415+
IngestionConfig ingestionConfig = new IngestionConfig();
416+
ingestionConfig.setTransformConfigs(Collections.singletonList(
417+
new TransformConfig("columnMap", "jsonStringToMap(columnJson)")));
418+
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
419+
.setTableName("testJsonToMapIngestionTransform")
420+
.setIngestionConfig(ingestionConfig)
421+
.build();
422+
423+
ExpressionTransformer expressionTransformer = new ExpressionTransformer(tableConfig, schema);
424+
425+
GenericRow row = new GenericRow();
426+
row.putValue("columnJson", "{\"a\":1,\"b\":\"x\"}");
427+
428+
expressionTransformer.transform(row);
429+
Map<String, Object> map = (Map<String, Object>) row.getValue("columnMap");
430+
Assert.assertEquals(map.get("a"), 1);
431+
Assert.assertEquals(map.get("b"), "x");
402432
}
403433
}

0 commit comments

Comments
 (0)