[FLINK-34123][FLINK-35068][FLINK-36903] Introduce built-in serialization support for common collection types #25797

Open · wants to merge 4 commits into base: master
@@ -37,15 +37,16 @@ Flink places some restrictions on the type of elements that can be in a DataStre
The reason for this is that the system analyzes the types to determine
efficient execution strategies.

There are seven different categories of data types:
There are eight different categories of data types:

1. **Java Tuples** and **Scala Case Classes**
2. **Java POJOs**
3. **Primitive Types**
4. **Regular Classes**
5. **Values**
6. **Hadoop Writables**
7. **Special Types**
4. **Common Collection Types**
5. **Regular Classes**
6. **Values**
7. **Hadoop Writables**
8. **Special Types**

#### Tuples and Case Classes

@@ -167,6 +168,24 @@ input.keyBy(_.word)

Flink supports all Java and Scala primitive types such as `Integer`, `String`, and `Double`.

#### Common Collection Types

Since Flink 2.0, Flink ships with dedicated serialization support for common Java collection types, which is more
efficient than going through a general-purpose serialization framework. Currently, only `Map`, `List`, `Set`, and the
super interface `Collection` are supported. To use it, you need to declare the collection type with:

1. Concrete type arguments: e.g. `List<String>` but not `List`, `List<T>`, or `List<?>`, as Flink needs them to dispatch
serialization of the element types.
2. Interface types: e.g. `List<String>` but not `LinkedList<String>`, as Flink does not preserve the underlying
implementation types across serialization.

Collection types that do not meet these requirements are handled by Flink as general class types. If the
implementation types also need to be preserved, you need to register a custom serializer for them.

You may disable the built-in serialization support for collection types via
[pipeline.built-in-collection-types]({{< ref "docs/deployment/config#pipeline-built-in-collection-types" >}})
to fall back to the serialization behavior of Flink versions prior to 2.0.
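
For illustration, here is a minimal sketch of field declarations that do and do not qualify for the built-in support
(the class and field names below are made up for this example):

```java
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Illustrative POJO; only the declared field types matter for serializer selection.
public class SessionSummary {

    // Qualify: interface types with concrete type arguments, serialized with the
    // built-in MapSerializer / ListSerializer.
    public Map<String, Integer> clicksPerPage;
    public List<String> visitedPages;

    // Does not qualify: raw type without a type argument; treated as a general class type.
    public List rawEvents;

    // Does not qualify: implementation type; the LinkedList type would not be preserved anyway,
    // so it is treated as a general class type (register a custom serializer if needed).
    public LinkedList<String> recentPages;
}
```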

#### General Class Types

Flink supports most Java and Scala classes (API and custom).
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/pipeline_configuration.html
@@ -20,6 +20,12 @@
<td>Duration</td>
<td>The interval of the automatic watermark emission. Watermarks are used throughout the streaming system to keep track of the progress of time. They are used, for example, for time based windowing.</td>
</tr>
<tr>
<td><h5>pipeline.built-in-collection-types</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>If enabled, the TypeExtractor will use built-in serializers for Maps, Lists, and Sets, which would otherwise have to be handled by Kryo.</td>
</tr>
<tr>
<td><h5>pipeline.cached-files</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -356,6 +356,9 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
configuration
.getOptional(PipelineOptions.FORCE_KRYO_AVRO)
.ifPresent(this::setForceKryoAvro);
configuration
.getOptional(PipelineOptions.BUILT_IN_COLLECTION_TYPES)
.ifPresent(TypeExtractor::setBuiltInCollectionTypesEnabled);

try {
configuration
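
For reference, a minimal sketch of disabling the option programmatically, assuming it is exposed as
`PipelineOptions.BUILT_IN_COLLECTION_TYPES` (as wired above) and using the standard `Configuration`-based
environment setup:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DisableBuiltInCollectionTypes {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Fall back to the pre-2.0 behavior: Maps, Lists, and Sets go through the
        // generic (Kryo) serialization path again.
        conf.set(PipelineOptions.BUILT_IN_COLLECTION_TYPES, false);

        // The option is picked up when the execution config is configured from this Configuration.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build and execute the pipeline ...
    }
}
```

The same effect can be achieved by setting `pipeline.built-in-collection-types: false` in the cluster configuration.
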
@@ -30,6 +30,7 @@
import org.apache.flink.api.java.typeutils.PojoField;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.SetTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
@@ -49,8 +50,10 @@
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* This class gives access to the type information of the most common types for which Flink has
@@ -443,6 +446,22 @@ public static <E> TypeInformation<List<E>> LIST(TypeInformation<E> elementType)
return new ListTypeInfo<>(elementType);
}

/**
* Returns type information for a Java {@link java.util.Set}. A set must not be null. Null
* values in elements are not supported.
*
* <p>By default, sets are untyped and treated as a generic type in Flink; therefore, it is
* useful to pass type information whenever a set is used.
*
* <p><strong>Note:</strong> Flink does not preserve the concrete {@link Set} type. It converts
* a set into a {@link HashSet} when copying or deserializing.
*
* @param elementType type information for the set's elements
*/
public static <E> TypeInformation<Set<E>> SET(TypeInformation<E> elementType) {
return new SetTypeInfo<>(elementType);
}

/**
* Returns type information for Java enumerations. Null values are not supported.
*
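
A minimal usage sketch for the new `Types.SET` factory, assuming a DataStream API job (the pipeline around it is
illustrative only; the explicit hint is needed because the lambda erases the element type):

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class SetTypeInfoExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        TypeInformation<Set<String>> setInfo = Types.SET(Types.STRING);

        DataStream<Set<String>> tokenSets =
                env.fromData("to be or not", "to be")
                        .map(line -> (Set<String>) new HashSet<>(Arrays.asList(line.split(" "))))
                        // The lambda erases Set<String>, so pass the type information explicitly.
                        .returns(setInfo);

        tokenSets.print();
        env.execute("set-type-info-example");
    }
}
```
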
@@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.typeutils.base;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A serializer for {@link java.util.Set}. The serializer relies on an element serializer for the
* serialization of the set's elements.
*
* <p>The serialization format for the set is as follows: four bytes for the length of the set,
* followed by the serialized representation of each element. To allow null values, each value is
* prefixed by a null marker.
*
* @param <T> The type of element in the set.
*/
@Internal
public final class SetSerializer<T> extends TypeSerializer<Set<T>> {

private static final long serialVersionUID = 1L;

/** The serializer for the elements of the set. */
private final TypeSerializer<T> elementSerializer;

/**
* Creates a set serializer that uses the given serializer to serialize the set's elements.
*
* @param elementSerializer The serializer for the elements of the set
*/
public SetSerializer(TypeSerializer<T> elementSerializer) {
this.elementSerializer = checkNotNull(elementSerializer);
}

// ------------------------------------------------------------------------
// SetSerializer specific properties
// ------------------------------------------------------------------------

/**
* Gets the serializer for the elements of the set.
*
* @return The serializer for the elements of the set
*/
public TypeSerializer<T> getElementSerializer() {
return elementSerializer;
}

// ------------------------------------------------------------------------
// Type Serializer implementation
// ------------------------------------------------------------------------

@Override
public boolean isImmutableType() {
return false;
}

@Override
public TypeSerializer<Set<T>> duplicate() {
TypeSerializer<T> duplicateElement = elementSerializer.duplicate();
return duplicateElement == elementSerializer ? this : new SetSerializer<>(duplicateElement);
}

@Override
public Set<T> createInstance() {
return new HashSet<>(0);
}

@Override
public Set<T> copy(Set<T> from) {
Set<T> newSet = new HashSet<>(from.size());
for (T element : from) {
T newElement = element == null ? null : elementSerializer.copy(element);
newSet.add(newElement);
}
return newSet;
}

@Override
public Set<T> copy(Set<T> from, Set<T> reuse) {
return copy(from);
}

@Override
public int getLength() {
return -1; // var length
}

@Override
public void serialize(Set<T> set, DataOutputView target) throws IOException {
final int size = set.size();
target.writeInt(size);
for (T element : set) {
if (element == null) {
target.writeBoolean(true);
} else {
target.writeBoolean(false);
elementSerializer.serialize(element, target);
}
}
}

@Override
public Set<T> deserialize(DataInputView source) throws IOException {
final int size = source.readInt();
final Set<T> set = new HashSet<>(size);
for (int i = 0; i < size; i++) {
boolean isNull = source.readBoolean();
T element = isNull ? null : elementSerializer.deserialize(source);
set.add(element);
}
return set;
}

@Override
public Set<T> deserialize(Set<T> reuse, DataInputView source) throws IOException {
return deserialize(source);
}

@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
// copy number of elements
final int num = source.readInt();
target.writeInt(num);
for (int i = 0; i < num; i++) {
boolean isNull = source.readBoolean();
target.writeBoolean(isNull);
if (!isNull) {
elementSerializer.copy(source, target);
}
}
}

// --------------------------------------------------------------------

@Override
public boolean equals(Object obj) {
return obj == this
|| (obj != null
&& obj.getClass() == getClass()
&& elementSerializer.equals(((SetSerializer<?>) obj).elementSerializer));
}

@Override
public int hashCode() {
return elementSerializer.hashCode();
}

// --------------------------------------------------------------------------------------------
// Serializer configuration snapshot & compatibility
// --------------------------------------------------------------------------------------------

@Override
public TypeSerializerSnapshot<Set<T>> snapshotConfiguration() {
return new SetSerializerSnapshot<>(this);
}
}
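
A minimal sketch of a round trip through the serializer, assuming the existing `StringSerializer` and the
`DataOutputSerializer` / `DataInputDeserializer` utilities from flink-core (not part of this change):

```java
import org.apache.flink.api.common.typeutils.base.SetSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

public class SetSerializerRoundTrip {
    public static void main(String[] args) throws IOException {
        SetSerializer<String> serializer = new SetSerializer<>(StringSerializer.INSTANCE);

        Set<String> original = new HashSet<>();
        original.add("a");
        original.add("b");
        original.add(null); // nulls are encoded via the per-element null marker

        // Write: 4-byte size, then (null marker, element bytes) per entry.
        DataOutputSerializer out = new DataOutputSerializer(64);
        serializer.serialize(original, out);

        // Read back into a fresh HashSet.
        DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
        Set<String> restored = serializer.deserialize(in);

        System.out.println(original.equals(restored)); // prints true
    }
}
```
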
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.typeutils.base;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;

import java.util.Set;

/** Snapshot class for the {@link SetSerializer}. */
@Internal
public class SetSerializerSnapshot<T>
extends CompositeTypeSerializerSnapshot<Set<T>, SetSerializer<T>> {

private static final int CURRENT_VERSION = 1;

/** Constructor for read instantiation. */
public SetSerializerSnapshot() {}

/** Constructor to create the snapshot for writing. */
public SetSerializerSnapshot(SetSerializer<T> setSerializer) {
super(setSerializer);
}

@Override
public int getCurrentOuterSnapshotVersion() {
return CURRENT_VERSION;
}

@Override
protected SetSerializer<T> createOuterSerializerWithNestedSerializers(
TypeSerializer<?>[] nestedSerializers) {
@SuppressWarnings("unchecked")
TypeSerializer<T> elementSerializer = (TypeSerializer<T>) nestedSerializers[0];
return new SetSerializer<>(elementSerializer);
}

@Override
protected TypeSerializer<?>[] getNestedSerializers(SetSerializer<T> outerSerializer) {
return new TypeSerializer<?>[] {outerSerializer.getElementSerializer()};
}
}
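
A minimal sketch of the snapshot/restore cycle this class enables, assuming the existing `StringSerializer` as the
element serializer (illustrative only; in practice the snapshot is written to and read back from checkpoint metadata):

```java
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.SetSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;

import java.util.Set;

public class SetSerializerSnapshotExample {
    public static void main(String[] args) {
        SetSerializer<String> serializer = new SetSerializer<>(StringSerializer.INSTANCE);

        // Taken when state is checkpointed, alongside the serialized data.
        TypeSerializerSnapshot<Set<String>> snapshot = serializer.snapshotConfiguration();

        // On restore, an equivalent serializer is reconstructed; the nested element
        // serializer comes back through the CompositeTypeSerializerSnapshot machinery.
        TypeSerializer<Set<String>> restored = snapshot.restoreSerializer();
        System.out.println(restored.equals(serializer)); // prints true
    }
}
```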