Skip to content

Commit

Permalink
[FLINK-36903][core][type] Add switch for built-in serialization suppo…
Browse files Browse the repository at this point in the history
…rt for common collection types
  • Loading branch information
X-czh committed Dec 15, 2024
1 parent b27a0b3 commit 1a6d993
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ Flink supports all Java and Scala primitive types such as `Integer`, `String`, a

#### Common Collection Types

Flink comes with dedicated serialization support for common Java collection types, which is more efficient than going
Since Flink 2.0, Flink comes with dedicated serialization support for common Java collection types, which is more efficient than going
through a general purpose serialization framework. Currently, only `Map`, `List`, `Set` and its super interface `Collection`
are supported. To utilize it, you need to declare the collection type with:

Expand All @@ -182,6 +182,10 @@ are supported. To utilize it, you need to declare the collection type with:
Other nonqualified collection types will be handled by Flink as general class types. If the implementation types are
also required to be preserved, you also need to register it with a custom serializer.

You may disable the built-in type support for collection types via
[pipeline.built-in-collection-types]({{< ref "docs/deployment/config#pipeline-built-in-collection-types" >}})
to achieve the same serialization behavior in Flink versions prior to 2.0.

#### General Class Types

Flink supports most Java and Scala classes (API and custom).
Expand Down
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/pipeline_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@
<td>Duration</td>
<td>The interval of the automatic watermark emission. Watermarks are used throughout the streaming system to keep track of the progress of time. They are used, for example, for time based windowing.</td>
</tr>
<tr>
<td><h5>pipeline.built-in-collection-types</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>If enabled, TypeExtractor will use built-in serializers for Maps, Lists, and Sets, which need to be treated by Kyro otherwise.</td>
</tr>
<tr>
<td><h5>pipeline.cached-files</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,9 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
configuration
.getOptional(PipelineOptions.FORCE_KRYO_AVRO)
.ifPresent(this::setForceKryoAvro);
configuration
.getOptional(PipelineOptions.BUILT_IN_COLLECTION_TYPES)
.ifPresent(TypeExtractor::setBuiltInCollectionTypesEnabled);

try {
configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,18 @@ protected TypeExtractor() {
// only create instances for special use cases
}

/**
* Whether to use built-in types for common collection types, which need to be treated by Kyro
* otherwise. Since TypeExtractor methods are mostly static, we use a static variable to control
* their behavior as a temporary solution.
*/
private static boolean builtInCollectionTypesEnabled = true;

@Internal
public static void setBuiltInCollectionTypesEnabled(boolean enabled) {
builtInCollectionTypesEnabled = enabled;
}

// --------------------------------------------------------------------------------------------
// TypeInfoFactory registry
// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -1979,7 +1991,7 @@ private <OUT, IN1, IN2> TypeInformation<OUT> privateGetForClass(
// - OK: List<String>, Collection<String>
// - not OK: LinkedList<String> (implementation type), List (raw type), List<T> (generic
// type argument), or List<?> (wildcard type argument)
if (parameterizedType != null) {
if (parameterizedType != null && builtInCollectionTypesEnabled) {
Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
boolean allTypeArgumentsConcrete =
Arrays.stream(actualTypeArguments).allMatch(arg -> arg instanceof Class<?>);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,13 @@ public class PipelineOptions {
+ " would cause the program to fail.")
.build());

public static final ConfigOption<Boolean> BUILT_IN_COLLECTION_TYPES =
key("pipeline.built-in-collection-types")
.booleanType()
.defaultValue(true)
.withDescription(
"If enabled, TypeExtractor will use built-in serializers for Maps, Lists, and Sets, which need to be treated by Kyro otherwise.");

public static final ConfigOption<Map<String, String>> GLOBAL_JOB_PARAMETERS =
key("pipeline.global-job-parameters")
.mapType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2587,4 +2587,66 @@ private void testCollectionTypesInternal(TypeInformation<?> ti) {
.getTypeInformation())
.isInstanceOf(GenericTypeInfo.class);
}

@Test
public <T> void testCollectionTypesWithoutBuiltInTypes() {
TypeExtractor.setBuiltInCollectionTypesEnabled(false);

MapFunction<?, ?> function =
new MapFunction<PojoWithCollections<T>, PojoWithCollections<T>>() {
@Override
public PojoWithCollections map(PojoWithCollections<T> value) {
return null;
}
};
TypeInformation<?> ti =
TypeExtractor.getMapReturnTypes(
function,
(TypeInformation)
TypeInformation.of(new TypeHint<PojoWithCollections<T>>() {}));
assertThat(ti).isInstanceOf(PojoTypeInfo.class);
testGenericCollectionTypes(ti);

// use getForClass()
TypeInformation<?> ti2 = TypeExtractor.getForClass(PojoWithCollections.class);
assertThat(ti2).isInstanceOf(PojoTypeInfo.class);
testGenericCollectionTypes(ti2);

// use getForObject()
PojoWithCollections<T> t = new PojoWithCollections<>();
TypeInformation<?> ti3 = TypeExtractor.getForObject(t);
assertThat(ti3).isInstanceOf(PojoTypeInfo.class);
testGenericCollectionTypes(ti3);

// restore state
TypeExtractor.setBuiltInCollectionTypesEnabled(true);
}

private void testGenericCollectionTypes(TypeInformation<?> ti) {
PojoTypeInfo<?> pojoTi = (PojoTypeInfo<?>) ti;
assertThat(pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("mapVal")).getTypeInformation())
.isInstanceOf(GenericTypeInfo.class);
assertThat(pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("listVal")).getTypeInformation())
.isInstanceOf(GenericTypeInfo.class);
assertThat(
pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("collectionVal"))
.getTypeInformation())
.isInstanceOf(GenericTypeInfo.class);
assertThat(pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("setVal")).getTypeInformation())
.isInstanceOf(GenericTypeInfo.class);
assertThat(
pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("linkedListVal"))
.getTypeInformation())
.isInstanceOf(GenericTypeInfo.class);
assertThat(pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("rawListVal")).getTypeInformation())
.isInstanceOf(GenericTypeInfo.class);
assertThat(
pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("genericListVal"))
.getTypeInformation())
.isInstanceOf(GenericTypeInfo.class);
assertThat(
pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("wildcardListVal"))
.getTypeInformation())
.isInstanceOf(GenericTypeInfo.class);
}
}

0 comments on commit 1a6d993

Please sign in to comment.