From 7dbc84e42ec8199b84dec05d7db86fee266968a0 Mon Sep 17 00:00:00 2001 From: Zhanghao Chen Date: Mon, 8 Apr 2024 21:21:21 +0800 Subject: [PATCH 1/4] [FLINK-34123][core][type] Introduce built-in serialization support for Map, List, and Collection (cherry picked from commit 73676da59584f360c02d139a916033e8967583b3) --- .../api/java/typeutils/TypeExtractor.java | 29 ++++++ .../typeutils/PojoTypeExtractionTest.java | 2 +- .../api/java/typeutils/TypeExtractorTest.java | 92 ++++++++++++++++++- .../apache/flink/types/PojoTestUtilsTest.java | 2 +- 4 files changed, 118 insertions(+), 7 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 93902c4d84c7f..b761186dd5225 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -42,6 +42,7 @@ import org.apache.flink.api.common.typeinfo.TypeInfo; import org.apache.flink.api.common.typeinfo.TypeInfoFactory; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple; @@ -66,6 +67,7 @@ import java.lang.reflect.Type; import java.lang.reflect.TypeVariable; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -1968,6 +1970,33 @@ private TypeInformation privateGetForClass( return new EnumTypeInfo(clazz); } + // check for parameterized Collections, requirement: + // 1. Interface types: the underlying implementation types are not preserved across + // serialization + // 2. Concrete type arguments: Flink needs them to dispatch serialization of element types + // Example: + // - OK: List, Collection + // - not OK: LinkedList (implementation type), List (raw type), List (generic + // type argument), or List (wildcard type argument) + if (parameterizedType != null) { + Type[] actualTypeArguments = parameterizedType.getActualTypeArguments(); + boolean allTypeArgumentsConcrete = + Arrays.stream(actualTypeArguments).allMatch(arg -> arg instanceof Class); + if (allTypeArgumentsConcrete) { + if (clazz.isAssignableFrom(Map.class)) { + Class keyClass = (Class) actualTypeArguments[0]; + Class valueClass = (Class) actualTypeArguments[1]; + TypeInformation keyTypeInfo = createTypeInfo(keyClass); + TypeInformation valueTypeInfo = createTypeInfo(valueClass); + return (TypeInformation) Types.MAP(keyTypeInfo, valueTypeInfo); + } else if (clazz.isAssignableFrom(List.class)) { + Class elementClass = (Class) actualTypeArguments[0]; + TypeInformation elementTypeInfo = createTypeInfo(elementClass); + return (TypeInformation) Types.LIST(elementTypeInfo); + } + } + } + // special case for POJOs generated by Avro. 
if (AvroUtils.isAvroSpecificRecord(clazz)) { return AvroUtils.getAvroUtils().createAvroTypeInfo(clazz); diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java index bc5506bb836ad..eef6ba5d8681b 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java @@ -421,7 +421,7 @@ private void checkWCPojoAsserts(TypeInformation typeInfo) { fail("already seen"); } collectionSeen = true; - assertThat(field.getTypeInformation()).isEqualTo(new GenericTypeInfo(List.class)); + assertThat(field.getTypeInformation()).isEqualTo(new ListTypeInfo<>(String.class)); } else { fail("Unexpected field " + field); diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java index 84c2a9255f04b..72a1df065f720 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java @@ -63,7 +63,10 @@ import java.sql.Time; import java.sql.Timestamp; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -2316,15 +2319,15 @@ void testEitherFromObjectException() { @SuppressWarnings({"unchecked", "rawtypes"}) @Test void testGenericTypeWithSubclassInput() { - Map inputMap = new HashMap<>(); + HashMap inputMap = new LinkedHashMap<>(); inputMap.put("a", "b"); TypeInformation inputType = TypeExtractor.createTypeInfo(inputMap.getClass()); MapFunction function = - new MapFunction, Map>() { + new MapFunction, HashMap>() { @Override - public Map map(Map stringObjectMap) + public HashMap map(HashMap stringObjectMap) throws Exception { return stringObjectMap; } @@ -2332,7 +2335,7 @@ public Map map(Map stringObjectMap) TypeInformation ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) inputType); - TypeInformation expected = TypeExtractor.createTypeInfo(Map.class); + TypeInformation expected = TypeExtractor.createTypeInfo(HashMap.class); assertThat(ti).isEqualTo(expected); } @@ -2344,7 +2347,9 @@ void testGenericTypeWithSuperclassInput() { TypeInformation inputType = TypeExtractor.createTypeInfo(Map.class); MapFunction function = - (MapFunction, Map>) + (MapFunction< + LinkedHashMap, + Map>) stringObjectMap -> stringObjectMap; TypeExtractor.getMapReturnTypes(function, (TypeInformation) inputType); }) @@ -2500,4 +2505,81 @@ public Tuple3 map(Tuple3 value) assertThat(TypeExtractor.getForObject(Timestamp.valueOf("1998-12-12 12:37:45"))) .isEqualTo(SqlTimeTypeInfo.TIMESTAMP); } + + @SuppressWarnings({"rawtypes"}) + public static class PojoWithCollections { + // Supported collection types with concrete type arguments, expected built-in serialization + // support + public Map mapVal = new HashMap<>(); + public List listVal = new ArrayList<>(); + public Collection collectionVal = new ArrayList<>(); + + // Collection fields with unsupported collection types, treated as generic types + public LinkedList linkedListVal = new LinkedList<>(); + + // Collection fields with raw type, treated as generic types + public List rawListVal = new ArrayList<>(); + + // Collection fields with generic 
type arguments, treated as generic types + public List genericListVal = new ArrayList<>(); + + // Collection fields with wildcard type arguments, treated as generic types + public List wildcardListVal = new ArrayList<>(); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Test + public void testCollectionTypes() { + MapFunction function = + new MapFunction, PojoWithCollections>() { + @Override + public PojoWithCollections map(PojoWithCollections value) { + return null; + } + }; + TypeInformation ti = + TypeExtractor.getMapReturnTypes( + function, + (TypeInformation) + TypeInformation.of(new TypeHint>() {})); + assertThat(ti).isInstanceOf(PojoTypeInfo.class); + testCollectionTypesInternal(ti); + + // use getForClass() + TypeInformation ti2 = TypeExtractor.getForClass(PojoWithCollections.class); + assertThat(ti2).isInstanceOf(PojoTypeInfo.class); + testCollectionTypesInternal(ti2); + + // use getForObject() + PojoWithCollections t = new PojoWithCollections<>(); + TypeInformation ti3 = TypeExtractor.getForObject(t); + assertThat(ti3).isInstanceOf(PojoTypeInfo.class); + testCollectionTypesInternal(ti3); + } + + private void testCollectionTypesInternal(TypeInformation ti) { + PojoTypeInfo pojoTi = (PojoTypeInfo) ti; + assertThat(pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("mapVal")).getTypeInformation()) + .isInstanceOf(MapTypeInfo.class); + assertThat(pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("listVal")).getTypeInformation()) + .isInstanceOf(ListTypeInfo.class); + assertThat( + pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("collectionVal")) + .getTypeInformation()) + .isInstanceOf(ListTypeInfo.class); + assertThat( + pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("linkedListVal")) + .getTypeInformation()) + .isInstanceOf(GenericTypeInfo.class); + assertThat(pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("rawListVal")).getTypeInformation()) + .isInstanceOf(GenericTypeInfo.class); + assertThat( + pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("genericListVal")) + .getTypeInformation()) + .isInstanceOf(GenericTypeInfo.class); + assertThat( + pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("wildcardListVal")) + .getTypeInformation()) + .isInstanceOf(GenericTypeInfo.class); + } } diff --git a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/types/PojoTestUtilsTest.java b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/types/PojoTestUtilsTest.java index 506124816b1da..2cc1550741f7d 100644 --- a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/types/PojoTestUtilsTest.java +++ b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/types/PojoTestUtilsTest.java @@ -76,7 +76,7 @@ public static class Pojo { } public static class PojoRequiringKryo { - public List x; + public List x; } @TypeInfo(FooFactory.class) From e1e0b77ef9c0d7b4c76d2ef4aa962b850b45ed0b Mon Sep 17 00:00:00 2001 From: Zhanghao Chen Date: Mon, 8 Apr 2024 22:07:01 +0800 Subject: [PATCH 2/4] [FLINK-34123][docs][type] Add doc for built-in serialization support for Map, List, and Collection (cherry picked from commit f860631c523c1d446c0d01046f0fbe6055174dc6) --- .../serialization/types_serialization.md | 25 +++++++++++++++---- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md b/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md index 4b3dc6f76c23e..cf97beef6198a 100644 --- 
a/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md +++ b/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md @@ -37,15 +37,16 @@ Flink places some restrictions on the type of elements that can be in a DataStre The reason for this is that the system analyzes the types to determine efficient execution strategies. -There are seven different categories of data types: +There are eight different categories of data types: 1. **Java Tuples** and **Scala Case Classes** 2. **Java POJOs** 3. **Primitive Types** -4. **Regular Classes** -5. **Values** -6. **Hadoop Writables** -7. **Special Types** +4. **Common Collection Types** +5. **Regular Classes** +6. **Values** +7. **Hadoop Writables** +8. **Special Types** #### Tuples and Case Classes @@ -167,6 +168,20 @@ input.keyBy(_.word) Flink supports all Java and Scala primitive types such as `Integer`, `String`, and `Double`. +#### Common Collection Types + +Flink comes with dedicated serialization support for common Java collection types, which is more efficient than going +through a general purpose serialization framework. Currently, only `Map`, `List` and its super interface `Collection` +are supported. To utilize it, you need to declare the collection type with: + +1. Concrete type arguments: e.g. `List` but not `List`, `List`, or `List`, as Flink needs them to dispatch + serialization of the element types. +2. Interface types: e.g. `List` but not `LinkedList`, as Flink does not preserve the underlying + implementation types across serialization. + +Other nonqualified collection types will be handled by Flink as general class types. If the implementation types are +also required to be preserved, you also need to register it with a custom serializer. + #### General Class Types Flink supports most Java and Scala classes (API and custom). 
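To make the declaration rules above concrete, here is a minimal sketch (not part of the patch itself; the `Event` class and its field names are hypothetical) of a POJO whose fields illustrate which declarations pick up the new built-in collection serializers and which fall back to generic-type handling:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/** Hypothetical user POJO illustrating the declaration rules documented above. */
public class Event {
    // Interface types with concrete type arguments: eligible for the
    // built-in Map/List serialization support added in this series.
    public Map<String, Long> counters = new HashMap<>();
    public List<String> tags = new ArrayList<>();
    public Collection<Integer> ids = new ArrayList<>();

    // Implementation type: the concrete LinkedList is not preserved across
    // serialization, so this field is handled as a generic type instead.
    public LinkedList<String> history = new LinkedList<>();

    // Raw and wildcard declarations are likewise handled as generic types.
    @SuppressWarnings("rawtypes")
    public List rawValues = new ArrayList();

    public List<?> wildcardValues = new ArrayList<String>();
}
```

The interface-typed, concretely parameterized fields are expected to be extracted as `MapTypeInfo`/`ListTypeInfo`, while the `LinkedList`, raw, and wildcard fields are extracted as `GenericTypeInfo`, mirroring the `PojoWithCollections` test added in patch 1. When return types are declared explicitly, `Types.MAP(...)` and `Types.LIST(...)` produce the corresponding type information directly.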
From b27a0b3bff1257038f409b55d02f00034f43ad1c Mon Sep 17 00:00:00 2001 From: Zhanghao Chen Date: Tue, 9 Apr 2024 12:52:14 +0800 Subject: [PATCH 3/4] [FLINK-35068][core][type] Introduce built-in serialization support for java.util.Set (cherry picked from commit e1b45684394541ee290a3d81cc59a85623396c42) --- .../serialization/types_serialization.md | 2 +- .../flink/api/common/typeinfo/Types.java | 19 ++ .../common/typeutils/base/SetSerializer.java | 181 ++++++++++++++++++ .../typeutils/base/SetSerializerSnapshot.java | 59 ++++++ .../flink/api/java/typeutils/SetTypeInfo.java | 131 +++++++++++++ .../api/java/typeutils/TypeExtractor.java | 5 + .../typeutils/base/SetSerializerTest.java | 86 +++++++++ .../api/java/typeutils/SetTypeInfoTest.java | 35 ++++ .../api/java/typeutils/TypeExtractorTest.java | 5 + .../TypeSerializerTestCoverageTest.java | 4 +- 10 files changed, 525 insertions(+), 2 deletions(-) create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SetSerializer.java create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SetSerializerSnapshot.java create mode 100644 flink-core/src/main/java/org/apache/flink/api/java/typeutils/SetTypeInfo.java create mode 100644 flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SetSerializerTest.java create mode 100644 flink-core/src/test/java/org/apache/flink/api/java/typeutils/SetTypeInfoTest.java diff --git a/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md b/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md index cf97beef6198a..4241e20fd23b8 100644 --- a/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md +++ b/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md @@ -171,7 +171,7 @@ Flink supports all Java and Scala primitive types such as `Integer`, `String`, a #### Common Collection Types Flink comes with dedicated serialization support for common Java collection types, which is more efficient than going -through a general purpose serialization framework. Currently, only `Map`, `List` and its super interface `Collection` +through a general purpose serialization framework. Currently, only `Map`, `List`, `Set` and its super interface `Collection` are supported. To utilize it, you need to declare the collection type with: 1. Concrete type arguments: e.g. 
`List` but not `List`, `List`, or `List`, as Flink needs them to dispatch diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java index a66e3be2b135d..511635215ca4a 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java @@ -30,6 +30,7 @@ import org.apache.flink.api.java.typeutils.PojoField; import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.SetTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.ValueTypeInfo; @@ -49,8 +50,10 @@ import java.time.LocalTime; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; /** * This class gives access to the type information of the most common types for which Flink has @@ -443,6 +446,22 @@ public static TypeInformation> LIST(TypeInformation elementType) return new ListTypeInfo<>(elementType); } + /** + * Returns type information for a Java {@link java.util.Set}. A set must not be null. Null + * values in elements are not supported. + * + *
<p>
By default, sets are untyped and treated as a generic type in Flink; therefore, it is + * useful to pass type information whenever a set is used. + * + *
<p>
Note: Flink does not preserve the concrete {@link Set} type. It converts + * a list into {@link HashSet} when copying or deserializing. + * + * @param elementType type information for the set's elements + */ + public static TypeInformation> SET(TypeInformation elementType) { + return new SetTypeInfo<>(elementType); + } + /** * Returns type information for Java enumerations. Null values are not supported. * diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SetSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SetSerializer.java new file mode 100644 index 0000000000000..0f86346e26742 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SetSerializer.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A serializer for {@link java.util.Set}. The serializer relies on an element serializer for the + * serialization of the set's elements. + * + *
<p>
The serialization format for the set is as follows: four bytes for the length of the set, + * followed by the serialized representation of each element. To allow null values, each value is + * prefixed by a null marker. + * + * @param The type of element in the set. + */ +@Internal +public final class SetSerializer extends TypeSerializer> { + + private static final long serialVersionUID = 1L; + + /** The serializer for the elements of the set. */ + private final TypeSerializer elementSerializer; + + /** + * Creates a set serializer that uses the given serializer to serialize the set's elements. + * + * @param elementSerializer The serializer for the elements of the set + */ + public SetSerializer(TypeSerializer elementSerializer) { + this.elementSerializer = checkNotNull(elementSerializer); + } + + // ------------------------------------------------------------------------ + // SetSerializer specific properties + // ------------------------------------------------------------------------ + + /** + * Gets the serializer for the elements of the set. + * + * @return The serializer for the elements of the set + */ + public TypeSerializer getElementSerializer() { + return elementSerializer; + } + + // ------------------------------------------------------------------------ + // Type Serializer implementation + // ------------------------------------------------------------------------ + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer> duplicate() { + TypeSerializer duplicateElement = elementSerializer.duplicate(); + return duplicateElement == elementSerializer ? this : new SetSerializer<>(duplicateElement); + } + + @Override + public Set createInstance() { + return new HashSet<>(0); + } + + @Override + public Set copy(Set from) { + Set newSet = new HashSet<>(from.size()); + for (T element : from) { + T newElement = element == null ? null : elementSerializer.copy(element); + newSet.add(newElement); + } + return newSet; + } + + @Override + public Set copy(Set from, Set reuse) { + return copy(from); + } + + @Override + public int getLength() { + return -1; // var length + } + + @Override + public void serialize(Set set, DataOutputView target) throws IOException { + final int size = set.size(); + target.writeInt(size); + for (T element : set) { + if (element == null) { + target.writeBoolean(true); + } else { + target.writeBoolean(false); + elementSerializer.serialize(element, target); + } + } + } + + @Override + public Set deserialize(DataInputView source) throws IOException { + final int size = source.readInt(); + final Set set = new HashSet<>(size); + for (int i = 0; i < size; i++) { + boolean isNull = source.readBoolean(); + T element = isNull ? 
null : elementSerializer.deserialize(source); + set.add(element); + } + return set; + } + + @Override + public Set deserialize(Set reuse, DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + // copy number of elements + final int num = source.readInt(); + target.writeInt(num); + for (int i = 0; i < num; i++) { + boolean isNull = source.readBoolean(); + target.writeBoolean(isNull); + if (!isNull) { + elementSerializer.copy(source, target); + } + } + } + + // -------------------------------------------------------------------- + + @Override + public boolean equals(Object obj) { + return obj == this + || (obj != null + && obj.getClass() == getClass() + && elementSerializer.equals(((SetSerializer) obj).elementSerializer)); + } + + @Override + public int hashCode() { + return elementSerializer.hashCode(); + } + + // -------------------------------------------------------------------------------------------- + // Serializer configuration snapshot & compatibility + // -------------------------------------------------------------------------------------------- + + @Override + public TypeSerializerSnapshot> snapshotConfiguration() { + return new SetSerializerSnapshot<>(this); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SetSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SetSerializerSnapshot.java new file mode 100644 index 0000000000000..cc4ca52052b88 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SetSerializerSnapshot.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import java.util.Set; + +/** Snapshot class for the {@link SetSerializer}. */ +@Internal +public class SetSerializerSnapshot + extends CompositeTypeSerializerSnapshot, SetSerializer> { + + private static final int CURRENT_VERSION = 1; + + /** Constructor for read instantiation. */ + public SetSerializerSnapshot() {} + + /** Constructor to create the snapshot for writing. 
*/ + public SetSerializerSnapshot(SetSerializer setSerializer) { + super(setSerializer); + } + + @Override + public int getCurrentOuterSnapshotVersion() { + return CURRENT_VERSION; + } + + @Override + protected SetSerializer createOuterSerializerWithNestedSerializers( + TypeSerializer[] nestedSerializers) { + @SuppressWarnings("unchecked") + TypeSerializer elementSerializer = (TypeSerializer) nestedSerializers[0]; + return new SetSerializer<>(elementSerializer); + } + + @Override + protected TypeSerializer[] getNestedSerializers(SetSerializer outerSerializer) { + return new TypeSerializer[] {outerSerializer.getElementSerializer()}; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/SetTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/SetTypeInfo.java new file mode 100644 index 0000000000000..fba52102ed6a9 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/SetTypeInfo.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializerConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.SetSerializer; + +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link TypeInformation} for the set types of the Java API. + * + * @param The type of the elements in the set. + */ +@PublicEvolving +public final class SetTypeInfo extends TypeInformation> { + + private static final long serialVersionUID = 1L; + + private final TypeInformation elementTypeInfo; + + public SetTypeInfo(Class elementTypeClass) { + this.elementTypeInfo = + of(checkNotNull(elementTypeClass, "The element type class cannot be null.")); + } + + public SetTypeInfo(TypeInformation elementTypeInfo) { + this.elementTypeInfo = + checkNotNull(elementTypeInfo, "The element type information cannot be null."); + } + + // ------------------------------------------------------------------------ + // SetTypeInfo specific properties + // ------------------------------------------------------------------------ + + /** Gets the type information for the elements contained in the set. 
*/ + public TypeInformation getElementTypeInfo() { + return elementTypeInfo; + } + + // ------------------------------------------------------------------------ + // TypeInformation implementation + // ------------------------------------------------------------------------ + + @Override + public boolean isBasicType() { + return false; + } + + @Override + public boolean isTupleType() { + return false; + } + + @Override + public int getArity() { + return 0; + } + + @Override + public int getTotalFields() { + return 1; + } + + @SuppressWarnings("unchecked") + @Override + public Class> getTypeClass() { + return (Class>) (Class) Set.class; + } + + @Override + public boolean isKeyType() { + return false; + } + + @Override + public TypeSerializer> createSerializer(SerializerConfig config) { + TypeSerializer elementTypeSerializer = elementTypeInfo.createSerializer(config); + return new SetSerializer<>(elementTypeSerializer); + } + + // ------------------------------------------------------------------------ + + @Override + public String toString() { + return "Set<" + elementTypeInfo + '>'; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } else if (obj instanceof SetTypeInfo) { + final SetTypeInfo other = (SetTypeInfo) obj; + return other.canEqual(this) && elementTypeInfo.equals(other.elementTypeInfo); + } else { + return false; + } + } + + @Override + public int hashCode() { + return 31 * elementTypeInfo.hashCode() + 1; + } + + @Override + public boolean canEqual(Object obj) { + return obj != null && obj.getClass() == getClass(); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index b761186dd5225..c1acc30482991 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -73,6 +73,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda; import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods; @@ -1993,6 +1994,10 @@ private TypeInformation privateGetForClass( Class elementClass = (Class) actualTypeArguments[0]; TypeInformation elementTypeInfo = createTypeInfo(elementClass); return (TypeInformation) Types.LIST(elementTypeInfo); + } else if (clazz.isAssignableFrom(Set.class)) { + Class elementClass = (Class) actualTypeArguments[0]; + TypeInformation elementTypeInfo = createTypeInfo(elementClass); + return (TypeInformation) Types.SET(elementTypeInfo); } } } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SetSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SetSerializerTest.java new file mode 100644 index 0000000000000..3ffa481956b3d --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SetSerializerTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; + +/** A test for the {@link SetSerializer}. */ +class SetSerializerTest extends SerializerTestBase> { + + @Override + protected TypeSerializer> createSerializer() { + return new SetSerializer<>(LongSerializer.INSTANCE); + } + + @Override + protected int getLength() { + return -1; + } + + @SuppressWarnings("unchecked") + @Override + protected Class> getTypeClass() { + return (Class>) (Class) Set.class; + } + + @SuppressWarnings({"unchecked"}) + @Override + protected Set[] getTestData() { + final Random rnd = new Random(123654789); + + // empty sets + final Set set1 = Collections.emptySet(); + final Set set2 = new HashSet<>(); + final Set set3 = new TreeSet<>(); + + // single element sets + final Set set4 = Collections.singleton(55L); + final Set set5 = new HashSet<>(); + set5.add(12345L); + final Set set6 = new TreeSet<>(); + set6.add(777888L); + + // longer sets + final Set set7 = new HashSet<>(); + for (int i = 0; i < rnd.nextInt(200); i++) { + set7.add(rnd.nextLong()); + } + + int set8Len = rnd.nextInt(200); + final Set set8 = new HashSet<>(set8Len); + for (int i = 0; i < set8Len; i++) { + set8.add(rnd.nextLong()); + } + + // null-value sets + final Set set9 = Collections.singleton(null); + final Set set10 = new HashSet<>(); + set10.add(null); + + return (Set[]) + new Set[] {set1, set2, set3, set4, set5, set6, set7, set8, set9, set10}; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/SetTypeInfoTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/SetTypeInfoTest.java new file mode 100644 index 0000000000000..f301fe81d2a4a --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/SetTypeInfoTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.TypeInformationTestBase; + +/** Test for {@link SetTypeInfo}. 
*/ +class SetTypeInfoTest extends TypeInformationTestBase> { + + @Override + protected SetTypeInfo[] getTestData() { + return new SetTypeInfo[] { + new SetTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO), + new SetTypeInfo<>(BasicTypeInfo.BOOLEAN_TYPE_INFO), + new SetTypeInfo<>(Object.class), + }; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java index 72a1df065f720..012183f270c08 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java @@ -65,10 +65,12 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -2513,6 +2515,7 @@ public static class PojoWithCollections { public Map mapVal = new HashMap<>(); public List listVal = new ArrayList<>(); public Collection collectionVal = new ArrayList<>(); + public Set setVal = new HashSet<>(); // Collection fields with unsupported collection types, treated as generic types public LinkedList linkedListVal = new LinkedList<>(); @@ -2567,6 +2570,8 @@ private void testCollectionTypesInternal(TypeInformation ti) { pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("collectionVal")) .getTypeInformation()) .isInstanceOf(ListTypeInfo.class); + assertThat(pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("setVal")).getTypeInformation()) + .isInstanceOf(SetTypeInfo.class); assertThat( pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("linkedListVal")) .getTypeInformation()) diff --git a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java index a5d288b37d9a1..59f0d256f13c4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java @@ -30,6 +30,7 @@ import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer; import org.apache.flink.api.common.typeutils.base.LocalTimeSerializer; import org.apache.flink.api.common.typeutils.base.NullValueSerializer; +import org.apache.flink.api.common.typeutils.base.SetSerializer; import org.apache.flink.api.common.typeutils.base.VoidSerializer; import org.apache.flink.api.common.typeutils.base.array.BooleanPrimitiveArraySerializer; import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; @@ -243,7 +244,8 @@ public void testTypeSerializerTestCoverage() { AvroSerializer.class.getName(), // KeyAndValueSerializer shouldn't be used to serialize data to state and // doesn't need to ensure upgrade compatibility. 
- "org.apache.flink.streaming.api.operators.sortpartition.KeyAndValueSerializer"); + "org.apache.flink.streaming.api.operators.sortpartition.KeyAndValueSerializer", + SetSerializer.class.getName()); // check if a test exists for each type serializer for (Class typeSerializer : typeSerializers) { From 1a6d99384e2e497bd423b44dfdb81b6c7093aba9 Mon Sep 17 00:00:00 2001 From: Zhanghao Chen Date: Sun, 15 Dec 2024 00:14:10 +0800 Subject: [PATCH 4/4] [FLINK-36903][core][type] Add switch for built-in serialization support for common collection types --- .../serialization/types_serialization.md | 6 +- .../generated/pipeline_configuration.html | 6 ++ .../serialization/SerializerConfigImpl.java | 3 + .../api/java/typeutils/TypeExtractor.java | 14 ++++- .../flink/configuration/PipelineOptions.java | 7 +++ .../api/java/typeutils/TypeExtractorTest.java | 62 +++++++++++++++++++ 6 files changed, 96 insertions(+), 2 deletions(-) diff --git a/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md b/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md index 4241e20fd23b8..03c07f65e710c 100644 --- a/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md +++ b/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md @@ -170,7 +170,7 @@ Flink supports all Java and Scala primitive types such as `Integer`, `String`, a #### Common Collection Types -Flink comes with dedicated serialization support for common Java collection types, which is more efficient than going +Since Flink 2.0, Flink comes with dedicated serialization support for common Java collection types, which is more efficient than going through a general purpose serialization framework. Currently, only `Map`, `List`, `Set` and its super interface `Collection` are supported. To utilize it, you need to declare the collection type with: @@ -182,6 +182,10 @@ are supported. To utilize it, you need to declare the collection type with: Other nonqualified collection types will be handled by Flink as general class types. If the implementation types are also required to be preserved, you also need to register it with a custom serializer. +You may disable the built-in type support for collection types via +[pipeline.built-in-collection-types]({{< ref "docs/deployment/config#pipeline-built-in-collection-types" >}}) +to achieve the same serialization behavior in Flink versions prior to 2.0. + #### General Class Types Flink supports most Java and Scala classes (API and custom). diff --git a/docs/layouts/shortcodes/generated/pipeline_configuration.html b/docs/layouts/shortcodes/generated/pipeline_configuration.html index f8b511206c7fd..4aac865704024 100644 --- a/docs/layouts/shortcodes/generated/pipeline_configuration.html +++ b/docs/layouts/shortcodes/generated/pipeline_configuration.html @@ -20,6 +20,12 @@ Duration The interval of the automatic watermark emission. Watermarks are used throughout the streaming system to keep track of the progress of time. They are used, for example, for time based windowing. + +

<tr>
+            <td><h5>pipeline.built-in-collection-types</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>If enabled, TypeExtractor will use built-in serializers for Maps, Lists, and Sets, which need to be treated by Kyro otherwise.</td>
+        </tr>
         <tr>
             <td><h5>pipeline.cached-files</h5></td>
             <td style="word-wrap: break-word;">
(none) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java index d42695e0e6ae2..cd5986f9a2922 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java @@ -356,6 +356,9 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) { configuration .getOptional(PipelineOptions.FORCE_KRYO_AVRO) .ifPresent(this::setForceKryoAvro); + configuration + .getOptional(PipelineOptions.BUILT_IN_COLLECTION_TYPES) + .ifPresent(TypeExtractor::setBuiltInCollectionTypesEnabled); try { configuration diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index c1acc30482991..3b4c601f936db 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -134,6 +134,18 @@ protected TypeExtractor() { // only create instances for special use cases } + /** + * Whether to use built-in types for common collection types, which need to be treated by Kyro + * otherwise. Since TypeExtractor methods are mostly static, we use a static variable to control + * their behavior as a temporary solution. + */ + private static boolean builtInCollectionTypesEnabled = true; + + @Internal + public static void setBuiltInCollectionTypesEnabled(boolean enabled) { + builtInCollectionTypesEnabled = enabled; + } + // -------------------------------------------------------------------------------------------- // TypeInfoFactory registry // -------------------------------------------------------------------------------------------- @@ -1979,7 +1991,7 @@ private TypeInformation privateGetForClass( // - OK: List, Collection // - not OK: LinkedList (implementation type), List (raw type), List (generic // type argument), or List (wildcard type argument) - if (parameterizedType != null) { + if (parameterizedType != null && builtInCollectionTypesEnabled) { Type[] actualTypeArguments = parameterizedType.getActualTypeArguments(); boolean allTypeArgumentsConcrete = Arrays.stream(actualTypeArguments).allMatch(arg -> arg instanceof Class); diff --git a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java index 9df2d7201a6d8..73edc656c1018 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java @@ -170,6 +170,13 @@ public class PipelineOptions { + " would cause the program to fail.") .build()); + public static final ConfigOption BUILT_IN_COLLECTION_TYPES = + key("pipeline.built-in-collection-types") + .booleanType() + .defaultValue(true) + .withDescription( + "If enabled, TypeExtractor will use built-in serializers for Maps, Lists, and Sets, which need to be treated by Kyro otherwise."); + public static final ConfigOption> GLOBAL_JOB_PARAMETERS = key("pipeline.global-job-parameters") .mapType() diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java index 
012183f270c08..fe27a134bcd4a 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java @@ -2587,4 +2587,66 @@ private void testCollectionTypesInternal(TypeInformation ti) { .getTypeInformation()) .isInstanceOf(GenericTypeInfo.class); } + + @Test + public void testCollectionTypesWithoutBuiltInTypes() { + TypeExtractor.setBuiltInCollectionTypesEnabled(false); + + MapFunction function = + new MapFunction, PojoWithCollections>() { + @Override + public PojoWithCollections map(PojoWithCollections value) { + return null; + } + }; + TypeInformation ti = + TypeExtractor.getMapReturnTypes( + function, + (TypeInformation) + TypeInformation.of(new TypeHint>() {})); + assertThat(ti).isInstanceOf(PojoTypeInfo.class); + testGenericCollectionTypes(ti); + + // use getForClass() + TypeInformation ti2 = TypeExtractor.getForClass(PojoWithCollections.class); + assertThat(ti2).isInstanceOf(PojoTypeInfo.class); + testGenericCollectionTypes(ti2); + + // use getForObject() + PojoWithCollections t = new PojoWithCollections<>(); + TypeInformation ti3 = TypeExtractor.getForObject(t); + assertThat(ti3).isInstanceOf(PojoTypeInfo.class); + testGenericCollectionTypes(ti3); + + // restore state + TypeExtractor.setBuiltInCollectionTypesEnabled(true); + } + + private void testGenericCollectionTypes(TypeInformation ti) { + PojoTypeInfo pojoTi = (PojoTypeInfo) ti; + assertThat(pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("mapVal")).getTypeInformation()) + .isInstanceOf(GenericTypeInfo.class); + assertThat(pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("listVal")).getTypeInformation()) + .isInstanceOf(GenericTypeInfo.class); + assertThat( + pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("collectionVal")) + .getTypeInformation()) + .isInstanceOf(GenericTypeInfo.class); + assertThat(pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("setVal")).getTypeInformation()) + .isInstanceOf(GenericTypeInfo.class); + assertThat( + pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("linkedListVal")) + .getTypeInformation()) + .isInstanceOf(GenericTypeInfo.class); + assertThat(pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("rawListVal")).getTypeInformation()) + .isInstanceOf(GenericTypeInfo.class); + assertThat( + pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("genericListVal")) + .getTypeInformation()) + .isInstanceOf(GenericTypeInfo.class); + assertThat( + pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("wildcardListVal")) + .getTypeInformation()) + .isInstanceOf(GenericTypeInfo.class); + } }
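For completeness, a hedged sketch of how the `pipeline.built-in-collection-types` switch introduced in the last patch could be disabled from user code to restore the pre-2.0 behavior. Only the config option and the standard `Configuration`/`StreamExecutionEnvironment` APIs are taken as given; the surrounding job skeleton (class name, sample data, job name) is made up for illustration.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CollectionTypesSwitchExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Disable the built-in Map/List/Set serializers added by this series;
        // collection-typed fields are then handled as generic types (Kryo),
        // matching the behavior of Flink versions before 2.0.
        conf.set(PipelineOptions.BUILT_IN_COLLECTION_TYPES, false);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);

        env.fromData("a", "b", "c").print();
        env.execute("built-in-collection-types-disabled");
    }
}
```

Because the flag is applied through a static field on `TypeExtractor` (the temporary solution noted in the patch), it affects type extraction for every job configured in the same JVM, not just the pipeline that sets the option.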