[FLINK-31691][table] Add built-in MAP_FROM_ENTRIES function. #26777

Open · wants to merge 1 commit into master
3 changes: 3 additions & 0 deletions docs/data/sql_functions.yml
@@ -859,6 +859,9 @@ collection:
- sql: MAP_FROM_ARRAYS(array_of_keys, array_of_values)
table: mapFromArrays(array_of_keys, array_of_values)
description: Returns a map created from an array of keys and an array of values. Note that the lengths of the two arrays should be the same.
- sql: MAP_FROM_ENTRIES(array_of_rows)
table: mapFromEntries(array_of_rows)
description: Returns a map created from an array of rows with two fields, where the first field is used as the key and the second field as the value.
- sql: SPLIT(string, delimiter)
table: string.split(delimiter)
description: Returns an array of substrings by splitting the input string based on the given delimiter. If the delimiter is not found in the string, the original string is returned as the only element in the array. If the delimiter is empty, every character in the string is split. If the string or delimiter is null, a null value is returned. If the delimiter is found at the beginning or end of the string, or there are contiguous delimiters, then an empty string is added to the array.
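For orientation, a minimal Java sketch of how the documented SQL form could be invoked once this function is available. The class name, the literal entries, and the assumption that ROW constructors are accepted inside the ARRAY constructor are illustrative and not taken from this diff:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MapFromEntriesSqlSketch {
    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Build the entry array inline and turn it into a map; each ROW supplies
        // a (key, value) pair. Expected printed value: {a=1, b=2}.
        env.sqlQuery("SELECT MAP_FROM_ENTRIES(ARRAY[ROW('a', 1), ROW('b', 2)]) AS m")
                .execute()
                .print();
    }
}
```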
3 changes: 3 additions & 0 deletions docs/data/sql_functions_zh.yml
@@ -983,6 +983,9 @@ collection:
- sql: MAP_FROM_ARRAYS(array_of_keys, array_of_values)
table: mapFromArrays(array_of_keys, array_of_values)
description: Returns a map created from the array of keys and the array of values. Note that the two arrays should have the same length.
- sql: MAP_FROM_ENTRIES(array_of_rows)
table: mapFromEntries(array_of_rows)
description: Returns a map created from an array of rows with two fields, where the first field is used as the key and the second field as the value.
- sql: MAP_UNION(map1, map2)
table: map1.mapUnion(map2)
description: Returns a map created by merging the two maps 'map1' and 'map2'. The two maps should have a common map type. If there are overlapping keys, the values of 'map2' override the values of 'map1'. If either map is null, null is returned.
Expand Down
1 change: 1 addition & 0 deletions flink-python/docs/reference/pyflink.table/expressions.rst
@@ -55,6 +55,7 @@ Expressions
row
map_
map_from_arrays
map_from_entries
row_interval
pi
e
Expand Down
16 changes: 16 additions & 0 deletions flink-python/pyflink/table/expressions.py
@@ -560,6 +560,22 @@ def map_from_arrays(key, value) -> Expression:
return _binary_op("mapFromArrays", key, value)


def map_from_entries(rows) -> Expression:
"""
Creates a map from an array of entries (rows with two fields): the first field is used as the key and the second field as the value.

Example:
::

>>> tab.select(
...     map_from_entries(
...         array(row(key1, 1), row(key2, 2), row(key3, 3))
...     ))

"""
return _unary_op("mapFromEntries", rows)


@PublicEvolving()
def row_interval(rows: int) -> Expression:
"""
@@ -624,6 +624,22 @@ public static ApiExpression mapFromArrays(Object key, Object value) {
objectToExpression(value));
}

/**
* Creates a map from an array of entries (rows with two fields): the first field of each row becomes the key and the second field becomes the value.
*
* <pre>{@code
* table.select(
* mapFromEntries(
* array(row(key1, 1), row(key2, 2), row(key3, 3))

Review comment from @davidradl (Contributor), Jul 11, 2025:

I am wondering what the constraints are for this:

  • What is the behaviour for duplicate keys? This should be documented; I assume we take the first value for a key or raise an error. We should add tests for this.
  • Are there any constraints on the key type? This should be documented and tests added. How would it fail if the key type was not a valid one? For example, I would think a nested row is not appropriate for a key, but primitives excluding boolean would seem reasonable.
  • Do we check that the rows are all of the same shape? We should test how/if this fails and document it.

* ))
* }</pre>
*
* <p>Note: if the rows of the array do not have exactly two fields, the call fails with a validation error.
*/
public static ApiExpression mapFromEntries(Object rows) {
return apiCall(BuiltInFunctionDefinitions.MAP_FROM_ENTRIES, objectToExpression(rows));
}

/**
* Creates an interval of rows.
*
@@ -649,6 +649,11 @@ trait ImplicitExpressionConversions {
Expressions.mapFromArrays(key, value)
}

/** Creates a map from an array of entries (rows with two fields). */
def mapFromEntries(rows: Expression): Expression = {
Expressions.mapFromEntries(rows)
}

/** Returns a value that is closer than any other value to pi. */
def pi(): Expression = {
Expressions.pi()
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.data;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.FlinkRuntimeException;

/** A simple {@link MapData} implementation that is backed by two arrays, one for keys and one for values. */
@PublicEvolving
public class ArrayBasedMapData implements MapData {
private final ArrayData keyArray;
private final ArrayData valueArray;

public ArrayBasedMapData(ArrayData keyArray, ArrayData valueArray) {
if (keyArray.size() != valueArray.size()) {
throw new FlinkRuntimeException(
"Invalid function call:\n"
+ "The length of the keys array "
+ keyArray.size()
+ " is not equal to the length of the values array "
+ valueArray.size());
}
this.keyArray = keyArray;
this.valueArray = valueArray;
}

@Override
public int size() {
return keyArray.size();
}

@Override
public ArrayData keyArray() {
return keyArray;
}

@Override
public ArrayData valueArray() {
return valueArray;
}
}
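A hedged usage sketch (not part of the diff) of how this class pairs two equally sized arrays into a MapData view. The class name and the literal contents are illustrative; GenericArrayData and StringData are the existing helpers from org.apache.flink.table.data:

```java
import org.apache.flink.table.data.ArrayBasedMapData;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.StringData;

public class ArrayBasedMapDataSketch {
    public static void main(String[] args) {
        // Keys and values are kept in two parallel arrays of equal length.
        ArrayData keys = new GenericArrayData(new Object[] {1, 2, 3});
        ArrayData values =
                new GenericArrayData(
                        new Object[] {
                            StringData.fromString("a"),
                            StringData.fromString("b"),
                            StringData.fromString("c")
                        });

        MapData map = new ArrayBasedMapData(keys, values);
        System.out.println(map.size()); // 3

        // Mismatched lengths are rejected eagerly in the constructor:
        // new ArrayBasedMapData(keys, new GenericArrayData(new Object[] {1}))
        // would throw a FlinkRuntimeException.
    }
}
```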
@@ -211,6 +211,16 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
"org.apache.flink.table.runtime.functions.scalar.MapFromArraysFunction")
.build();

public static final BuiltInFunctionDefinition MAP_FROM_ENTRIES =
BuiltInFunctionDefinition.newBuilder()
.name("MAP_FROM_ENTRIES")
.kind(SCALAR)
.inputTypeStrategy(SpecificInputTypeStrategies.MAP_FROM_ENTRIES)
.outputTypeStrategy(SpecificTypeStrategies.MAP_FROM_ENTRIES)
.runtimeClass(
"org.apache.flink.table.runtime.functions.scalar.MapFromEntriesFunction")
.build();

public static final BuiltInFunctionDefinition SOURCE_WATERMARK =
BuiltInFunctionDefinition.newBuilder()
.name("SOURCE_WATERMARK")
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.types.inference.strategies;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.ArgumentCount;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.Signature;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

import java.util.Collections;
import java.util.List;
import java.util.Optional;

/**
* {@link InputTypeStrategy} specific for {@link BuiltInFunctionDefinitions#MAP_FROM_ENTRIES}.
*
* <p>It checks that the single argument is an array of rows with exactly two fields.
*/
@Internal
class MapFromEntriesInputTypeStrategy implements InputTypeStrategy {

@Override
public ArgumentCount getArgumentCount() {
return ConstantArgumentCount.of(1);
}

@Override
public Optional<List<DataType>> inferInputTypes(
CallContext callContext, boolean throwOnFailure) {
final List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();

final DataType dataType = argumentDataTypes.get(0);
final LogicalType logicalType = dataType.getLogicalType();
if (logicalType.is(LogicalTypeRoot.ARRAY)
&& ((ArrayType) logicalType).getElementType().is(LogicalTypeRoot.ROW)
&& ((ArrayType) logicalType).getElementType().getChildren().size() == 2) {
return Optional.of(Collections.singletonList(dataType));
} else {
return callContext.fail(
throwOnFailure,
"Unsupported argument type. Expected type 'ARRAY<ROW<`f0` ANY, `f1` ANY>>' but actual type was '%s'.",
logicalType.asSummaryString());
}
}

@Override
public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
return Collections.singletonList(
Signature.of(Signature.Argument.of("ARRAY<ROW<`f0` ANY, `f1` ANY>>")));
}
}
@@ -56,6 +56,9 @@ public final class SpecificInputTypeStrategies {
/** See {@link MapInputTypeStrategy}. */
public static final InputTypeStrategy MAP = new MapInputTypeStrategy();

/** See {@link MapFromEntriesInputTypeStrategy}. */
public static final InputTypeStrategy MAP_FROM_ENTRIES = new MapFromEntriesInputTypeStrategy();

/** See {@link CurrentWatermarkInputTypeStrategy}. */
public static final InputTypeStrategy CURRENT_WATERMARK =
new CurrentWatermarkInputTypeStrategy();
@@ -161,6 +161,26 @@ public final class SpecificTypeStrategies {
.get(0))
.getValueDataType()))));

/** Type strategy specific for {@link BuiltInFunctionDefinitions#MAP_FROM_ENTRIES}. */
public static final TypeStrategy MAP_FROM_ENTRIES =
callContext -> {
CollectionDataType argType =
(CollectionDataType) callContext.getArgumentDataTypes().get(0);
DataType entryRowType = argType.getElementDataType();
boolean nullable =
argType.getLogicalType().isNullable()
|| entryRowType.getLogicalType().isNullable();
// DataTypes.MAP is nullable by default; only force NOT NULL below when both the array and its row elements are non-nullable.
DataType resultType =
DataTypes.MAP(
entryRowType.getChildren().get(0),
entryRowType.getChildren().get(1));
if (!nullable) {
resultType = resultType.notNull();
}
return Optional.of(resultType);
};

/** Type strategy specific for {@link BuiltInFunctionDefinitions#MAP_FROM_ARRAYS}. */
public static final TypeStrategy MAP_FROM_ARRAYS =
callContext ->
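To make the nullability rule of the output strategy above concrete, a hedged DataTypes-based sketch; the class name and the chosen key/value types are illustrative, not taken from the diff:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class MapFromEntriesTypeSketch {
    public static void main(String[] args) {
        // Nullable input (the default): the derived MAP stays nullable.
        DataType nullableInput =
                DataTypes.ARRAY(DataTypes.ROW(DataTypes.INT(), DataTypes.STRING()));
        DataType nullableResult = DataTypes.MAP(DataTypes.INT(), DataTypes.STRING());

        // Only when both the array and its row elements are NOT NULL does the
        // strategy mark the resulting MAP as NOT NULL.
        DataType nonNullInput =
                DataTypes.ARRAY(
                                DataTypes.ROW(DataTypes.INT(), DataTypes.STRING()).notNull())
                        .notNull();
        DataType nonNullResult =
                DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()).notNull();

        System.out.println(nullableInput + " -> " + nullableResult);
        System.out.println(nonNullInput + " -> " + nonNullResult);
    }
}
```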
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.types.inference.strategies;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.inference.InputTypeStrategiesTestBase;

import java.util.stream.Stream;

/** Tests for {@link MapFromEntriesInputTypeStrategy}. */
class MapFromEntriesInputTypeStrategyTest extends InputTypeStrategiesTestBase {

@Override
protected Stream<TestSpec> testData() {
return Stream.of(
TestSpec.forStrategy(SpecificInputTypeStrategies.MAP_FROM_ENTRIES)
.calledWithArgumentTypes(
DataTypes.ARRAY(DataTypes.ROW(DataTypes.INT(), DataTypes.STRING())))
.expectSignature("f(ARRAY<ROW<`f0` ANY, `f1` ANY>>)")
.expectArgumentTypes(
DataTypes.ARRAY(
DataTypes.ROW(
DataTypes.FIELD("f0", DataTypes.INT()),
DataTypes.FIELD("f1", DataTypes.STRING())))),
TestSpec.forStrategy(
"ARRAY<ROW<`expected` INT>> doesn't work",
SpecificInputTypeStrategies.MAP_FROM_ENTRIES)
.calledWithArgumentTypes(
DataTypes.ARRAY(
DataTypes.ROW(
DataTypes.FIELD("expected", DataTypes.INT()))))
.expectErrorMessage(
"Unsupported argument type. Expected type 'ARRAY<ROW<`f0` ANY, `f1` ANY>>' but actual type was 'ARRAY<ROW<`expected` INT>>'."));
}
}