Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parse JSON into org.embulk.spi.json.JsonValue, not org.msgpack.value.Value #8

Merged
merged 5 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,20 @@ java {
dependencies {
compileOnly "org.embulk:embulk-spi:0.11"
compileOnly "org.slf4j:slf4j-api:2.0.7"
compileOnly "org.msgpack:msgpack-core:0.8.24"

implementation "com.fasterxml.jackson.core:jackson-annotations:2.6.7"
implementation "com.fasterxml.jackson.core:jackson-core:2.6.7"
implementation "com.fasterxml.jackson.core:jackson-databind:2.6.7.5"
implementation "org.embulk:embulk-util-config:0.3.4"
implementation "org.embulk:embulk-util-file:0.1.5"
implementation "org.embulk:embulk-util-json:0.2.2"
implementation "org.embulk:embulk-util-json:0.3.0"
implementation "org.embulk:embulk-util-timestamp:0.2.2"

testImplementation "junit:junit:4.13.2"
testImplementation "org.embulk:embulk-spi:0.10.50"
testImplementation "org.embulk:embulk-core:0.10.50"
testImplementation "org.embulk:embulk-deps:0.10.50"
testImplementation "org.embulk:embulk-junit4:0.10.50"
testImplementation "com.google.guava:guava:18.0"
testImplementation "org.embulk:embulk-spi:0.11"
testImplementation "org.embulk:embulk-core:0.11.0"
testImplementation "org.embulk:embulk-deps:0.11.0"
testImplementation "org.embulk:embulk-junit4:0.11.0"
}

embulkPlugin {
Expand Down
2 changes: 1 addition & 1 deletion gradle.lockfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ javax.validation:validation-api:1.1.0.Final=compileClasspath,runtimeClasspath
org.embulk:embulk-spi:0.11=compileClasspath
org.embulk:embulk-util-config:0.3.4=compileClasspath,runtimeClasspath
org.embulk:embulk-util-file:0.1.5=compileClasspath,runtimeClasspath
org.embulk:embulk-util-json:0.2.2=compileClasspath,runtimeClasspath
org.embulk:embulk-util-json:0.3.0=compileClasspath,runtimeClasspath
org.embulk:embulk-util-rubytime:0.3.3=compileClasspath,runtimeClasspath
org.embulk:embulk-util-timestamp:0.2.2=compileClasspath,runtimeClasspath
org.msgpack:msgpack-core:0.8.24=compileClasspath
Expand Down
157 changes: 89 additions & 68 deletions src/main/java/org/embulk/parser/json/JsonParserPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.embulk.parser.json;

import com.fasterxml.jackson.core.JsonFactory;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand All @@ -27,6 +28,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Pattern;
Expand All @@ -42,6 +44,8 @@
import org.embulk.spi.PageOutput;
import org.embulk.spi.ParserPlugin;
import org.embulk.spi.Schema;
import org.embulk.spi.json.JsonObject;
import org.embulk.spi.json.JsonValue;
import org.embulk.spi.type.TimestampType;
import org.embulk.spi.type.Types;
import org.embulk.util.config.Config;
Expand All @@ -52,20 +56,12 @@
import org.embulk.util.config.units.SchemaConfig;
import org.embulk.util.file.FileInputInputStream;
import org.embulk.util.json.JsonParseException;
import org.embulk.util.json.JsonParser;
import org.embulk.util.json.JsonValueParser;
import org.embulk.util.timestamp.TimestampFormatter;
import org.msgpack.core.Preconditions;
import org.msgpack.value.MapValue;
import org.msgpack.value.Value;
import org.msgpack.value.ValueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JsonParserPlugin implements ParserPlugin {
public JsonParserPlugin() {
this.jsonParser = new JsonParser();
}

public enum InvalidEscapeStringPolicy {
PASSTHROUGH("PASSTHROUGH"),
SKIP("SKIP"),
Expand Down Expand Up @@ -167,40 +163,54 @@ public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutpu
timestampFormatters.putAll(newTimestampColumnFormattersAsMap(task, task.getSchemaConfig().get()));
jsonPointers.putAll(createJsonPointerMap(schema, schemaConfig));
}
final JsonValueParser.Builder parserBuilder = JsonValueParser.builder(JSON_FACTORY);

try (final PageBuilder pageBuilder = Exec.getPageBuilder(Exec.getBufferAllocator(), schema, output);
FileInputInputStream in = new FileInputInputStream(input)) {
while (in.nextFile()) {
final String fileName = input.hintOfCurrentInputFileNameForLogging().orElse("-");

boolean evenOneJsonParsed = false;
try (JsonParser.Stream stream = newJsonStream(in, task)) {
Value originalValue;
while ((originalValue = stream.next()) != null) {
try (final JsonValueParser parser = newParser(in, task, parserBuilder)) {
JsonValue originalValue;
while ((originalValue = parser.readJsonValue()) != null) {
try {
Value value = originalValue;
JsonValue value = originalValue;
if (task.getRoot().isPresent()) {
try {
value = jsonParser.parseWithOffsetInJsonPointer(originalValue.toJson(), task.getRoot().get());
value = JsonValueParser.builder(JSON_FACTORY)
.root(task.getRoot().get())
.build(originalValue.toJson())
.readJsonValue();
if (value == null) {
throw new JsonRecordValidateException("A Json record doesn't have given 'JSON pointer to root'.");
}
} catch (JsonParseException e) {
/*
* When JsonParseException is thrown, it would be an error that
* the given JSON pointer doesn't match with the JSON object.
* We would return NULL when the pointer doesn't match, not throw Exception.
*
* NOTE: We may change the behavior.
* See: https://github.com/embulk/embulk/pull/1103#discussion_r255807991
*/
throw new JsonRecordValidateException("A Json record doesn't have given 'JSON pointer to root'.");
}
}

final Iterable<Value> recordValues;
final Iterable<JsonValue> recordValues;
if (task.getFlattenJsonArray()) {
if (!value.isArrayValue()) {
if (!value.isJsonArray()) {
throw new JsonRecordValidateException(
String.format(
"A Json record must represent array value with '__experimental__flatten_json_array' option, but it's %s",
value.getValueType().name()));
"A Json record must represent array value with 'flatten_json_array' option, but it's %s",
value.getEntityType().name()));
}
recordValues = value.asArrayValue();
recordValues = value.asJsonArray();
} else {
recordValues = Collections.singletonList(value);
}

for (Value recordValue : recordValues) {
for (final JsonValue recordValue : recordValues) {
addRecord(task, pageBuilder, schema, timestampFormatters, jsonPointers, recordValue);
evenOneJsonParsed = true;
}
Expand Down Expand Up @@ -232,21 +242,21 @@ private void addRecord(
Schema schema,
Map<Column, TimestampFormatter> timestampFormatters,
Map<Column, String> jsonPointers,
Value value) {
if (!value.isMapValue()) {
JsonValue value) {
if (!value.isJsonObject()) {
throw new JsonRecordValidateException(
String.format("A Json record must represent map value but it's %s", value.getValueType().name()));
String.format("A Json record must represent map value but it's %s", value.getEntityType().name()));
}

try {
if (isUsingCustomSchema(task)) {
setValueWithCustomSchema(pageBuilder, schema, timestampFormatters, jsonPointers, value.asMapValue());
setValueWithCustomSchema(pageBuilder, schema, timestampFormatters, jsonPointers, value.asJsonObject());
} else {
setValueWithSingleJsonColumn(pageBuilder, schema, value.asMapValue());
setValueWithSingleJsonColumn(pageBuilder, schema, value.asJsonObject());
}
} catch (final DateTimeParseException ex) {
throw new JsonRecordValidateException(
String.format("A Json record must have valid timestamp value but it's %s", value.getValueType().name()));
String.format("A Json record must have valid timestamp value but it's %s", value.getEntityType().name()));
}
pageBuilder.addRecord();
}
Expand All @@ -255,7 +265,7 @@ private static boolean isUsingCustomSchema(PluginTask task) {
return task.getSchemaConfig().isPresent() && !task.getSchemaConfig().get().isEmpty();
}

private static void setValueWithSingleJsonColumn(PageBuilder pageBuilder, Schema schema, MapValue value) {
private static void setValueWithSingleJsonColumn(PageBuilder pageBuilder, Schema schema, JsonObject value) {
final Column column = schema.getColumn(0); // record column
pageBuilder.setJson(column, value);
}
Expand All @@ -265,23 +275,38 @@ private void setValueWithCustomSchema(
Schema schema,
Map<Column, TimestampFormatter> timestampFormatters,
Map<Column, String> jsonPointers,
MapValue value) {
final Map<Value, Value> map = value.map();
JsonObject value) {
final Map<String, JsonValue> map = value;
String valueAsJsonString = null;
if (!jsonPointers.isEmpty()) {
// Convert to string in order to re-parse with given JSON pointer
valueAsJsonString = value.toJson();
}
for (Column column : schema.getColumns()) {
final String jsonPointer = jsonPointers.get(column);
final Value columnValue;
final JsonValue columnValue;
if (jsonPointer != null) {
columnValue = parseColumnValueWithOffsetInJsonPointer(valueAsJsonString, jsonPointer);
try {
columnValue = JsonValueParser.builder(JSON_FACTORY)
.root(jsonPointer)
.build(valueAsJsonString)
.readJsonValue();
} catch (final IOException ex) {
/*
* When JsonParseException is thrown, it would be an error that
* the given JSON pointer doesn't match with the JSON object.
* We would return NULL when the pointer doesn't match, not throw Exception.
*
* NOTE: We may change the behavior.
* See: https://github.com/embulk/embulk/pull/1103#discussion_r255807991
*/
throw new JsonParseException("Failed to parse JSON: " + valueAsJsonString, ex);
}
} else {
columnValue = map.get(ValueFactory.newString(column.getName()));
columnValue = map.get(column.getName());
}

if (columnValue == null || columnValue.isNilValue()) {
if (columnValue == null || columnValue.isJsonNull()) {
pageBuilder.setNull(column);
continue;
}
Expand All @@ -290,44 +315,44 @@ private void setValueWithCustomSchema(
@Override
public void booleanColumn(Column column) {
final boolean booleanValue;
if (columnValue.isBooleanValue()) {
booleanValue = columnValue.asBooleanValue().getBoolean();
if (columnValue.isJsonBoolean()) {
booleanValue = columnValue.asJsonBoolean().booleanValue();
} else {
booleanValue = Boolean.parseBoolean(columnValue.toString());
booleanValue = Boolean.parseBoolean(asString(columnValue));
}
pageBuilder.setBoolean(column, booleanValue);
}

@Override
public void longColumn(Column column) {
final long longValue;
if (columnValue.isIntegerValue()) {
longValue = columnValue.asIntegerValue().toLong();
if (columnValue.isJsonLong()) {
longValue = columnValue.asJsonLong().longValue();
} else {
longValue = Long.parseLong(columnValue.toString());
longValue = Long.parseLong(asString(columnValue));
}
pageBuilder.setLong(column, longValue);
}

@Override
public void doubleColumn(Column column) {
final double doubleValue;
if (columnValue.isFloatValue()) {
doubleValue = columnValue.asFloatValue().toDouble();
if (columnValue.isJsonDouble()) {
doubleValue = columnValue.asJsonDouble().doubleValue();
} else {
doubleValue = Double.parseDouble(columnValue.toString());
doubleValue = Double.parseDouble(asString(columnValue));
}
pageBuilder.setDouble(column, doubleValue);
}

@Override
public void stringColumn(Column column) {
pageBuilder.setString(column, columnValue.toString());
pageBuilder.setString(column, asString(columnValue));
}

@Override
public void timestampColumn(Column column) {
pageBuilder.setTimestamp(column, timestampFormatters.get(column).parse(columnValue.toString()));
pageBuilder.setTimestamp(column, timestampFormatters.get(column).parse(asString(columnValue)));
}

@Override
Expand All @@ -338,45 +363,28 @@ public void jsonColumn(Column column) {
}
}

private Value parseColumnValueWithOffsetInJsonPointer(String valueAsJsonString, String jsonPointer) {
try {
return jsonParser.parseWithOffsetInJsonPointer(valueAsJsonString, jsonPointer);
} catch (JsonParseException e) {
/*
* When JsonParseException is thrown, it would be an error that the given JSON pointer doesn't match with the JSON object.
* We would return NULL when the pointer doesn't match, not throw Exception.
*
* NOTE: We may change the behavior (ref: https://github.com/embulk/embulk/pull/1103#discussion_r255807991)
*/
return ValueFactory.newNil();
}
}

private JsonParser.Stream newJsonStream(FileInputInputStream in, PluginTask task)
private JsonValueParser newParser(final FileInputInputStream in, final PluginTask task, final JsonValueParser.Builder builder)
throws IOException {
final InvalidEscapeStringPolicy policy = task.getInvalidEscapeStringPolicy();
final InputStream inputStream;
switch (policy) {
case SKIP:
case UNESCAPE:
byte[] lines = new BufferedReader(new InputStreamReader(in))
final byte[] lines = new BufferedReader(new InputStreamReader(in))
.lines()
.map(invalidEscapeStringFunction(policy))
.collect(Collectors.joining())
.getBytes(StandardCharsets.UTF_8);
inputStream = new ByteArrayInputStream(lines);
break;
return builder.build(new ByteArrayInputStream(lines));
case PASSTHROUGH:
default:
inputStream = in;
return builder.build(in);
}

return jsonParser.open(inputStream);
}

static Function<String, String> invalidEscapeStringFunction(final InvalidEscapeStringPolicy policy) {
return input -> {
Preconditions.checkNotNull(input);
Objects.requireNonNull(input);
if (policy == InvalidEscapeStringPolicy.PASSTHROUGH) {
return input;
}
Expand Down Expand Up @@ -469,6 +477,13 @@ private static Map<Column, TimestampFormatter> newTimestampColumnFormattersAsMap
return Collections.unmodifiableMap(formatters);
}

private static String asString(final JsonValue value) {
if (value.isJsonString()) {
return value.asJsonString().getString();
}
return value.toString();
}

static class JsonRecordValidateException extends DataException {
JsonRecordValidateException(String message) {
super(message);
Expand All @@ -481,5 +496,11 @@ static class JsonRecordValidateException extends DataException {

private static final Pattern DIGITS_PATTERN = Pattern.compile("\\p{XDigit}+");

private final JsonParser jsonParser;
private static final JsonFactory JSON_FACTORY;

static {
JSON_FACTORY = new JsonFactory();
JSON_FACTORY.enable(com.fasterxml.jackson.core.JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS);
JSON_FACTORY.enable(com.fasterxml.jackson.core.JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS);
}
}
2 changes: 1 addition & 1 deletion src/test/java/org/embulk/parser/json/Pages.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void jsonColumn(Column column) {
if (record.isNull(column)) {
visit(column, null);
} else {
visit(column, record.getJson(column));
visit(column, record.getJsonValue(column));
}
}
}
Expand Down
Loading