diff --git a/velox/docs/functions/spark/map.rst b/velox/docs/functions/spark/map.rst
index 1a995eca5c03c..13842847271f1 100644
--- a/velox/docs/functions/spark/map.rst
+++ b/velox/docs/functions/spark/map.rst
@@ -27,6 +27,13 @@ Map Functions
 
         SELECT map_from_arrays(array(1.0, 3.0), array('2', '4')); -- {1.0 -> 2, 3.0 -> 4}
 
+.. spark:function:: map_from_entries(struct(K,V)) -> map(K,V)
+
+    Converts an array of entries (key value struct types) to a map of values. All elements in keys should not be null.
+    If null entry exists in the array, return null for this whole array.::
+
+        SELECT map_from_entries(array(struct(1, 'a'), struct(2, 'b'))); -- {1 -> 'a', 2 -> 'b'}
+
 .. spark:function:: size(map(K,V)) -> bigint
    :noindex:
 
diff --git a/velox/functions/sparksql/CMakeLists.txt b/velox/functions/sparksql/CMakeLists.txt
index 0522cbfefab5f..6177057552ee8 100644
--- a/velox/functions/sparksql/CMakeLists.txt
+++ b/velox/functions/sparksql/CMakeLists.txt
@@ -24,6 +24,7 @@ add_library(
   In.cpp
   LeastGreatest.cpp
   Map.cpp
+  MapFromEntries.cpp
   RegexFunctions.cpp
   Register.cpp
   RegisterArithmetic.cpp
diff --git a/velox/functions/sparksql/MapFromEntries.cpp b/velox/functions/sparksql/MapFromEntries.cpp
new file mode 100644
index 0000000000000..ee7f0075c6fb8
--- /dev/null
+++ b/velox/functions/sparksql/MapFromEntries.cpp
@@ -0,0 +1,230 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <memory>
+
+#include "velox/expression/EvalCtx.h"
+#include "velox/expression/Expr.h"
+#include "velox/expression/VectorFunction.h"
+#include "velox/functions/lib/CheckDuplicateKeys.h"
+#include "velox/functions/lib/RowsTranslationUtil.h"
+#include "velox/vector/BaseVector.h"
+#include "velox/vector/ComplexVector.h"
+
+namespace facebook::velox::functions {
+namespace {
+static const char* kNullKeyErrorMessage = "map key cannot be null";
+static const char* kIndeterminateKeyErrorMessage =
+    "map key cannot be indeterminate";
+
+class MapFromEntriesFunction : public exec::VectorFunction {
+ public:
+  void apply(
+      const SelectivityVector& rows,
+      std::vector<VectorPtr>& args,
+      const TypePtr& outputType,
+      exec::EvalCtx& context,
+      VectorPtr& result) const override {
+    VELOX_CHECK_EQ(args.size(), 1);
+    auto& arg = args[0];
+    VectorPtr localResult;
+    // Input can be constant or flat.
+    if (arg->isConstantEncoding()) {
+      auto* constantArray = arg->as<ConstantVector<ComplexType>>();
+      const auto& flatArray = constantArray->valueVector();
+      const auto flatIndex = constantArray->index();
+
+      exec::LocalSelectivityVector singleRow(context, flatIndex + 1);
+      singleRow->clearAll();
+      singleRow->setValid(flatIndex, true);
+      singleRow->updateBounds();
+
+      localResult = applyFlat(
+          *singleRow.get(), flatArray->as<ArrayVector>(), outputType, context);
+      localResult =
+          BaseVector::wrapInConstant(rows.size(), flatIndex, localResult);
+    } else {
+      localResult =
+          applyFlat(rows, arg->as<ArrayVector>(), outputType, context);
+    }
+
+    context.moveOrCopyResult(localResult, rows, result);
+  }
+
+  static std::vector<std::shared_ptr<exec::FunctionSignature>> signatures() {
+    return {// unknown -> map(unknown, unknown)
+            exec::FunctionSignatureBuilder()
+                .returnType("map(unknown, unknown)")
+                .argumentType("unknown")
+                .build(),
+            // array(unknown) -> map(unknown, unknown)
+            exec::FunctionSignatureBuilder()
+                .returnType("map(unknown, unknown)")
+                .argumentType("array(unknown)")
+                .build(),
+            // array(row(K,V)) -> map(K,V)
+            exec::FunctionSignatureBuilder()
+                .typeVariable("K")
+                .typeVariable("V")
+                .returnType("map(K,V)")
+                .argumentType("array(row(K,V))")
+                .build()};
+  }
+
+ private:
+  VectorPtr applyFlat(
+      const SelectivityVector& rows,
+      const ArrayVector* inputArray,
+      const TypePtr& outputType,
+      exec::EvalCtx& context) const {
+    auto& inputValueVector = inputArray->elements();
+    exec::LocalDecodedVector decodedRowVector(context);
+    decodedRowVector.get()->decode(*inputValueVector);
+    if (inputValueVector->typeKind() == TypeKind::UNKNOWN) {
+      auto sizes = allocateSizes(rows.end(), context.pool());
+      auto offsets = allocateSizes(rows.end(), context.pool());
+
+      // Output in this case is map(unknown, unknown), but all elements are
+      // nulls, all offsets and sizes are 0.
+      return std::make_shared<MapVector>(
+          context.pool(),
+          outputType,
+          inputArray->nulls(),
+          rows.end(),
+          sizes,
+          offsets,
+          BaseVector::create(UNKNOWN(), 0, context.pool()),
+          BaseVector::create(UNKNOWN(), 0, context.pool()));
+    }
+
+    exec::LocalSelectivityVector remianingRows(context, rows);
+    auto rowVector = decodedRowVector->base()->as<RowVector>();
+    auto keyVector = rowVector->childAt(0);
+
+    BufferPtr sizes = allocateSizes(rows.end(), context.pool());
+    vector_size_t* mutableSizes = sizes->asMutable<vector_size_t>();
+    rows.applyToSelected([&](vector_size_t row) {
+      mutableSizes[row] = inputArray->rawSizes()[row];
+    });
+
+    auto resetSize = [&](vector_size_t row) { mutableSizes[row] = 0; };
+    auto nulls = allocateNulls(decodedRowVector->size(), context.pool());
+    auto* mutableNulls = nulls->asMutable<uint64_t>();
+
+    if (decodedRowVector->mayHaveNulls() || keyVector->mayHaveNulls() ||
+        keyVector->mayHaveNullsRecursive()) {
+      context.applyToSelectedNoThrow(rows, [&](vector_size_t row) {
+        const auto size = inputArray->sizeAt(row);
+        const auto offset = inputArray->offsetAt(row);
+
+        for (auto i = 0; i < size; ++i) {
+          // For nulls in the top level row vector, return null.
+          const bool isMapEntryNull = decodedRowVector->isNullAt(offset + i);
+          if (isMapEntryNull) {
+            bits::setNull(mutableNulls, row);
+            break;
+          }
+
+          // Check null keys.
+          auto keyIndex = decodedRowVector->index(offset + i);
+          if (keyVector->isNullAt(keyIndex)) {
+            resetSize(row);
+            VELOX_USER_FAIL(kNullKeyErrorMessage);
+          }
+
+          // Check nested null in keys.
+          if (keyVector->containsNullAt(keyIndex)) {
+            resetSize(row);
+            VELOX_USER_FAIL(fmt::format(
+                "{}: {}",
+                kIndeterminateKeyErrorMessage,
+                keyVector->toString(keyIndex)));
+          }
+        }
+      });
+    }
+
+    context.deselectErrors(*remianingRows.get());
+
+    VectorPtr wrappedKeys;
+    VectorPtr wrappedValues;
+    if (decodedRowVector->isIdentityMapping()) {
+      wrappedKeys = rowVector->childAt(0);
+      wrappedValues = rowVector->childAt(1);
+    } else if (decodedRowVector->isConstantMapping()) {
+      if (decodedRowVector->isNullAt(0)) {
+        // If top level row is null, child might not be addressable at index 0
+        // so we do not try to read it.
+        wrappedKeys = BaseVector::createNullConstant(
+            rowVector->childAt(0)->type(),
+            decodedRowVector->size(),
+            context.pool());
+        wrappedValues = BaseVector::createNullConstant(
+            rowVector->childAt(1)->type(),
+            decodedRowVector->size(),
+            context.pool());
+      } else {
+        wrappedKeys = BaseVector::wrapInConstant(
+            decodedRowVector->size(),
+            decodedRowVector->index(0),
+            rowVector->childAt(0));
+        wrappedValues = BaseVector::wrapInConstant(
+            decodedRowVector->size(),
+            decodedRowVector->index(0),
+            rowVector->childAt(1));
+      }
+    } else {
+      // Dictionary.
+      auto indices = allocateIndices(decodedRowVector->size(), context.pool());
+      memcpy(
+          indices->asMutable<vector_size_t>(),
+          decodedRowVector->indices(),
+          BaseVector::byteSize<vector_size_t>(decodedRowVector->size()));
+      // Any null in the top row(X, Y) should be marked as null since its
+      // not guranteed to be addressable at X or Y.
+      for (auto i = 0; i < decodedRowVector->size(); i++) {
+        if (decodedRowVector->isNullAt(i)) {
+          bits::setNull(mutableNulls, i);
+        }
+      }
+      wrappedKeys = BaseVector::wrapInDictionary(
+          nulls, indices, decodedRowVector->size(), rowVector->childAt(0));
+      wrappedValues = BaseVector::wrapInDictionary(
+          nulls, indices, decodedRowVector->size(), rowVector->childAt(1));
+    }
+
+    // To avoid creating new buffers, we try to reuse the input's buffers
+    // as many as possible.
+    auto mapVector = std::make_shared<MapVector>(
+        context.pool(),
+        outputType,
+        nulls,
+        rows.end(),
+        inputArray->offsets(),
+        sizes,
+        wrappedKeys,
+        wrappedValues);
+
+    checkDuplicateKeys(mapVector, *remianingRows, context);
+    return mapVector;
+  }
+};
+} // namespace
+
+VELOX_DECLARE_VECTOR_FUNCTION(
+    udf_map_from_entries,
+    MapFromEntriesFunction::signatures(),
+    std::make_unique<MapFromEntriesFunction>());
+} // namespace facebook::velox::functions
diff --git a/velox/functions/sparksql/Register.cpp b/velox/functions/sparksql/Register.cpp
index 74d9c81e7f49d..145dceeb22322 100644
--- a/velox/functions/sparksql/Register.cpp
+++ b/velox/functions/sparksql/Register.cpp
@@ -69,6 +69,8 @@ static void workAroundRegistrationMacro(const std::string& prefix) {
 
   VELOX_REGISTER_VECTOR_FUNCTION(
       udf_map_allow_duplicates, prefix + "map_from_arrays");
+  VELOX_REGISTER_VECTOR_FUNCTION(
+      udf_map_from_entries, prefix + "map_from_entries");
   VELOX_REGISTER_VECTOR_FUNCTION(
       udf_concat_row, exec::RowConstructorCallToSpecialForm::kRowConstructor);
   // String functions.
diff --git a/velox/functions/sparksql/tests/CMakeLists.txt b/velox/functions/sparksql/tests/CMakeLists.txt
index 86d4d1fa5d52e..8757feafbe155 100644
--- a/velox/functions/sparksql/tests/CMakeLists.txt
+++ b/velox/functions/sparksql/tests/CMakeLists.txt
@@ -31,6 +31,7 @@ add_executable(
   InTest.cpp
   LeastGreatestTest.cpp
   MapTest.cpp
+  MapFromEntriesTest.cpp
   MightContainTest.cpp
   RandTest.cpp
   RegexFunctionsTest.cpp
diff --git a/velox/functions/sparksql/tests/MapFromEntriesTest.cpp b/velox/functions/sparksql/tests/MapFromEntriesTest.cpp
new file mode 100644
index 0000000000000..2702ab0a60ddd
--- /dev/null
+++ b/velox/functions/sparksql/tests/MapFromEntriesTest.cpp
@@ -0,0 +1,459 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <cstdint>
+#include <optional>
+#include "velox/common/base/tests/GTestUtils.h"
+#include "velox/functions/lib/CheckDuplicateKeys.h"
+#include "velox/functions/prestosql/ArrayConstructor.h"
+#include "velox/functions/sparksql/tests/SparkFunctionBaseTest.h"
+#include "velox/vector/tests/TestingDictionaryArrayElementsFunction.h"
+
+using namespace facebook::velox::test;
+
+namespace facebook::velox::functions::sparksql::test {
+namespace {
+std::optional<std::vector<std::pair<int32_t, std::optional<int32_t>>>> O(
+    const std::vector<std::pair<int32_t, std::optional<int32_t>>>& vector) {
+  return std::make_optional(vector);
+}
+
+class MapFromEntriesTest : public SparkFunctionBaseTest {
+ protected:
+  /// Create an MAP vector of size 1 using specified 'keys' and 'values' vector.
+  VectorPtr makeSingleRowMapVector(
+      const VectorPtr& keys,
+      const VectorPtr& values) {
+    BufferPtr offsets = allocateOffsets(1, pool());
+    BufferPtr sizes = allocateSizes(1, pool());
+    sizes->asMutable<vector_size_t>()[0] = keys->size();
+
+    return std::make_shared<MapVector>(
+        pool(),
+        MAP(keys->type(), values->type()),
+        nullptr,
+        1,
+        offsets,
+        sizes,
+        keys,
+        values);
+  }
+
+  void verifyMapFromEntries(
+      const std::vector<VectorPtr>& input,
+      const VectorPtr& expected,
+      const std::string& funcArg = "C0") {
+    const std::string expr = fmt::format("map_from_entries({})", funcArg);
+    auto result = evaluate<MapVector>(expr, makeRowVector(input));
+    assertEqualVectors(expected, result);
+  }
+
+  // Evaluate an expression only, usually expect error thrown.
+  void evaluateExpr(
+      const std::string& expression,
+      const std::vector<VectorPtr>& input) {
+    evaluate(expression, makeRowVector(input));
+  }
+};
+} // namespace
+
+TEST_F(MapFromEntriesTest, intKeyAndVarcharValue) {
+  auto rowType = ROW({INTEGER(), VARCHAR()});
+  std::vector<std::vector<std::optional<std::tuple<int32_t, std::string>>>>
+      data = {
+          {{{1, "red"}}, {{2, "blue"}}, {{3, "green"}}},
+      };
+  auto input = makeArrayOfRowVector(data, rowType);
+  auto expected = makeMapVector<int32_t, StringView>(
+      {{{1, "red"_sv}, {2, "blue"_sv}, {3, "green"_sv}}});
+  verifyMapFromEntries({input}, expected);
+}
+
+TEST_F(MapFromEntriesTest, nullMapEntries) {
+  auto rowType = ROW({INTEGER(), INTEGER()});
+  {
+    std::vector<std::vector<std::optional<std::tuple<int32_t, int32_t>>>> data =
+        {
+            {std::nullopt},
+            {{{1, 11}}},
+        };
+    auto input = makeArrayOfRowVector(data, rowType);
+    auto expected =
+        makeNullableMapVector<int32_t, int32_t>({std::nullopt, O({{1, 11}})});
+    verifyMapFromEntries({input}, expected, "C0");
+  }
+  {
+    // Create array(row(a,b)) where a, b sizes are 0 because all row(a, b)
+    // values are null.
+    std::vector<std::vector<std::optional<std::tuple<int32_t, int32_t>>>> data =
+        {
+            {std::nullopt, std::nullopt, std::nullopt},
+            {std::nullopt},
+        };
+    auto input = makeArrayOfRowVector(data, rowType);
+    auto rowInput = input->as<ArrayVector>();
+    rowInput->elements()->as<RowVector>()->childAt(0)->resize(0);
+    rowInput->elements()->as<RowVector>()->childAt(1)->resize(0);
+
+    auto expected =
+        makeNullableMapVector<int32_t, int32_t>({std::nullopt, std::nullopt});
+    verifyMapFromEntries({input}, expected, "C0");
+  }
+}
+
+TEST_F(MapFromEntriesTest, nullKeys) {
+  auto rowType = ROW({INTEGER(), INTEGER()});
+  std::vector<std::vector<variant>> data = {
+      {variant::row({variant::null(TypeKind::INTEGER), 0})},
+      {variant::row({1, 11})}};
+  auto input = makeArrayOfRowVector(rowType, data);
+  VELOX_ASSERT_THROW(
+      evaluateExpr("map_from_entries(C0)", {input}), "map key cannot be null");
+}
+
+TEST_F(MapFromEntriesTest, duplicateKeys) {
+  auto rowType = ROW({INTEGER(), INTEGER()});
+  std::vector<std::vector<std::optional<std::tuple<int32_t, int32_t>>>> data = {
+      {{{1, 10}}, {{1, 11}}},
+      {{{2, 22}}},
+  };
+  auto input = makeArrayOfRowVector(data, rowType);
+  VELOX_ASSERT_THROW(
+      evaluateExpr("map_from_entries(C0)", {input}),
+      "Duplicate map keys (1) are not allowed");
+}
+
+TEST_F(MapFromEntriesTest, nullValues) {
+  auto rowType = ROW({INTEGER(), INTEGER()});
+  std::vector<std::vector<variant>> data = {
+      {variant::row({1, variant::null(TypeKind::INTEGER)}),
+       variant::row({2, 22}),
+       variant::row({3, 33})}};
+  auto input = makeArrayOfRowVector(rowType, data);
+  auto expected =
+      makeMapVector<int32_t, int32_t>({{{1, std::nullopt}, {2, 22}, {3, 33}}});
+  verifyMapFromEntries({input}, expected);
+}
+
+TEST_F(MapFromEntriesTest, constant) {
+  const vector_size_t kConstantSize = 1'000;
+  auto rowType = ROW({VARCHAR(), INTEGER()});
+  std::vector<std::vector<std::optional<std::tuple<std::string, int32_t>>>>
+      data = {
+          {{{"red", 1}}, {{"blue", 2}}, {{"green", 3}}},
+          {{{"red shiny car ahead", 4}}, {{"blue clear sky above", 5}}},
+          {{{"r", 11}}, {{"g", 22}}, {{"b", 33}}},
+      };
+  auto input = makeArrayOfRowVector(data, rowType);
+
+  auto evaluateConstant = [&](vector_size_t row, const VectorPtr& vector) {
+    return evaluate(
+        "map_from_entries(C0)",
+        makeRowVector(
+            {BaseVector::wrapInConstant(kConstantSize, row, vector)}));
+  };
+
+  auto result = evaluateConstant(0, input);
+  auto expected = BaseVector::wrapInConstant(
+      kConstantSize,
+      0,
+      makeSingleRowMapVector(
+          makeFlatVector<StringView>({"red"_sv, "blue"_sv, "green"_sv}),
+          makeFlatVector<int32_t>({1, 2, 3})));
+  assertEqualVectors(expected, result);
+
+  result = evaluateConstant(1, input);
+  expected = BaseVector::wrapInConstant(
+      kConstantSize,
+      0,
+      makeSingleRowMapVector(
+          makeFlatVector<StringView>(
+              {"red shiny car ahead"_sv, "blue clear sky above"_sv}),
+          makeFlatVector<int32_t>({4, 5})));
+  assertEqualVectors(expected, result);
+
+  result = evaluateConstant(2, input);
+  expected = BaseVector::wrapInConstant(
+      kConstantSize,
+      0,
+      makeSingleRowMapVector(
+          makeFlatVector<StringView>({"r"_sv, "g"_sv, "b"_sv}),
+          makeFlatVector<int32_t>({11, 22, 33})));
+  assertEqualVectors(expected, result);
+}
+
+TEST_F(MapFromEntriesTest, dictionaryEncodedElementsInFlat) {
+  exec::registerVectorFunction(
+      "testing_dictionary_array_elements",
+      facebook::velox::test::TestingDictionaryArrayElementsFunction::
+          signatures(),
+      std::make_unique<
+          facebook::velox::test::TestingDictionaryArrayElementsFunction>());
+
+  auto rowType = ROW({INTEGER(), VARCHAR()});
+  std::vector<std::vector<std::optional<std::tuple<int32_t, std::string>>>>
+      data = {
+          {{{1, "red"}}, {{2, "blue"}}, {{3, "green"}}},
+      };
+  auto input = makeArrayOfRowVector(data, rowType);
+  auto expected = makeMapVector<int32_t, StringView>(
+      {{{1, "red"_sv}, {2, "blue"_sv}, {3, "green"_sv}}});
+  verifyMapFromEntries(
+      {input}, expected, "testing_dictionary_array_elements(C0)");
+}
+
+TEST_F(MapFromEntriesTest, outputSizeIsBoundBySelectedRows) {
+  // This test makes sure that map_from_entries output vector size is
+  // `rows.end()` instead of `rows.size()`.
+
+  auto rowType = ROW({INTEGER(), INTEGER()});
+  core::QueryConfig config({});
+  auto function =
+      exec::getVectorFunction("map_from_entries", {ARRAY(rowType)}, {}, config);
+
+  std::vector<std::vector<std::optional<std::tuple<int32_t, int32_t>>>> data = {
+      {{{1, 11}}, {{2, 22}}, {{3, 33}}},
+      {{{4, 44}}, {{5, 55}}},
+      {{{6, 66}}},
+  };
+  auto array = makeArrayOfRowVector(data, rowType);
+
+  auto rowVector = makeRowVector({array});
+
+  // Only the first 2 rows selected.
+  SelectivityVector rows(2);
+  // This is larger than input array size but rows beyond the input vector size
+  // are not selected.
+  rows.resize(1000, false);
+
+  ASSERT_EQ(rows.size(), 1000);
+  ASSERT_EQ(rows.end(), 2);
+  ASSERT_EQ(array->size(), 3);
+
+  auto typedExpr =
+      makeTypedExpr("map_from_entries(c0)", asRowType(rowVector->type()));
+  std::vector<VectorPtr> results(1);
+
+  exec::ExprSet exprSet({typedExpr}, &execCtx_);
+  exec::EvalCtx evalCtx(&execCtx_, &exprSet, rowVector.get());
+  exprSet.eval(rows, evalCtx, results);
+
+  ASSERT_EQ(results[0]->size(), 2);
+}
+
+TEST_F(MapFromEntriesTest, rowsWithNullsNotPassedToCheckDuplicateKey) {
+  auto innerRowVector = makeRowVector(
+      {makeNullableFlatVector<int32_t>({std::nullopt, 2, 3, 4}),
+       makeNullableFlatVector<int32_t>({1, 2, 3, 4})});
+
+  auto offsets = makeIndices({0, 2});
+  auto sizes = makeIndices({2, 2});
+
+  auto arrayVector = std::make_shared<ArrayVector>(
+      pool(),
+      ARRAY(ROW({INTEGER(), INTEGER()})),
+      nullptr,
+      2,
+      offsets,
+      sizes,
+      innerRowVector);
+  VELOX_ASSERT_THROW(
+      evaluate("map_from_entries(C0)", makeRowVector({arrayVector})),
+      "map key cannot be null");
+}
+
+TEST_F(MapFromEntriesTest, arrayOfDictionaryRowOfNulls) {
+  RowVectorPtr rowVector =
+      makeRowVector({makeFlatVector<int32_t>(0), makeFlatVector<int32_t>(0)});
+  rowVector->resize(4);
+  rowVector->childAt(0)->resize(0);
+  rowVector->childAt(1)->resize(0);
+  for (int i = 0; i < rowVector->size(); i++) {
+    rowVector->setNull(i, true);
+  }
+
+  EXPECT_EQ(rowVector->childAt(0)->size(), 0);
+  EXPECT_EQ(rowVector->childAt(1)->size(), 0);
+
+  auto indices = makeIndices({0, 1, 2, 3});
+
+  auto dictionary =
+      BaseVector::wrapInDictionary(nullptr, indices, 4, rowVector);
+
+  auto offsets = makeIndices({0, 2});
+  auto sizes = makeIndices({2, 2});
+
+  auto arrayVector = std::make_shared<ArrayVector>(
+      pool(),
+      ARRAY(ROW({INTEGER(), INTEGER()})),
+      nullptr,
+      2,
+      offsets,
+      sizes,
+      dictionary);
+  VectorPtr result =
+      evaluate("map_from_entries(c0)", makeRowVector({arrayVector}));
+  for (int i = 0; i < result->size(); i++) {
+    EXPECT_TRUE(result->isNullAt(i));
+  }
+}
+
+TEST_F(MapFromEntriesTest, arrayOfConstantRowOfNulls) {
+  RowVectorPtr rowVector =
+      makeRowVector({makeFlatVector<int32_t>(0), makeFlatVector<int32_t>(0)});
+  rowVector->resize(1);
+  rowVector->setNull(0, true);
+  rowVector->childAt(0)->resize(0);
+  rowVector->childAt(1)->resize(0);
+  EXPECT_EQ(rowVector->childAt(0)->size(), 0);
+  EXPECT_EQ(rowVector->childAt(1)->size(), 0);
+
+  VectorPtr rowVectorConstant = BaseVector::wrapInConstant(4, 0, rowVector);
+
+  auto offsets = makeIndices({0, 2});
+  auto sizes = makeIndices({2, 2});
+
+  auto arrayVector = std::make_shared<ArrayVector>(
+      pool(),
+      ARRAY(ROW({INTEGER(), INTEGER()})),
+      nullptr,
+      2,
+      offsets,
+      sizes,
+      rowVectorConstant);
+  VectorPtr result =
+      evaluate("map_from_entries(c0)", makeRowVector({arrayVector}));
+  for (int i = 0; i < result->size(); i++) {
+    EXPECT_TRUE(result->isNullAt(i));
+  }
+}
+
+TEST_F(MapFromEntriesTest, arrayOfConstantNotNulls) {
+  RowVectorPtr rowVector = makeRowVector(
+      {makeFlatVector<int32_t>({1, 2}), makeFlatVector<int32_t>({3, 4})});
+  rowVector->resize(1);
+  rowVector->setNull(0, false);
+
+  VectorPtr rowVectorConstant = BaseVector::wrapInConstant(4, 0, rowVector);
+  {
+    auto offsets = makeIndices({0, 2});
+    auto sizes = makeIndices({2, 2});
+
+    auto arrayVector = std::make_shared<ArrayVector>(
+        pool(),
+        ARRAY(ROW({INTEGER(), INTEGER()})),
+        nullptr,
+        2,
+        offsets,
+        sizes,
+        rowVectorConstant);
+
+    VELOX_ASSERT_THROW(
+        evaluate("map_from_entries(C0)", makeRowVector({arrayVector})),
+        "Duplicate map keys (1) are not allowed");
+  }
+
+  {
+    auto offsets = makeIndices({0, 1});
+    auto sizes = makeIndices({1, 1});
+
+    auto arrayVector = std::make_shared<ArrayVector>(
+        pool(),
+        ARRAY(ROW({INTEGER(), INTEGER()})),
+        nullptr,
+        2,
+        offsets,
+        sizes,
+        rowVectorConstant);
+
+    // will fail due to duplicate key.
+    VectorPtr result =
+        evaluate("map_from_entries(c0)", makeRowVector({arrayVector}));
+    auto expected = makeMapVector<int32_t, int32_t>(
+        {{{1, 2}}, {{1, 2}}, {{1, 2}}, {{1, 2}}});
+  }
+}
+
+TEST_F(MapFromEntriesTest, nestedNullInKeys) {
+  facebook::velox::functions::registerArrayConstructor("array_constructor");
+  VELOX_ASSERT_THROW(
+      evaluate(
+          "map_from_entries(array_constructor(row_constructor(array_constructor(null), null)))",
+          makeRowVector({makeFlatVector<int32_t>(1)})),
+      "map key cannot be indeterminate");
+}
+
+TEST_F(MapFromEntriesTest, unknownInputs) {
+  facebook::velox::functions::registerArrayConstructor("array_constructor");
+  auto expectedType = MAP(UNKNOWN(), UNKNOWN());
+  auto test = [&](const std::string& query) {
+    auto result = evaluate(query, makeRowVector({makeFlatVector<int32_t>(2)}));
+    ASSERT_TRUE(result->type()->equivalent(*expectedType));
+  };
+  VELOX_ASSERT_THROW(
+      evaluate(
+          "map_from_entries(array_constructor(row_constructor(null, null)))",
+          makeRowVector({makeFlatVector<int32_t>(2)})),
+      "map key cannot be null");
+  test("map_from_entries(array_constructor(null))");
+  test("map_from_entries(null)");
+}
+
+TEST_F(MapFromEntriesTest, nullRowEntriesWithSmallerChildren) {
+  // Row vector is of size 3, childrens are of size 2 since row 2 is null.
+  auto rowVector = makeRowVector(
+      {makeNullableFlatVector<int32_t>({std::nullopt, 2}),
+       makeFlatVector<int32_t>({1, 2})});
+  rowVector->appendNulls(1);
+  rowVector->setNull(2, true);
+
+  // Array [(null,1), (2,2), null]
+  auto arrayVector = makeArrayVector({0}, rowVector);
+  VELOX_ASSERT_THROW(
+      evaluate("map_from_entries(c0)", makeRowVector({arrayVector})),
+      "map key cannot be null");
+}
+
+TEST_F(MapFromEntriesTest, allTopLevelNullsCornerCase) {
+  // Test a corner case where the input of mapFromEntries is an array with
+  // array at row 0 : [(null), (null)]
+  // array at row 1 : []
+  // And the function is evaluated only at row 1.
+
+  auto keys = makeNullableFlatVector<int32_t>({});
+  auto values = makeNullableFlatVector<int32_t>({});
+  auto rowVector = makeRowVector({keys, values});
+
+  EXPECT_EQ(rowVector->size(), 0);
+  rowVector->appendNulls(2);
+
+  EXPECT_EQ(rowVector->size(), 2);
+  EXPECT_EQ(rowVector->childAt(0)->size(), 0);
+  EXPECT_EQ(rowVector->childAt(1)->size(), 0);
+
+  // Array at row 0 is [(null), (null)]
+  // Array at row 1 is []
+  auto arrayVector = makeArrayVector({0, 2}, rowVector);
+
+  SelectivityVector rows(2);
+  rows.setValid(0, false);
+
+  auto result =
+      evaluate("map_from_entries(c0)", makeRowVector({arrayVector}), rows);
+  result->validate();
+  auto expected = makeMapVectorFromJson<int32_t, int32_t>({"{}", "{}"});
+  assertEqualVectors(expected, result);
+}
+} // namespace facebook::velox::functions::sparksql::test