Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zhli1142015 committed Dec 13, 2024
1 parent f9a2389 commit 1ef2f81
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 63 deletions.
17 changes: 9 additions & 8 deletions velox/docs/functions/spark/json.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ JSON Functions

.. spark:function:: from_json(jsonString) -> [json object]
Casting a JSON text to a supported type returns the value represented by
the JSON text if it matches the target type; otherwise, NULL is returned.
The function supports ARRAY, MAP, and ROW as root types. For primitive
Casting a JSON text to the function's output type returns the value
represented by the JSON text if it matches the output type; otherwise, NULL
is returned.
The function supports ARRAY, MAP, and ROW as output types. For primitive
values, supported types include BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT,
REAL, DOUBLE or VARCHAR. Casting to ARRAY and MAP is supported when the
element type of the array or the value type of the map is one of these
Expand All @@ -25,8 +26,8 @@ JSON Functions
match the field names of the ROW exactly (case-sensitive).
Behaviors of the casts are shown with the examples below. ::

SELECT from_json('{"a": true}'); -- {'a'=true} // Output type: ROW(a BOOLEAN)
SELECT from_json('{"a": 1}'); -- {'a'=1} // Output type: ROW(a INTEGER)
SELECT from_json('{"a": 1.0}'); -- {'a'=1.0} // Output type: ROW(a DOUBLE)
SELECT from_json('["name", "age", "id"]'); -- ['name', 'age', 'id'] // Output type: ARRAY(VARCHAR)
SELECT from_json('{"a": 1, "b": 2}'); -- {'a'=1, 'b'=2} // Output type: MAP(VARCHAR,INTEGER)
SELECT from_json('{"a": true}'); -- {'a'=true} // Output type: ROW({"a"}, {BOOLEAN()})
SELECT from_json('{"a": 1}'); -- {'a'=1} // Output type: ROW({"a"}, {INTEGER()})
SELECT from_json('{"a": 1.0}'); -- {'a'=1.0} // Output type: ROW({"a"}, {DOUBLE()})
SELECT from_json('["name", "age", "id"]'); -- ['name', 'age', 'id'] // Output type: ARRAY(VARCHAR())
SELECT from_json('{"a": 1, "b": 2}'); -- {'a'=1, 'b'=2} // Output type: MAP(VARCHAR(),INTEGER())
102 changes: 47 additions & 55 deletions velox/functions/sparksql/specialforms/FromJson.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@
#include <limits>
#include <stdexcept>

#include "velox/expression/CastExpr.h"
#include "velox/expression/EvalCtx.h"
#include "velox/expression/PeeledEncoding.h"
#include "velox/expression/ScopedVarSetter.h"
#include "velox/expression/SpecialForm.h"
#include "velox/expression/VectorWriters.h"
#include "velox/functions/prestosql/json/SIMDJsonUtil.h"
Expand All @@ -33,8 +30,9 @@ using namespace facebook::velox::exec;
namespace facebook::velox::functions::sparksql {
namespace {

/// Struct for extracting JSON data and writing it with type-specific handling.
template <typename Input>
struct ParseJsonTypedImpl {
struct ExtractJsonTypedImpl {
template <TypeKind kind>
static simdjson::error_code
apply(Input input, exec::GenericWriter& writer, bool isRoot) {
Expand All @@ -55,7 +53,7 @@ struct ParseJsonTypedImpl {
template <typename Dummy>
struct KindDispatcher<TypeKind::VARCHAR, Dummy> {
static simdjson::error_code
apply(Input value, exec::GenericWriter& writer, bool isRoot) {
apply(Input value, exec::GenericWriter& writer, bool /*isRoot*/) {
SIMDJSON_ASSIGN_OR_RAISE(auto type, value.type());
std::string_view s;
switch (type) {
Expand All @@ -78,7 +76,7 @@ struct ParseJsonTypedImpl {
template <typename Dummy>
struct KindDispatcher<TypeKind::BOOLEAN, Dummy> {
static simdjson::error_code
apply(Input value, exec::GenericWriter& writer, bool isRoot) {
apply(Input value, exec::GenericWriter& writer, bool /*isRoot*/) {
SIMDJSON_ASSIGN_OR_RAISE(auto type, value.type());
auto& w = writer.castTo<bool>();
switch (type) {
Expand All @@ -96,47 +94,47 @@ struct ParseJsonTypedImpl {
template <typename Dummy>
struct KindDispatcher<TypeKind::TINYINT, Dummy> {
static simdjson::error_code
apply(Input value, exec::GenericWriter& writer, bool isRoot) {
apply(Input value, exec::GenericWriter& writer, bool /*isRoot*/) {
return castJsonToInt<int8_t>(value, writer);
}
};

template <typename Dummy>
struct KindDispatcher<TypeKind::SMALLINT, Dummy> {
static simdjson::error_code
apply(Input value, exec::GenericWriter& writer, bool isRoot) {
apply(Input value, exec::GenericWriter& writer, bool /*isRoot*/) {
return castJsonToInt<int16_t>(value, writer);
}
};

template <typename Dummy>
struct KindDispatcher<TypeKind::INTEGER, Dummy> {
static simdjson::error_code
apply(Input value, exec::GenericWriter& writer, bool isRoot) {
apply(Input value, exec::GenericWriter& writer, bool /*isRoot*/) {
return castJsonToInt<int32_t>(value, writer);
}
};

template <typename Dummy>
struct KindDispatcher<TypeKind::BIGINT, Dummy> {
static simdjson::error_code
apply(Input value, exec::GenericWriter& writer, bool isRoot) {
apply(Input value, exec::GenericWriter& writer, bool /*isRoot*/) {
return castJsonToInt<int64_t>(value, writer);
}
};

template <typename Dummy>
struct KindDispatcher<TypeKind::REAL, Dummy> {
static simdjson::error_code
apply(Input value, exec::GenericWriter& writer, bool isRoot) {
apply(Input value, exec::GenericWriter& writer, bool /*isRoot*/) {
return castJsonToFloatingPoint<float>(value, writer);
}
};

template <typename Dummy>
struct KindDispatcher<TypeKind::DOUBLE, Dummy> {
static simdjson::error_code
apply(Input value, exec::GenericWriter& writer, bool isRoot) {
apply(Input value, exec::GenericWriter& writer, bool /*isRoot*/) {
return castJsonToFloatingPoint<double>(value, writer);
}
};
Expand All @@ -146,7 +144,7 @@ struct ParseJsonTypedImpl {
static simdjson::error_code
apply(Input value, exec::GenericWriter& writer, bool isRoot) {
auto& writerTyped = writer.castTo<Array<Any>>();
auto& elementType = writer.type()->childAt(0);
const auto& elementType = writer.type()->childAt(0);
SIMDJSON_ASSIGN_OR_RAISE(auto type, value.type());
if (type == simdjson::ondemand::json_type::array) {
SIMDJSON_ASSIGN_OR_RAISE(auto array, value.get_array());
Expand All @@ -158,7 +156,7 @@ struct ParseJsonTypedImpl {
writerTyped.add_null();
} else {
SIMDJSON_TRY(VELOX_DYNAMIC_TYPE_DISPATCH(
ParseJsonTypedImpl<simdjson::ondemand::value>::apply,
ExtractJsonTypedImpl<simdjson::ondemand::value>::apply,
elementType->kind(),
element,
writerTyped.add_item(),
Expand All @@ -167,7 +165,7 @@ struct ParseJsonTypedImpl {
}
} else if (elementType->kind() == TypeKind::ROW && isRoot) {
SIMDJSON_TRY(VELOX_DYNAMIC_TYPE_DISPATCH(
ParseJsonTypedImpl<simdjson::ondemand::value>::apply,
ExtractJsonTypedImpl<simdjson::ondemand::value>::apply,
elementType->kind(),
value,
writerTyped.add_item(),
Expand All @@ -182,10 +180,9 @@ struct ParseJsonTypedImpl {
template <typename Dummy>
struct KindDispatcher<TypeKind::MAP, Dummy> {
static simdjson::error_code
apply(Input value, exec::GenericWriter& writer, bool isRoot) {
apply(Input value, exec::GenericWriter& writer, bool /*isRoot*/) {
auto& writerTyped = writer.castTo<Map<Any, Any>>();
auto& keyType = writer.type()->childAt(0);
auto& valueType = writer.type()->childAt(1);
const auto& valueType = writer.type()->childAt(1);
SIMDJSON_ASSIGN_OR_RAISE(auto object, value.get_object());
for (auto fieldResult : object) {
SIMDJSON_ASSIGN_OR_RAISE(auto field, fieldResult);
Expand All @@ -198,7 +195,7 @@ struct ParseJsonTypedImpl {
auto writers = writerTyped.add_item();
std::get<0>(writers).castTo<Varchar>().append(key);
SIMDJSON_TRY(VELOX_DYNAMIC_TYPE_DISPATCH(
ParseJsonTypedImpl<simdjson::ondemand::value>::apply,
ExtractJsonTypedImpl<simdjson::ondemand::value>::apply,
valueType->kind(),
field.value(),
std::get<1>(writers),
Expand All @@ -213,7 +210,7 @@ struct ParseJsonTypedImpl {
struct KindDispatcher<TypeKind::ROW, Dummy> {
static simdjson::error_code
apply(Input value, exec::GenericWriter& writer, bool isRoot) {
auto& rowType = writer.type()->asRow();
const auto& rowType = writer.type()->asRow();
auto& writerTyped = writer.castTo<DynamicRow>();
if (value.type().error() != ::simdjson::SUCCESS) {
writerTyped.set_null_at(0);
Expand Down Expand Up @@ -245,7 +242,7 @@ struct ParseJsonTypedImpl {
it->second = -1;

auto res = VELOX_DYNAMIC_TYPE_DISPATCH(
ParseJsonTypedImpl<simdjson::ondemand::value>::apply,
ExtractJsonTypedImpl<simdjson::ondemand::value>::apply,
rowType.childAt(index)->kind(),
field.value(),
writerTyped.get_writer_at(index),
Expand All @@ -257,12 +254,14 @@ struct ParseJsonTypedImpl {
}
}

for (const auto& [key, index] : fieldIndices) {
for (const auto& [_, index] : fieldIndices) {
if (index >= 0) {
writerTyped.set_null_at(index);
}
}
} else {
// Handle other JSON types: set null to the writer if it's the root doc,
// otherwise return INCORRECT_TYPE to the caller.
if (isRoot) {
writerTyped.set_null_at(0);
return simdjson::SUCCESS;
Expand All @@ -274,23 +273,6 @@ struct ParseJsonTypedImpl {
}
};

static simdjson::simdjson_result<std::string_view> rawJson(
Input value,
simdjson::ondemand::json_type type) {
switch (type) {
case simdjson::ondemand::json_type::array: {
SIMDJSON_ASSIGN_OR_RAISE(auto array, value.get_array());
return array.raw_json();
}
case simdjson::ondemand::json_type::object: {
SIMDJSON_ASSIGN_OR_RAISE(auto object, value.get_object());
return object.raw_json();
}
default:
return value.raw_json_token();
}
}

template <typename T>
static simdjson::error_code castJsonToInt(
Input value,
Expand All @@ -307,14 +289,15 @@ struct ParseJsonTypedImpl {
default:
return simdjson::INCORRECT_TYPE;
}
break;
}
default:
return simdjson::INCORRECT_TYPE;
}
return simdjson::SUCCESS;
}

// Casts a JSON value to a float point, handling both numeric Special cases
// for NaN and Infinity.
template <typename T>
static simdjson::error_code castJsonToFloatingPoint(
Input value,
Expand Down Expand Up @@ -368,8 +351,9 @@ struct ParseJsonTypedImpl {
/// - Failure Handling: Returns `NULL` for invalid JSON or incompatible values.
/// - Boolean: Only `true` and `false` are valid; others return `NULL`.
/// - Integral Types: Accepts only integers; floats or strings return `NULL`.
/// - Float/Double: All numbers are valid; strings like `"NaN"` , `"INF"`
/// `"Infinity"` are accepted, others return `NULL`.
/// - Float/Double: All numbers are valid; strings like `"NaN"`, `"+INF"`,
/// `"+Infinity"`, `"Infinity"`, `"-INF"`,
/// `"-Infinity"` are accepted, others return `NULL`.
/// - Array: Accepts JSON objects only if the array is the root type with ROW
/// child type.
/// - Map: Keys must be `VARCHAR` type.
Expand Down Expand Up @@ -425,7 +409,7 @@ class FromJsonFunction final : public exec::VectorFunction {
context.applyToSelectedNoThrow(rows, [&](auto row) {
writer.setOffset(row);
if (error != simdjson::SUCCESS ||
paseJsonOneRow(jsonDoc, writer) != simdjson::SUCCESS) {
extractJsonToWriter(jsonDoc, writer) != simdjson::SUCCESS) {
writer.commitNull();
}
});
Expand All @@ -448,7 +432,7 @@ class FromJsonFunction final : public exec::VectorFunction {
if (inputVector->isNullAt(row)) {
return;
}
auto& input = inputVector->valueAt(row);
const auto& input = inputVector->valueAt(row);
maxSize = std::max(maxSize, input.size());
});
paddedInput_.resize(maxSize + simdjson::SIMDJSON_PADDING);
Expand All @@ -458,52 +442,61 @@ class FromJsonFunction final : public exec::VectorFunction {
writer.commitNull();
return;
}
auto& input = inputVector->valueAt(row);
const auto& input = inputVector->valueAt(row);
memcpy(paddedInput_.data(), input.data(), input.size());
simdjson::padded_string_view paddedInput(
paddedInput_.data(), input.size(), paddedInput_.size());
simdjson::ondemand::document doc;
auto error = simdjsonParse(paddedInput).get(doc);
if (error != simdjson::SUCCESS ||
paseJsonOneRow(doc, writer) != simdjson::SUCCESS) {
extractJsonToWriter(doc, writer) != simdjson::SUCCESS) {
writer.commitNull();
}
});
writer.finish();
}

static simdjson::error_code paseJsonOneRow(
// Extracts data from json doc and writes it to writer.
static simdjson::error_code extractJsonToWriter(
simdjson::ondemand::document& doc,
exec::VectorWriter<Any>& writer) {
if (doc.is_null()) {
writer.commitNull();
} else {
SIMDJSON_TRY(
ParseJsonTypedImpl<simdjson::ondemand::document&>::apply<kind>(
ExtractJsonTypedImpl<simdjson::ondemand::document&>::apply<kind>(
doc, writer.current(), true));
writer.commit(true);
}
return simdjson::SUCCESS;
}

// The buffer with extra bytes for parser::parse(),
mutable std::string paddedInput_;
};

/// Determines whether a given type is supported.
/// @param isRootType. A flag indicating whether the type is the root type in
/// the evaluation context. Only ROW, ARRAY, and MAP are allowed as root types;
/// this flag helps differentiate such cases.
bool isSupportedType(const TypePtr& type, bool isRootType) {
switch (type->kind()) {
case TypeKind::ARRAY:
case TypeKind::ARRAY: {
return isSupportedType(type->childAt(0), false);
case TypeKind::ROW:
for (const auto& child : type->as<TypeKind::ROW>().children()) {
}
case TypeKind::ROW: {
for (const auto& child : asRowType(type)->children()) {
if (!isSupportedType(child, false)) {
return false;
}
}
return true;
case TypeKind::MAP:
}
case TypeKind::MAP: {
return (
type->childAt(0)->kind() == TypeKind::VARCHAR &&
isSupportedType(type->childAt(1), false));
}
case TypeKind::BIGINT: {
if (type->isDecimal()) {
return false;
Expand Down Expand Up @@ -547,9 +540,8 @@ exec::ExprPtr FromJsonCallToSpecialForm::constructSpecialForm(
TypeKind::VARCHAR,
"The first argument of from_json should be of varchar type.");

if (!isSupportedType(type, true)) {
VELOX_UNSUPPORTED("Unsupported type {}.", type->toString());
}
VELOX_USER_CHECK(
isSupportedType(type, true), "Unsupported type {}.", type->toString());

std::shared_ptr<exec::VectorFunction> func;
if (type->kind() == TypeKind::ARRAY) {
Expand Down

0 comments on commit 1ef2f81

Please sign in to comment.