Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More robust json parsing #53

Open
wants to merge 6 commits into
base: delta-core-2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 175 additions & 44 deletions kernel-default/src/main/java/io/delta/core/data/JsonRow.java
Original file line number Diff line number Diff line change
@@ -1,101 +1,232 @@
package io.delta.core.data;

import java.util.HashMap;
import java.util.Map;
import java.util.*;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.delta.core.types.LongType;
import io.delta.core.types.StringType;
import io.delta.core.types.StructField;
import io.delta.core.types.StructType;
import io.delta.core.types.*;

public class JsonRow implements Row {

// TODO: we can do this cleaner / smarter / better
private static Object parse(ObjectNode rootNode, StructField field) {
if (rootNode.get(field.name) == null) {
if (field.nullable) {
return null;
}
////////////////////////////////////////////////////////////////////////////////
// Static Methods
////////////////////////////////////////////////////////////////////////////////

throw new NullPointerException(
String.format("Root node at key %s is null but field %s isn't nullable", rootNode, field)
);
private static Object decodeElement(JsonNode jsonValue, DataType dataType) {
if (dataType instanceof UnresolvedDataType) {
if (jsonValue.isTextual()) {
return jsonValue.textValue();
} else if (jsonValue instanceof ObjectNode) {
throw new RuntimeException("TODO handle UnresolvedDataType of type object");
}
}

JsonNode jsonValue = rootNode.get(field.name);
if (dataType instanceof BooleanType) {
if (!jsonValue.isBoolean()) {
throw new RuntimeException(
String.format("Couldn't decode %s, expected a boolean", jsonValue)
);
}
return jsonValue.booleanValue();
}

if (field.dataType instanceof LongType) {
assert (jsonValue.isLong()) :
String.format("RootNode at %s isn't a long", field.name);
return jsonValue.longValue();
if (dataType instanceof IntegerType) {
// TODO: handle other number cases (e.g. short) and throw on invalid cases (e.g. long)
if (!jsonValue.isInt()) {
throw new RuntimeException(
String.format("Couldn't decode %s, expected an int", jsonValue)
);
}
return jsonValue.intValue();
}

if (field.dataType instanceof StructType) {
assert (jsonValue.isObject()) :
String.format("RootNode at %s isn't a object", field.name);
return (ObjectNode) jsonValue;
if (dataType instanceof LongType) {
if (!jsonValue.isNumber()) {
throw new RuntimeException(
String.format("Couldn't decode %s, expected a long", jsonValue)
);
}
return jsonValue.numberValue().longValue();
}

if (field.dataType instanceof StringType) {
assert (jsonValue.isTextual()) :
String.format("RootNode at %s isn't a string", field.name);
if (dataType instanceof StringType) {
if (!jsonValue.isTextual()) {
throw new RuntimeException(
String.format("Couldn't decode %s, expected a string", jsonValue)
);
}
return jsonValue.textValue();
}

if (dataType instanceof StructType) {
if (!jsonValue.isObject()) {
throw new RuntimeException(
String.format("Couldn't decode %s, expected a struct", jsonValue)
);
}
return new JsonRow((ObjectNode) jsonValue, (StructType) dataType);
}

if (dataType instanceof ArrayType) {
if (!jsonValue.isArray()) {
throw new RuntimeException(
String.format("Couldn't decode %s, expected an array", jsonValue)
);
}
final ArrayType arrayType = ((ArrayType) dataType);
final ArrayNode jsonArray = (ArrayNode) jsonValue;
final List<Object> output = new ArrayList<>();


for (Iterator<JsonNode> it = jsonArray.elements(); it.hasNext();) {
final JsonNode element = it.next();
final Object parsedElement =
decodeElement(element, arrayType.getElementType());
output.add(parsedElement);
}
return output;
}

if (dataType instanceof MapType) {
if (!jsonValue.isObject()) {
throw new RuntimeException(
String.format("Couldn't decode %s, expected a map", jsonValue)
);
}
final MapType mapType = (MapType) dataType;
final Iterator<Map.Entry<String, JsonNode>> iter = ((ObjectNode) jsonValue).fields();
final Map<Object, Object> output = new HashMap<>();

while (iter.hasNext()) {
Map.Entry<String, JsonNode> entry = iter.next();
String keyParsed = entry.getKey(); // TODO: handle non-String keys
Object valueParsed = decodeElement(entry.getValue(), mapType.getValueType());
output.put(keyParsed, valueParsed);
}

return output;
}

throw new UnsupportedOperationException(
String.format("Unsupported DataType %s", field.dataType.typeName())
String.format("Unsupported DataType %s for RootNode %s", dataType.typeName(), jsonValue)
);
}

private static Object decodeField(ObjectNode rootNode, StructField field) {
if (rootNode.get(field.name) == null) {
if (field.nullable) {
return null;
}

throw new RuntimeException(
String.format(
"Root node at key %s is null but field isn't nullable. Root node: %s",
field.name,
rootNode
)
);
}

return decodeElement(rootNode.get(field.name), field.dataType);
}

////////////////////////////////////////////////////////////////////////////////
// Instance Fields / Methods
////////////////////////////////////////////////////////////////////////////////

private final ObjectNode rootNode;
private final Map<Integer, Object> ordinalToValueMap;
private final Object[] parsedValues;
private final StructType readSchema;

public JsonRow(ObjectNode rootNode, StructType readSchema) {
this.rootNode = rootNode;
this.readSchema = readSchema;
this.ordinalToValueMap = new HashMap<>();
this.parsedValues = new Object[readSchema.length()];

for (int i = 0; i < readSchema.length(); i++) {
final StructField field = readSchema.at(i);
Object val = parse(rootNode, field);
ordinalToValueMap.put(i, val);
final Object parsedValue = decodeField(rootNode, field);
parsedValues[i] = parsedValue;
}
}

////////////////////////////////////////
// Public APIs
////////////////////////////////////////

@Override
public boolean isNullAt(int ordinal) {
return parsedValues[ordinal] == null;
}

@Override
public boolean getBoolean(int ordinal) {
assertType(ordinal, BooleanType.INSTANCE);
return (boolean) parsedValues[ordinal];
}

@Override
public int getInt(int ordinal) {
assertType(ordinal, IntegerType.INSTANCE);
return (int) parsedValues[ordinal];
}

@Override
public long getLong(int ordinal) {
assert (readSchema.at(ordinal).dataType instanceof LongType);
return (long) ordinalToValueMap.get(ordinal);
assertType(ordinal, LongType.INSTANCE);
return (long) parsedValues[ordinal];
}

@Override
public String getString(int ordinal) {
assert (readSchema.at(ordinal).dataType instanceof StringType);

return (String) ordinalToValueMap.get(ordinal);
assertType(ordinal, StringType.INSTANCE);
return (String) parsedValues[ordinal];
}

@Override
public Row getRecord(int ordinal) {
assert (readSchema.at(ordinal).dataType instanceof StructType);
assertType(ordinal, StructType.EMPTY_INSTANCE);
return (JsonRow) parsedValues[ordinal];
}

if (ordinalToValueMap.get(ordinal) == null) return null;
@Override
public <T> List<T> getList(int ordinal) {
assertType(ordinal, ArrayType.EMPTY_INSTANCE);
return (List<T>) parsedValues[ordinal];
}

return new JsonRow(
(ObjectNode) ordinalToValueMap.get(ordinal),
(StructType) readSchema.at(ordinal).dataType
);
@Override
public <K, V> Map<K, V> getMap(int ordinal) {
assertType(ordinal, MapType.EMPTY_INSTANCE);
return (Map<K, V>) parsedValues[ordinal];
}

@Override
public String toString() {
return "JsonRow{" +
"rootNode=" + rootNode +
", ordinalToValueMap=" + ordinalToValueMap +
", parsedValues=" + parsedValues +
", readSchema=" + readSchema +
'}';
}

////////////////////////////////////////
// Private Helper Methods
////////////////////////////////////////

private void assertType(int ordinal, DataType expectedType) {
final String actualTypeName = readSchema.at(ordinal).dataType.typeName();
if (!actualTypeName.equals(expectedType.typeName()) &&
!actualTypeName.equals(UnresolvedDataType.INSTANCE.typeName())) {
throw new RuntimeException(
String.format(
"Tried to read %s at ordinal %s but actual data type is %s",
expectedType.typeName(),
ordinal,
actualTypeName
)
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,7 @@ public boolean hasNext() {

@Override
public Row next() {
final String json = iter.next();
try {
final JsonNode jsonNode = objectMapper.readTree(json);
return new JsonRow((ObjectNode) jsonNode, readSchema);
} catch (JsonProcessingException ex) {
throw new RuntimeException(String.format("Could not parse JSON: %s", json), ex);
}
return parseJson(iter.next(), readSchema);
}
};
}
Expand All @@ -130,8 +124,13 @@ public CloseableIterator<Row> readParquetFile(String path, StructType readSchema
}

@Override
public Row parseStats(String statsJson) {
return null;
public Row parseJson(String json, StructType readSchema) {
try {
final JsonNode jsonNode = objectMapper.readTree(json);
return new JsonRow((ObjectNode) jsonNode, readSchema);
} catch (JsonProcessingException ex) {
throw new RuntimeException(String.format("Could not parse JSON: %s", json), ex);
}
}

@Override
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{"commitInfo":{"timestamp":1680202522862,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"part_a\",\"part_b\"]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"10","numOutputRows":"10","numOutputBytes":"4780"},"engineInfo":"Apache-Spark/3.3.2 Delta-Lake/2.2.0","txnId":"a4769250-e8ad-44a7-8a90-b384ac907381"}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"part_a\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"part_b\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["part_a","part_b"],"configuration":{},"createdTime":1680202519496}}
{"add":{"path":"part_a=0/part_b=0/part-00000-8ddb3168-a63d-49c3-a041-5f99111f6fea.c000.snappy.parquet","partitionValues":{"part_a":"0","part_b":"0"},"size":478,"modificationTime":1680202522000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":0},\"maxValues\":{\"id\":0},\"nullCount\":{\"id\":0}}"}}
{"add":{"path":"part_a=0/part_b=2/part-00000-2ee5d4ee-89e6-4f57-a3fc-c033a86d7023.c000.snappy.parquet","partitionValues":{"part_a":"0","part_b":"2"},"size":478,"modificationTime":1680202522000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2},\"maxValues\":{\"id\":2},\"nullCount\":{\"id\":0}}"}}
{"add":{"path":"part_a=0/part_b=4/part-00000-75d9907b-1b87-4a41-a1af-750d98a7bfb5.c000.snappy.parquet","partitionValues":{"part_a":"0","part_b":"4"},"size":478,"modificationTime":1680202522000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":4},\"maxValues\":{\"id\":4},\"nullCount\":{\"id\":0}}"}}
{"add":{"path":"part_a=1/part_b=1/part-00000-eb0581fa-b5c3-4ce5-96c9-1f26ce36ec56.c000.snappy.parquet","partitionValues":{"part_a":"1","part_b":"1"},"size":478,"modificationTime":1680202522000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":1},\"nullCount\":{\"id\":0}}"}}
{"add":{"path":"part_a=1/part_b=3/part-00000-6ff44301-7a6c-4c30-a413-5606dee7b638.c000.snappy.parquet","partitionValues":{"part_a":"1","part_b":"3"},"size":478,"modificationTime":1680202522000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":3},\"maxValues\":{\"id\":3},\"nullCount\":{\"id\":0}}"}}
{"add":{"path":"part_a=0/part_b=1/part-00001-322e51ba-0754-4929-9eb8-6c714a86f901.c000.snappy.parquet","partitionValues":{"part_a":"0","part_b":"1"},"size":478,"modificationTime":1680202522000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":6},\"maxValues\":{\"id\":6},\"nullCount\":{\"id\":0}}"}}
{"add":{"path":"part_a=0/part_b=3/part-00001-b4c08c7f-fd3e-48a7-95dd-829435d27ba6.c000.snappy.parquet","partitionValues":{"part_a":"0","part_b":"3"},"size":478,"modificationTime":1680202522000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":8},\"maxValues\":{\"id\":8},\"nullCount\":{\"id\":0}}"}}
{"add":{"path":"part_a=1/part_b=0/part-00001-b51d842e-ab5d-4de8-878e-0812e8e0e16d.c000.snappy.parquet","partitionValues":{"part_a":"1","part_b":"0"},"size":478,"modificationTime":1680202522000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":5},\"maxValues\":{\"id\":5},\"nullCount\":{\"id\":0}}"}}
{"add":{"path":"part_a=1/part_b=2/part-00001-a5f8c5df-41b0-48b5-b3ac-499d27a94c54.c000.snappy.parquet","partitionValues":{"part_a":"1","part_b":"2"},"size":478,"modificationTime":1680202522000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":7},\"maxValues\":{\"id\":7},\"nullCount\":{\"id\":0}}"}}
{"add":{"path":"part_a=1/part_b=4/part-00001-f1a5a71a-07ca-45d3-ae0c-09d2db85ed99.c000.snappy.parquet","partitionValues":{"part_a":"1","part_b":"4"},"size":478,"modificationTime":1680202522000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":9},\"maxValues\":{\"id\":9},\"nullCount\":{\"id\":0}}"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{"commitInfo":{"timestamp":1680202527500,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"part_a\",\"part_b\"]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"10","numOutputRows":"10","numOutputBytes":"4779"},"engineInfo":"Apache-Spark/3.3.2 Delta-Lake/2.2.0","txnId":"8aceb033-273d-4ed7-95cf-a5ef87c355e3"}}
{"add":{"path":"part_a=0/part_b=0/part-00000-f4e998f4-27e5-4ed2-b133-be5982448972.c000.snappy.parquet","partitionValues":{"part_a":"0","part_b":"0"},"size":477,"modificationTime":1680202527000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":10},\"maxValues\":{\"id\":10},\"nullCount\":{\"id\":0}}"}}
{"add":{"path":"part_a=0/part_b=2/part-00000-c4a17e64-ac6c-4dbb-a15c-23dbc9bf0a58.c000.snappy.parquet","partitionValues":{"part_a":"0","part_b":"2"},"size":478,"modificationTime":1680202527000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":12},\"maxValues\":{\"id\":12},\"nullCount\":{\"id\":0}}"}}
{"add":{"path":"part_a=0/part_b=4/part-00000-39e1b015-006f-41be-9cf7-c713d05d5324.c000.snappy.parquet","partitionValues":{"part_a":"0","part_b":"4"},"size":478,"modificationTime":1680202527000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":14},\"maxValues\":{\"id\":14},\"nullCount\":{\"id\":0}}"}}
{"add":{"path":"part_a=1/part_b=1/part-00000-81d3661a-87ef-4b8d-ae6f-bb0ca971e91a.c000.snappy.parquet","partitionValues":{"part_a":"1","part_b":"1"},"size":478,"modificationTime":1680202527000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":11},\"maxValues\":{\"id\":11},\"nullCount\":{\"id\":0}}"}}
{"add":{"path":"part_a=1/part_b=3/part-00000-39d574e1-c089-4ed8-bcce-7b52c1e6fc0d.c000.snappy.parquet","partitionValues":{"part_a":"1","part_b":"3"},"size":478,"modificationTime":1680202527000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":13},\"maxValues\":{\"id\":13},\"nullCount\":{\"id\":0}}"}}
{"add":{"path":"part_a=0/part_b=1/part-00001-70e9e10a-6f29-4787-8b19-70cc07858d48.c000.snappy.parquet","partitionValues":{"part_a":"0","part_b":"1"},"size":478,"modificationTime":1680202527000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":16},\"maxValues\":{\"id\":16},\"nullCount\":{\"id\":0}}"}}
{"add":{"path":"part_a=0/part_b=3/part-00001-8b04e895-6377-41f7-89a7-a763d8b9f2be.c000.snappy.parquet","partitionValues":{"part_a":"0","part_b":"3"},"size":478,"modificationTime":1680202527000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":18},\"maxValues\":{\"id\":18},\"nullCount\":{\"id\":0}}"}}
{"add":{"path":"part_a=1/part_b=0/part-00001-5a0c01cf-07af-47f4-bdbc-b7bb2a577f2f.c000.snappy.parquet","partitionValues":{"part_a":"1","part_b":"0"},"size":478,"modificationTime":1680202527000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":15},\"maxValues\":{\"id\":15},\"nullCount\":{\"id\":0}}"}}
{"add":{"path":"part_a=1/part_b=2/part-00001-07da6a7d-231d-4c66-9c96-752defe5436b.c000.snappy.parquet","partitionValues":{"part_a":"1","part_b":"2"},"size":478,"modificationTime":1680202527000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":17},\"maxValues\":{\"id\":17},\"nullCount\":{\"id\":0}}"}}
{"add":{"path":"part_a=1/part_b=4/part-00001-97532fd0-b288-46e2-87a7-da3c10a235de.c000.snappy.parquet","partitionValues":{"part_a":"1","part_b":"4"},"size":478,"modificationTime":1680202527000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":19},\"maxValues\":{\"id\":19},\"nullCount\":{\"id\":0}}"}}
Loading