From 66831a885320b8fe72bec81413ff6593bb65d074 Mon Sep 17 00:00:00 2001 From: yan ma Date: Wed, 7 Feb 2024 17:39:36 +0800 Subject: [PATCH] Add map_from_entries Spark function --- velox/docs/functions/spark/map.rst | 7 + velox/functions/sparksql/CMakeLists.txt | 1 + velox/functions/sparksql/MapFromEntries.cpp | 230 +++++++++ velox/functions/sparksql/Register.cpp | 2 + velox/functions/sparksql/tests/CMakeLists.txt | 1 + .../sparksql/tests/MapFromEntriesTest.cpp | 459 ++++++++++++++++++ 6 files changed, 700 insertions(+) create mode 100644 velox/functions/sparksql/MapFromEntries.cpp create mode 100644 velox/functions/sparksql/tests/MapFromEntriesTest.cpp diff --git a/velox/docs/functions/spark/map.rst b/velox/docs/functions/spark/map.rst index 1a995eca5c03c..13842847271f1 100644 --- a/velox/docs/functions/spark/map.rst +++ b/velox/docs/functions/spark/map.rst @@ -27,6 +27,13 @@ Map Functions SELECT map_from_arrays(array(1.0, 3.0), array('2', '4')); -- {1.0 -> 2, 3.0 -> 4} +.. spark:function:: map_from_entries(struct(K,V)) -> map(K,V) + + Converts an array of entries (key value struct types) to a map of values. All elements in keys should not be null. + If null entry exists in the array, return null for this whole array.:: + + SELECT map_from_entries(array(struct(1, 'a'), struct(2, 'b'))); -- {1 -> 'a', 2 -> 'b'} + .. spark:function:: size(map(K,V)) -> bigint :noindex: diff --git a/velox/functions/sparksql/CMakeLists.txt b/velox/functions/sparksql/CMakeLists.txt index 0522cbfefab5f..6177057552ee8 100644 --- a/velox/functions/sparksql/CMakeLists.txt +++ b/velox/functions/sparksql/CMakeLists.txt @@ -24,6 +24,7 @@ add_library( In.cpp LeastGreatest.cpp Map.cpp + MapFromEntries.cpp RegexFunctions.cpp Register.cpp RegisterArithmetic.cpp diff --git a/velox/functions/sparksql/MapFromEntries.cpp b/velox/functions/sparksql/MapFromEntries.cpp new file mode 100644 index 0000000000000..ee7f0075c6fb8 --- /dev/null +++ b/velox/functions/sparksql/MapFromEntries.cpp @@ -0,0 +1,230 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +#include "velox/expression/EvalCtx.h" +#include "velox/expression/Expr.h" +#include "velox/expression/VectorFunction.h" +#include "velox/functions/lib/CheckDuplicateKeys.h" +#include "velox/functions/lib/RowsTranslationUtil.h" +#include "velox/vector/BaseVector.h" +#include "velox/vector/ComplexVector.h" + +namespace facebook::velox::functions { +namespace { +static const char* kNullKeyErrorMessage = "map key cannot be null"; +static const char* kIndeterminateKeyErrorMessage = + "map key cannot be indeterminate"; + +class MapFromEntriesFunction : public exec::VectorFunction { + public: + void apply( + const SelectivityVector& rows, + std::vector& args, + const TypePtr& outputType, + exec::EvalCtx& context, + VectorPtr& result) const override { + VELOX_CHECK_EQ(args.size(), 1); + auto& arg = args[0]; + VectorPtr localResult; + // Input can be constant or flat. + if (arg->isConstantEncoding()) { + auto* constantArray = arg->as>(); + const auto& flatArray = constantArray->valueVector(); + const auto flatIndex = constantArray->index(); + + exec::LocalSelectivityVector singleRow(context, flatIndex + 1); + singleRow->clearAll(); + singleRow->setValid(flatIndex, true); + singleRow->updateBounds(); + + localResult = applyFlat( + *singleRow.get(), flatArray->as(), outputType, context); + localResult = + BaseVector::wrapInConstant(rows.size(), flatIndex, localResult); + } else { + localResult = + applyFlat(rows, arg->as(), outputType, context); + } + + context.moveOrCopyResult(localResult, rows, result); + } + + static std::vector> signatures() { + return {// unknown -> map(unknown, unknown) + exec::FunctionSignatureBuilder() + .returnType("map(unknown, unknown)") + .argumentType("unknown") + .build(), + // array(unknown) -> map(unknown, unknown) + exec::FunctionSignatureBuilder() + .returnType("map(unknown, unknown)") + .argumentType("array(unknown)") + .build(), + // array(row(K,V)) -> map(K,V) + exec::FunctionSignatureBuilder() + .typeVariable("K") + .typeVariable("V") + .returnType("map(K,V)") + .argumentType("array(row(K,V))") + .build()}; + } + + private: + VectorPtr applyFlat( + const SelectivityVector& rows, + const ArrayVector* inputArray, + const TypePtr& outputType, + exec::EvalCtx& context) const { + auto& inputValueVector = inputArray->elements(); + exec::LocalDecodedVector decodedRowVector(context); + decodedRowVector.get()->decode(*inputValueVector); + if (inputValueVector->typeKind() == TypeKind::UNKNOWN) { + auto sizes = allocateSizes(rows.end(), context.pool()); + auto offsets = allocateSizes(rows.end(), context.pool()); + + // Output in this case is map(unknown, unknown), but all elements are + // nulls, all offsets and sizes are 0. + return std::make_shared( + context.pool(), + outputType, + inputArray->nulls(), + rows.end(), + sizes, + offsets, + BaseVector::create(UNKNOWN(), 0, context.pool()), + BaseVector::create(UNKNOWN(), 0, context.pool())); + } + + exec::LocalSelectivityVector remianingRows(context, rows); + auto rowVector = decodedRowVector->base()->as(); + auto keyVector = rowVector->childAt(0); + + BufferPtr sizes = allocateSizes(rows.end(), context.pool()); + vector_size_t* mutableSizes = sizes->asMutable(); + rows.applyToSelected([&](vector_size_t row) { + mutableSizes[row] = inputArray->rawSizes()[row]; + }); + + auto resetSize = [&](vector_size_t row) { mutableSizes[row] = 0; }; + auto nulls = allocateNulls(decodedRowVector->size(), context.pool()); + auto* mutableNulls = nulls->asMutable(); + + if (decodedRowVector->mayHaveNulls() || keyVector->mayHaveNulls() || + keyVector->mayHaveNullsRecursive()) { + context.applyToSelectedNoThrow(rows, [&](vector_size_t row) { + const auto size = inputArray->sizeAt(row); + const auto offset = inputArray->offsetAt(row); + + for (auto i = 0; i < size; ++i) { + // For nulls in the top level row vector, return null. + const bool isMapEntryNull = decodedRowVector->isNullAt(offset + i); + if (isMapEntryNull) { + bits::setNull(mutableNulls, row); + break; + } + + // Check null keys. + auto keyIndex = decodedRowVector->index(offset + i); + if (keyVector->isNullAt(keyIndex)) { + resetSize(row); + VELOX_USER_FAIL(kNullKeyErrorMessage); + } + + // Check nested null in keys. + if (keyVector->containsNullAt(keyIndex)) { + resetSize(row); + VELOX_USER_FAIL(fmt::format( + "{}: {}", + kIndeterminateKeyErrorMessage, + keyVector->toString(keyIndex))); + } + } + }); + } + + context.deselectErrors(*remianingRows.get()); + + VectorPtr wrappedKeys; + VectorPtr wrappedValues; + if (decodedRowVector->isIdentityMapping()) { + wrappedKeys = rowVector->childAt(0); + wrappedValues = rowVector->childAt(1); + } else if (decodedRowVector->isConstantMapping()) { + if (decodedRowVector->isNullAt(0)) { + // If top level row is null, child might not be addressable at index 0 + // so we do not try to read it. + wrappedKeys = BaseVector::createNullConstant( + rowVector->childAt(0)->type(), + decodedRowVector->size(), + context.pool()); + wrappedValues = BaseVector::createNullConstant( + rowVector->childAt(1)->type(), + decodedRowVector->size(), + context.pool()); + } else { + wrappedKeys = BaseVector::wrapInConstant( + decodedRowVector->size(), + decodedRowVector->index(0), + rowVector->childAt(0)); + wrappedValues = BaseVector::wrapInConstant( + decodedRowVector->size(), + decodedRowVector->index(0), + rowVector->childAt(1)); + } + } else { + // Dictionary. + auto indices = allocateIndices(decodedRowVector->size(), context.pool()); + memcpy( + indices->asMutable(), + decodedRowVector->indices(), + BaseVector::byteSize(decodedRowVector->size())); + // Any null in the top row(X, Y) should be marked as null since its + // not guranteed to be addressable at X or Y. + for (auto i = 0; i < decodedRowVector->size(); i++) { + if (decodedRowVector->isNullAt(i)) { + bits::setNull(mutableNulls, i); + } + } + wrappedKeys = BaseVector::wrapInDictionary( + nulls, indices, decodedRowVector->size(), rowVector->childAt(0)); + wrappedValues = BaseVector::wrapInDictionary( + nulls, indices, decodedRowVector->size(), rowVector->childAt(1)); + } + + // To avoid creating new buffers, we try to reuse the input's buffers + // as many as possible. + auto mapVector = std::make_shared( + context.pool(), + outputType, + nulls, + rows.end(), + inputArray->offsets(), + sizes, + wrappedKeys, + wrappedValues); + + checkDuplicateKeys(mapVector, *remianingRows, context); + return mapVector; + } +}; +} // namespace + +VELOX_DECLARE_VECTOR_FUNCTION( + udf_map_from_entries, + MapFromEntriesFunction::signatures(), + std::make_unique()); +} // namespace facebook::velox::functions diff --git a/velox/functions/sparksql/Register.cpp b/velox/functions/sparksql/Register.cpp index bd748af7dadfd..f14fdbb58e067 100644 --- a/velox/functions/sparksql/Register.cpp +++ b/velox/functions/sparksql/Register.cpp @@ -69,6 +69,8 @@ static void workAroundRegistrationMacro(const std::string& prefix) { VELOX_REGISTER_VECTOR_FUNCTION( udf_map_allow_duplicates, prefix + "map_from_arrays"); + VELOX_REGISTER_VECTOR_FUNCTION( + udf_map_from_entries, prefix + "map_from_entries"); VELOX_REGISTER_VECTOR_FUNCTION( udf_concat_row, exec::RowConstructorCallToSpecialForm::kRowConstructor); // String functions. diff --git a/velox/functions/sparksql/tests/CMakeLists.txt b/velox/functions/sparksql/tests/CMakeLists.txt index 86d4d1fa5d52e..8757feafbe155 100644 --- a/velox/functions/sparksql/tests/CMakeLists.txt +++ b/velox/functions/sparksql/tests/CMakeLists.txt @@ -31,6 +31,7 @@ add_executable( InTest.cpp LeastGreatestTest.cpp MapTest.cpp + MapFromEntriesTest.cpp MightContainTest.cpp RandTest.cpp RegexFunctionsTest.cpp diff --git a/velox/functions/sparksql/tests/MapFromEntriesTest.cpp b/velox/functions/sparksql/tests/MapFromEntriesTest.cpp new file mode 100644 index 0000000000000..2702ab0a60ddd --- /dev/null +++ b/velox/functions/sparksql/tests/MapFromEntriesTest.cpp @@ -0,0 +1,459 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/functions/lib/CheckDuplicateKeys.h" +#include "velox/functions/prestosql/ArrayConstructor.h" +#include "velox/functions/sparksql/tests/SparkFunctionBaseTest.h" +#include "velox/vector/tests/TestingDictionaryArrayElementsFunction.h" + +using namespace facebook::velox::test; + +namespace facebook::velox::functions::sparksql::test { +namespace { +std::optional>>> O( + const std::vector>>& vector) { + return std::make_optional(vector); +} + +class MapFromEntriesTest : public SparkFunctionBaseTest { + protected: + /// Create an MAP vector of size 1 using specified 'keys' and 'values' vector. + VectorPtr makeSingleRowMapVector( + const VectorPtr& keys, + const VectorPtr& values) { + BufferPtr offsets = allocateOffsets(1, pool()); + BufferPtr sizes = allocateSizes(1, pool()); + sizes->asMutable()[0] = keys->size(); + + return std::make_shared( + pool(), + MAP(keys->type(), values->type()), + nullptr, + 1, + offsets, + sizes, + keys, + values); + } + + void verifyMapFromEntries( + const std::vector& input, + const VectorPtr& expected, + const std::string& funcArg = "C0") { + const std::string expr = fmt::format("map_from_entries({})", funcArg); + auto result = evaluate(expr, makeRowVector(input)); + assertEqualVectors(expected, result); + } + + // Evaluate an expression only, usually expect error thrown. + void evaluateExpr( + const std::string& expression, + const std::vector& input) { + evaluate(expression, makeRowVector(input)); + } +}; +} // namespace + +TEST_F(MapFromEntriesTest, intKeyAndVarcharValue) { + auto rowType = ROW({INTEGER(), VARCHAR()}); + std::vector>>> + data = { + {{{1, "red"}}, {{2, "blue"}}, {{3, "green"}}}, + }; + auto input = makeArrayOfRowVector(data, rowType); + auto expected = makeMapVector( + {{{1, "red"_sv}, {2, "blue"_sv}, {3, "green"_sv}}}); + verifyMapFromEntries({input}, expected); +} + +TEST_F(MapFromEntriesTest, nullMapEntries) { + auto rowType = ROW({INTEGER(), INTEGER()}); + { + std::vector>>> data = + { + {std::nullopt}, + {{{1, 11}}}, + }; + auto input = makeArrayOfRowVector(data, rowType); + auto expected = + makeNullableMapVector({std::nullopt, O({{1, 11}})}); + verifyMapFromEntries({input}, expected, "C0"); + } + { + // Create array(row(a,b)) where a, b sizes are 0 because all row(a, b) + // values are null. + std::vector>>> data = + { + {std::nullopt, std::nullopt, std::nullopt}, + {std::nullopt}, + }; + auto input = makeArrayOfRowVector(data, rowType); + auto rowInput = input->as(); + rowInput->elements()->as()->childAt(0)->resize(0); + rowInput->elements()->as()->childAt(1)->resize(0); + + auto expected = + makeNullableMapVector({std::nullopt, std::nullopt}); + verifyMapFromEntries({input}, expected, "C0"); + } +} + +TEST_F(MapFromEntriesTest, nullKeys) { + auto rowType = ROW({INTEGER(), INTEGER()}); + std::vector> data = { + {variant::row({variant::null(TypeKind::INTEGER), 0})}, + {variant::row({1, 11})}}; + auto input = makeArrayOfRowVector(rowType, data); + VELOX_ASSERT_THROW( + evaluateExpr("map_from_entries(C0)", {input}), "map key cannot be null"); +} + +TEST_F(MapFromEntriesTest, duplicateKeys) { + auto rowType = ROW({INTEGER(), INTEGER()}); + std::vector>>> data = { + {{{1, 10}}, {{1, 11}}}, + {{{2, 22}}}, + }; + auto input = makeArrayOfRowVector(data, rowType); + VELOX_ASSERT_THROW( + evaluateExpr("map_from_entries(C0)", {input}), + "Duplicate map keys (1) are not allowed"); +} + +TEST_F(MapFromEntriesTest, nullValues) { + auto rowType = ROW({INTEGER(), INTEGER()}); + std::vector> data = { + {variant::row({1, variant::null(TypeKind::INTEGER)}), + variant::row({2, 22}), + variant::row({3, 33})}}; + auto input = makeArrayOfRowVector(rowType, data); + auto expected = + makeMapVector({{{1, std::nullopt}, {2, 22}, {3, 33}}}); + verifyMapFromEntries({input}, expected); +} + +TEST_F(MapFromEntriesTest, constant) { + const vector_size_t kConstantSize = 1'000; + auto rowType = ROW({VARCHAR(), INTEGER()}); + std::vector>>> + data = { + {{{"red", 1}}, {{"blue", 2}}, {{"green", 3}}}, + {{{"red shiny car ahead", 4}}, {{"blue clear sky above", 5}}}, + {{{"r", 11}}, {{"g", 22}}, {{"b", 33}}}, + }; + auto input = makeArrayOfRowVector(data, rowType); + + auto evaluateConstant = [&](vector_size_t row, const VectorPtr& vector) { + return evaluate( + "map_from_entries(C0)", + makeRowVector( + {BaseVector::wrapInConstant(kConstantSize, row, vector)})); + }; + + auto result = evaluateConstant(0, input); + auto expected = BaseVector::wrapInConstant( + kConstantSize, + 0, + makeSingleRowMapVector( + makeFlatVector({"red"_sv, "blue"_sv, "green"_sv}), + makeFlatVector({1, 2, 3}))); + assertEqualVectors(expected, result); + + result = evaluateConstant(1, input); + expected = BaseVector::wrapInConstant( + kConstantSize, + 0, + makeSingleRowMapVector( + makeFlatVector( + {"red shiny car ahead"_sv, "blue clear sky above"_sv}), + makeFlatVector({4, 5}))); + assertEqualVectors(expected, result); + + result = evaluateConstant(2, input); + expected = BaseVector::wrapInConstant( + kConstantSize, + 0, + makeSingleRowMapVector( + makeFlatVector({"r"_sv, "g"_sv, "b"_sv}), + makeFlatVector({11, 22, 33}))); + assertEqualVectors(expected, result); +} + +TEST_F(MapFromEntriesTest, dictionaryEncodedElementsInFlat) { + exec::registerVectorFunction( + "testing_dictionary_array_elements", + facebook::velox::test::TestingDictionaryArrayElementsFunction:: + signatures(), + std::make_unique< + facebook::velox::test::TestingDictionaryArrayElementsFunction>()); + + auto rowType = ROW({INTEGER(), VARCHAR()}); + std::vector>>> + data = { + {{{1, "red"}}, {{2, "blue"}}, {{3, "green"}}}, + }; + auto input = makeArrayOfRowVector(data, rowType); + auto expected = makeMapVector( + {{{1, "red"_sv}, {2, "blue"_sv}, {3, "green"_sv}}}); + verifyMapFromEntries( + {input}, expected, "testing_dictionary_array_elements(C0)"); +} + +TEST_F(MapFromEntriesTest, outputSizeIsBoundBySelectedRows) { + // This test makes sure that map_from_entries output vector size is + // `rows.end()` instead of `rows.size()`. + + auto rowType = ROW({INTEGER(), INTEGER()}); + core::QueryConfig config({}); + auto function = + exec::getVectorFunction("map_from_entries", {ARRAY(rowType)}, {}, config); + + std::vector>>> data = { + {{{1, 11}}, {{2, 22}}, {{3, 33}}}, + {{{4, 44}}, {{5, 55}}}, + {{{6, 66}}}, + }; + auto array = makeArrayOfRowVector(data, rowType); + + auto rowVector = makeRowVector({array}); + + // Only the first 2 rows selected. + SelectivityVector rows(2); + // This is larger than input array size but rows beyond the input vector size + // are not selected. + rows.resize(1000, false); + + ASSERT_EQ(rows.size(), 1000); + ASSERT_EQ(rows.end(), 2); + ASSERT_EQ(array->size(), 3); + + auto typedExpr = + makeTypedExpr("map_from_entries(c0)", asRowType(rowVector->type())); + std::vector results(1); + + exec::ExprSet exprSet({typedExpr}, &execCtx_); + exec::EvalCtx evalCtx(&execCtx_, &exprSet, rowVector.get()); + exprSet.eval(rows, evalCtx, results); + + ASSERT_EQ(results[0]->size(), 2); +} + +TEST_F(MapFromEntriesTest, rowsWithNullsNotPassedToCheckDuplicateKey) { + auto innerRowVector = makeRowVector( + {makeNullableFlatVector({std::nullopt, 2, 3, 4}), + makeNullableFlatVector({1, 2, 3, 4})}); + + auto offsets = makeIndices({0, 2}); + auto sizes = makeIndices({2, 2}); + + auto arrayVector = std::make_shared( + pool(), + ARRAY(ROW({INTEGER(), INTEGER()})), + nullptr, + 2, + offsets, + sizes, + innerRowVector); + VELOX_ASSERT_THROW( + evaluate("map_from_entries(C0)", makeRowVector({arrayVector})), + "map key cannot be null"); +} + +TEST_F(MapFromEntriesTest, arrayOfDictionaryRowOfNulls) { + RowVectorPtr rowVector = + makeRowVector({makeFlatVector(0), makeFlatVector(0)}); + rowVector->resize(4); + rowVector->childAt(0)->resize(0); + rowVector->childAt(1)->resize(0); + for (int i = 0; i < rowVector->size(); i++) { + rowVector->setNull(i, true); + } + + EXPECT_EQ(rowVector->childAt(0)->size(), 0); + EXPECT_EQ(rowVector->childAt(1)->size(), 0); + + auto indices = makeIndices({0, 1, 2, 3}); + + auto dictionary = + BaseVector::wrapInDictionary(nullptr, indices, 4, rowVector); + + auto offsets = makeIndices({0, 2}); + auto sizes = makeIndices({2, 2}); + + auto arrayVector = std::make_shared( + pool(), + ARRAY(ROW({INTEGER(), INTEGER()})), + nullptr, + 2, + offsets, + sizes, + dictionary); + VectorPtr result = + evaluate("map_from_entries(c0)", makeRowVector({arrayVector})); + for (int i = 0; i < result->size(); i++) { + EXPECT_TRUE(result->isNullAt(i)); + } +} + +TEST_F(MapFromEntriesTest, arrayOfConstantRowOfNulls) { + RowVectorPtr rowVector = + makeRowVector({makeFlatVector(0), makeFlatVector(0)}); + rowVector->resize(1); + rowVector->setNull(0, true); + rowVector->childAt(0)->resize(0); + rowVector->childAt(1)->resize(0); + EXPECT_EQ(rowVector->childAt(0)->size(), 0); + EXPECT_EQ(rowVector->childAt(1)->size(), 0); + + VectorPtr rowVectorConstant = BaseVector::wrapInConstant(4, 0, rowVector); + + auto offsets = makeIndices({0, 2}); + auto sizes = makeIndices({2, 2}); + + auto arrayVector = std::make_shared( + pool(), + ARRAY(ROW({INTEGER(), INTEGER()})), + nullptr, + 2, + offsets, + sizes, + rowVectorConstant); + VectorPtr result = + evaluate("map_from_entries(c0)", makeRowVector({arrayVector})); + for (int i = 0; i < result->size(); i++) { + EXPECT_TRUE(result->isNullAt(i)); + } +} + +TEST_F(MapFromEntriesTest, arrayOfConstantNotNulls) { + RowVectorPtr rowVector = makeRowVector( + {makeFlatVector({1, 2}), makeFlatVector({3, 4})}); + rowVector->resize(1); + rowVector->setNull(0, false); + + VectorPtr rowVectorConstant = BaseVector::wrapInConstant(4, 0, rowVector); + { + auto offsets = makeIndices({0, 2}); + auto sizes = makeIndices({2, 2}); + + auto arrayVector = std::make_shared( + pool(), + ARRAY(ROW({INTEGER(), INTEGER()})), + nullptr, + 2, + offsets, + sizes, + rowVectorConstant); + + VELOX_ASSERT_THROW( + evaluate("map_from_entries(C0)", makeRowVector({arrayVector})), + "Duplicate map keys (1) are not allowed"); + } + + { + auto offsets = makeIndices({0, 1}); + auto sizes = makeIndices({1, 1}); + + auto arrayVector = std::make_shared( + pool(), + ARRAY(ROW({INTEGER(), INTEGER()})), + nullptr, + 2, + offsets, + sizes, + rowVectorConstant); + + // will fail due to duplicate key. + VectorPtr result = + evaluate("map_from_entries(c0)", makeRowVector({arrayVector})); + auto expected = makeMapVector( + {{{1, 2}}, {{1, 2}}, {{1, 2}}, {{1, 2}}}); + } +} + +TEST_F(MapFromEntriesTest, nestedNullInKeys) { + facebook::velox::functions::registerArrayConstructor("array_constructor"); + VELOX_ASSERT_THROW( + evaluate( + "map_from_entries(array_constructor(row_constructor(array_constructor(null), null)))", + makeRowVector({makeFlatVector(1)})), + "map key cannot be indeterminate"); +} + +TEST_F(MapFromEntriesTest, unknownInputs) { + facebook::velox::functions::registerArrayConstructor("array_constructor"); + auto expectedType = MAP(UNKNOWN(), UNKNOWN()); + auto test = [&](const std::string& query) { + auto result = evaluate(query, makeRowVector({makeFlatVector(2)})); + ASSERT_TRUE(result->type()->equivalent(*expectedType)); + }; + VELOX_ASSERT_THROW( + evaluate( + "map_from_entries(array_constructor(row_constructor(null, null)))", + makeRowVector({makeFlatVector(2)})), + "map key cannot be null"); + test("map_from_entries(array_constructor(null))"); + test("map_from_entries(null)"); +} + +TEST_F(MapFromEntriesTest, nullRowEntriesWithSmallerChildren) { + // Row vector is of size 3, childrens are of size 2 since row 2 is null. + auto rowVector = makeRowVector( + {makeNullableFlatVector({std::nullopt, 2}), + makeFlatVector({1, 2})}); + rowVector->appendNulls(1); + rowVector->setNull(2, true); + + // Array [(null,1), (2,2), null] + auto arrayVector = makeArrayVector({0}, rowVector); + VELOX_ASSERT_THROW( + evaluate("map_from_entries(c0)", makeRowVector({arrayVector})), + "map key cannot be null"); +} + +TEST_F(MapFromEntriesTest, allTopLevelNullsCornerCase) { + // Test a corner case where the input of mapFromEntries is an array with + // array at row 0 : [(null), (null)] + // array at row 1 : [] + // And the function is evaluated only at row 1. + + auto keys = makeNullableFlatVector({}); + auto values = makeNullableFlatVector({}); + auto rowVector = makeRowVector({keys, values}); + + EXPECT_EQ(rowVector->size(), 0); + rowVector->appendNulls(2); + + EXPECT_EQ(rowVector->size(), 2); + EXPECT_EQ(rowVector->childAt(0)->size(), 0); + EXPECT_EQ(rowVector->childAt(1)->size(), 0); + + // Array at row 0 is [(null), (null)] + // Array at row 1 is [] + auto arrayVector = makeArrayVector({0, 2}, rowVector); + + SelectivityVector rows(2); + rows.setValid(0, false); + + auto result = + evaluate("map_from_entries(c0)", makeRowVector({arrayVector}), rows); + result->validate(); + auto expected = makeMapVectorFromJson({"{}", "{}"}); + assertEqualVectors(expected, result); +} +} // namespace facebook::velox::functions::sparksql::test