Skip to content

Commit

Permalink
Merge pull request #8 from embulk/parse-to-JsonValue
Browse files Browse the repository at this point in the history
Parse JSON into org.embulk.spi.json.JsonValue, not org.msgpack.value.Value
  • Loading branch information
dmikurube authored Sep 25, 2023
2 parents 476666f + b8039b4 commit 07e1e93
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 121 deletions.
12 changes: 5 additions & 7 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,20 @@ java {
dependencies {
compileOnly "org.embulk:embulk-spi:0.11"
compileOnly "org.slf4j:slf4j-api:2.0.7"
compileOnly "org.msgpack:msgpack-core:0.8.24"

implementation "com.fasterxml.jackson.core:jackson-annotations:2.6.7"
implementation "com.fasterxml.jackson.core:jackson-core:2.6.7"
implementation "com.fasterxml.jackson.core:jackson-databind:2.6.7.5"
implementation "org.embulk:embulk-util-config:0.3.4"
implementation "org.embulk:embulk-util-file:0.1.5"
implementation "org.embulk:embulk-util-json:0.2.2"
implementation "org.embulk:embulk-util-json:0.3.0"
implementation "org.embulk:embulk-util-timestamp:0.2.2"

testImplementation "junit:junit:4.13.2"
testImplementation "org.embulk:embulk-spi:0.10.50"
testImplementation "org.embulk:embulk-core:0.10.50"
testImplementation "org.embulk:embulk-deps:0.10.50"
testImplementation "org.embulk:embulk-junit4:0.10.50"
testImplementation "com.google.guava:guava:18.0"
testImplementation "org.embulk:embulk-spi:0.11"
testImplementation "org.embulk:embulk-core:0.11.0"
testImplementation "org.embulk:embulk-deps:0.11.0"
testImplementation "org.embulk:embulk-junit4:0.11.0"
}

embulkPlugin {
Expand Down
2 changes: 1 addition & 1 deletion gradle.lockfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ javax.validation:validation-api:1.1.0.Final=compileClasspath,runtimeClasspath
org.embulk:embulk-spi:0.11=compileClasspath
org.embulk:embulk-util-config:0.3.4=compileClasspath,runtimeClasspath
org.embulk:embulk-util-file:0.1.5=compileClasspath,runtimeClasspath
org.embulk:embulk-util-json:0.2.2=compileClasspath,runtimeClasspath
org.embulk:embulk-util-json:0.3.0=compileClasspath,runtimeClasspath
org.embulk:embulk-util-rubytime:0.3.3=compileClasspath,runtimeClasspath
org.embulk:embulk-util-timestamp:0.2.2=compileClasspath,runtimeClasspath
org.msgpack:msgpack-core:0.8.24=compileClasspath
Expand Down
157 changes: 89 additions & 68 deletions src/main/java/org/embulk/parser/json/JsonParserPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.embulk.parser.json;

import com.fasterxml.jackson.core.JsonFactory;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand All @@ -27,6 +28,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Pattern;
Expand All @@ -42,6 +44,8 @@
import org.embulk.spi.PageOutput;
import org.embulk.spi.ParserPlugin;
import org.embulk.spi.Schema;
import org.embulk.spi.json.JsonObject;
import org.embulk.spi.json.JsonValue;
import org.embulk.spi.type.TimestampType;
import org.embulk.spi.type.Types;
import org.embulk.util.config.Config;
Expand All @@ -52,20 +56,12 @@
import org.embulk.util.config.units.SchemaConfig;
import org.embulk.util.file.FileInputInputStream;
import org.embulk.util.json.JsonParseException;
import org.embulk.util.json.JsonParser;
import org.embulk.util.json.JsonValueParser;
import org.embulk.util.timestamp.TimestampFormatter;
import org.msgpack.core.Preconditions;
import org.msgpack.value.MapValue;
import org.msgpack.value.Value;
import org.msgpack.value.ValueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JsonParserPlugin implements ParserPlugin {
public JsonParserPlugin() {
this.jsonParser = new JsonParser();
}

public enum InvalidEscapeStringPolicy {
PASSTHROUGH("PASSTHROUGH"),
SKIP("SKIP"),
Expand Down Expand Up @@ -167,40 +163,54 @@ public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutpu
timestampFormatters.putAll(newTimestampColumnFormattersAsMap(task, task.getSchemaConfig().get()));
jsonPointers.putAll(createJsonPointerMap(schema, schemaConfig));
}
final JsonValueParser.Builder parserBuilder = JsonValueParser.builder(JSON_FACTORY);

try (final PageBuilder pageBuilder = Exec.getPageBuilder(Exec.getBufferAllocator(), schema, output);
FileInputInputStream in = new FileInputInputStream(input)) {
while (in.nextFile()) {
final String fileName = input.hintOfCurrentInputFileNameForLogging().orElse("-");

boolean evenOneJsonParsed = false;
try (JsonParser.Stream stream = newJsonStream(in, task)) {
Value originalValue;
while ((originalValue = stream.next()) != null) {
try (final JsonValueParser parser = newParser(in, task, parserBuilder)) {
JsonValue originalValue;
while ((originalValue = parser.readJsonValue()) != null) {
try {
Value value = originalValue;
JsonValue value = originalValue;
if (task.getRoot().isPresent()) {
try {
value = jsonParser.parseWithOffsetInJsonPointer(originalValue.toJson(), task.getRoot().get());
value = JsonValueParser.builder(JSON_FACTORY)
.root(task.getRoot().get())
.build(originalValue.toJson())
.readJsonValue();
if (value == null) {
throw new JsonRecordValidateException("A Json record doesn't have given 'JSON pointer to root'.");
}
} catch (JsonParseException e) {
/*
* When JsonParseException is thrown, it would be an error that
* the given JSON pointer doesn't match with the JSON object.
* We would return NULL when the pointer doesn't match, not throw Exception.
*
* NOTE: We may change the behavior.
* See: https://github.com/embulk/embulk/pull/1103#discussion_r255807991
*/
throw new JsonRecordValidateException("A Json record doesn't have given 'JSON pointer to root'.");
}
}

final Iterable<Value> recordValues;
final Iterable<JsonValue> recordValues;
if (task.getFlattenJsonArray()) {
if (!value.isArrayValue()) {
if (!value.isJsonArray()) {
throw new JsonRecordValidateException(
String.format(
"A Json record must represent array value with '__experimental__flatten_json_array' option, but it's %s",
value.getValueType().name()));
"A Json record must represent array value with 'flatten_json_array' option, but it's %s",
value.getEntityType().name()));
}
recordValues = value.asArrayValue();
recordValues = value.asJsonArray();
} else {
recordValues = Collections.singletonList(value);
}

for (Value recordValue : recordValues) {
for (final JsonValue recordValue : recordValues) {
addRecord(task, pageBuilder, schema, timestampFormatters, jsonPointers, recordValue);
evenOneJsonParsed = true;
}
Expand Down Expand Up @@ -232,21 +242,21 @@ private void addRecord(
Schema schema,
Map<Column, TimestampFormatter> timestampFormatters,
Map<Column, String> jsonPointers,
Value value) {
if (!value.isMapValue()) {
JsonValue value) {
if (!value.isJsonObject()) {
throw new JsonRecordValidateException(
String.format("A Json record must represent map value but it's %s", value.getValueType().name()));
String.format("A Json record must represent map value but it's %s", value.getEntityType().name()));
}

try {
if (isUsingCustomSchema(task)) {
setValueWithCustomSchema(pageBuilder, schema, timestampFormatters, jsonPointers, value.asMapValue());
setValueWithCustomSchema(pageBuilder, schema, timestampFormatters, jsonPointers, value.asJsonObject());
} else {
setValueWithSingleJsonColumn(pageBuilder, schema, value.asMapValue());
setValueWithSingleJsonColumn(pageBuilder, schema, value.asJsonObject());
}
} catch (final DateTimeParseException ex) {
throw new JsonRecordValidateException(
String.format("A Json record must have valid timestamp value but it's %s", value.getValueType().name()));
String.format("A Json record must have valid timestamp value but it's %s", value.getEntityType().name()));
}
pageBuilder.addRecord();
}
Expand All @@ -255,7 +265,7 @@ private static boolean isUsingCustomSchema(PluginTask task) {
return task.getSchemaConfig().isPresent() && !task.getSchemaConfig().get().isEmpty();
}

private static void setValueWithSingleJsonColumn(PageBuilder pageBuilder, Schema schema, MapValue value) {
private static void setValueWithSingleJsonColumn(PageBuilder pageBuilder, Schema schema, JsonObject value) {
final Column column = schema.getColumn(0); // record column
pageBuilder.setJson(column, value);
}
Expand All @@ -265,23 +275,38 @@ private void setValueWithCustomSchema(
Schema schema,
Map<Column, TimestampFormatter> timestampFormatters,
Map<Column, String> jsonPointers,
MapValue value) {
final Map<Value, Value> map = value.map();
JsonObject value) {
final Map<String, JsonValue> map = value;
String valueAsJsonString = null;
if (!jsonPointers.isEmpty()) {
// Convert to string in order to re-parse with given JSON pointer
valueAsJsonString = value.toJson();
}
for (Column column : schema.getColumns()) {
final String jsonPointer = jsonPointers.get(column);
final Value columnValue;
final JsonValue columnValue;
if (jsonPointer != null) {
columnValue = parseColumnValueWithOffsetInJsonPointer(valueAsJsonString, jsonPointer);
try {
columnValue = JsonValueParser.builder(JSON_FACTORY)
.root(jsonPointer)
.build(valueAsJsonString)
.readJsonValue();
} catch (final IOException ex) {
/*
* When JsonParseException is thrown, it would be an error that
* the given JSON pointer doesn't match with the JSON object.
* We would return NULL when the pointer doesn't match, not throw Exception.
*
* NOTE: We may change the behavior.
* See: https://github.com/embulk/embulk/pull/1103#discussion_r255807991
*/
throw new JsonParseException("Failed to parse JSON: " + valueAsJsonString, ex);
}
} else {
columnValue = map.get(ValueFactory.newString(column.getName()));
columnValue = map.get(column.getName());
}

if (columnValue == null || columnValue.isNilValue()) {
if (columnValue == null || columnValue.isJsonNull()) {
pageBuilder.setNull(column);
continue;
}
Expand All @@ -290,44 +315,44 @@ private void setValueWithCustomSchema(
@Override
public void booleanColumn(Column column) {
final boolean booleanValue;
if (columnValue.isBooleanValue()) {
booleanValue = columnValue.asBooleanValue().getBoolean();
if (columnValue.isJsonBoolean()) {
booleanValue = columnValue.asJsonBoolean().booleanValue();
} else {
booleanValue = Boolean.parseBoolean(columnValue.toString());
booleanValue = Boolean.parseBoolean(asString(columnValue));
}
pageBuilder.setBoolean(column, booleanValue);
}

@Override
public void longColumn(Column column) {
final long longValue;
if (columnValue.isIntegerValue()) {
longValue = columnValue.asIntegerValue().toLong();
if (columnValue.isJsonLong()) {
longValue = columnValue.asJsonLong().longValue();
} else {
longValue = Long.parseLong(columnValue.toString());
longValue = Long.parseLong(asString(columnValue));
}
pageBuilder.setLong(column, longValue);
}

@Override
public void doubleColumn(Column column) {
final double doubleValue;
if (columnValue.isFloatValue()) {
doubleValue = columnValue.asFloatValue().toDouble();
if (columnValue.isJsonDouble()) {
doubleValue = columnValue.asJsonDouble().doubleValue();
} else {
doubleValue = Double.parseDouble(columnValue.toString());
doubleValue = Double.parseDouble(asString(columnValue));
}
pageBuilder.setDouble(column, doubleValue);
}

@Override
public void stringColumn(Column column) {
pageBuilder.setString(column, columnValue.toString());
pageBuilder.setString(column, asString(columnValue));
}

@Override
public void timestampColumn(Column column) {
pageBuilder.setTimestamp(column, timestampFormatters.get(column).parse(columnValue.toString()));
pageBuilder.setTimestamp(column, timestampFormatters.get(column).parse(asString(columnValue)));
}

@Override
Expand All @@ -338,45 +363,28 @@ public void jsonColumn(Column column) {
}
}

private Value parseColumnValueWithOffsetInJsonPointer(String valueAsJsonString, String jsonPointer) {
try {
return jsonParser.parseWithOffsetInJsonPointer(valueAsJsonString, jsonPointer);
} catch (JsonParseException e) {
/*
* When JsonParseException is thrown, it would be an error that the given JSON pointer doesn't match with the JSON object.
* We would return NULL when the pointer doesn't match, not throw Exception.
*
* NOTE: We may change the behavior (ref: https://github.com/embulk/embulk/pull/1103#discussion_r255807991)
*/
return ValueFactory.newNil();
}
}

private JsonParser.Stream newJsonStream(FileInputInputStream in, PluginTask task)
private JsonValueParser newParser(final FileInputInputStream in, final PluginTask task, final JsonValueParser.Builder builder)
throws IOException {
final InvalidEscapeStringPolicy policy = task.getInvalidEscapeStringPolicy();
final InputStream inputStream;
switch (policy) {
case SKIP:
case UNESCAPE:
byte[] lines = new BufferedReader(new InputStreamReader(in))
final byte[] lines = new BufferedReader(new InputStreamReader(in))
.lines()
.map(invalidEscapeStringFunction(policy))
.collect(Collectors.joining())
.getBytes(StandardCharsets.UTF_8);
inputStream = new ByteArrayInputStream(lines);
break;
return builder.build(new ByteArrayInputStream(lines));
case PASSTHROUGH:
default:
inputStream = in;
return builder.build(in);
}

return jsonParser.open(inputStream);
}

static Function<String, String> invalidEscapeStringFunction(final InvalidEscapeStringPolicy policy) {
return input -> {
Preconditions.checkNotNull(input);
Objects.requireNonNull(input);
if (policy == InvalidEscapeStringPolicy.PASSTHROUGH) {
return input;
}
Expand Down Expand Up @@ -469,6 +477,13 @@ private static Map<Column, TimestampFormatter> newTimestampColumnFormattersAsMap
return Collections.unmodifiableMap(formatters);
}

private static String asString(final JsonValue value) {
if (value.isJsonString()) {
return value.asJsonString().getString();
}
return value.toString();
}

static class JsonRecordValidateException extends DataException {
JsonRecordValidateException(String message) {
super(message);
Expand All @@ -481,5 +496,11 @@ static class JsonRecordValidateException extends DataException {

private static final Pattern DIGITS_PATTERN = Pattern.compile("\\p{XDigit}+");

private final JsonParser jsonParser;
private static final JsonFactory JSON_FACTORY;

static {
JSON_FACTORY = new JsonFactory();
JSON_FACTORY.enable(com.fasterxml.jackson.core.JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS);
JSON_FACTORY.enable(com.fasterxml.jackson.core.JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS);
}
}
2 changes: 1 addition & 1 deletion src/test/java/org/embulk/parser/json/Pages.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void jsonColumn(Column column) {
if (record.isNull(column)) {
visit(column, null);
} else {
visit(column, record.getJson(column));
visit(column, record.getJsonValue(column));
}
}
}
Expand Down
Loading

0 comments on commit 07e1e93

Please sign in to comment.