
[FLINK-11333][protobuf] First-class serializer support for Protobuf types #7598

Open · wants to merge 4 commits into master

Conversation

@Myasuka (Member) commented Jan 30, 2019

What is the purpose of the change

Support Protobuf types directly in Flink. Unlike Flink's built-in Avro serializer, checking whether a schema is evolvable is currently left to Protobuf itself rather than being verified before any code runs. This is a known limitation and is recorded in FLINK-11333's comments.

Brief change log

  • Support extracting ProtobufTypeInfo within TypeExtractor.
  • Support (de)serializing Protobuf messages directly, without introducing chill-protobuf and Kryo.
  • Support migrating old savepoint data that was serialized by the Kryo serializer to the new ProtobufSerializer.
  • Also support building protoc on Travis.
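As a rough illustration of what first-class support means for user code (a hypothetical sketch: `UserProtobuf.User` stands in for any generated message class, and the exact surface is whatever this PR's TypeExtractor integration ends up providing), a Protobuf message could then be used as a stream element type with no Kryo registration at all:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// With this change, TypeExtractor should recognize the generated Message
// subtype and select ProtobufTypeInfo/ProtobufSerializer automatically.
// UserProtobuf.User is an assumed generated class, not part of this PR's text.
DataStream<UserProtobuf.User> users = env.fromElements(
    UserProtobuf.User.newBuilder().setName("alice").build());
```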

Verifying this change

This change added tests and can be verified as follows:

  • Added unit tests for ProtobufTypeInfo, ProtobufSerializer, and ProtobufSerializerSnapshot.
  • Extended an integration test to verify that old savepoint data written with the Kryo serializer can be migrated to the new ProtobufSerializer.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): yes
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: yes
  • The runtime per-record code paths (performance sensitive): no, it should not.
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: yes, savepoint data might need to be migrated.
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs

@Myasuka (Member, Author) commented Jan 30, 2019

CC @tzulitai, would you please review this code?

@Myasuka (Member, Author) commented Feb 2, 2019

To avoid changing Flink's current build prerequisites, I added the generated Protobuf message UserProtobuf to the PR. Otherwise, users would have to install protobuf-2.x before building Flink.

@elliotvilhelm commented:
Any expected timeline for this feature? I would really like to use this instead of having to register a custom Protobuf serializer/deserializer. Thanks!
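For context, the workaround referred to here is registering chill-protobuf's Kryo serializer by hand, roughly as described in Flink's custom-serializer documentation. A sketch (`MyProtoMessage` is a placeholder for your generated message class):

```java
import com.twitter.chill.protobuf.ProtobufSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Route the generated message class through chill-protobuf's serializer
// instead of Kryo's default reflective serialization.
env.getConfig().registerTypeWithKryoSerializer(
    MyProtoMessage.class, ProtobufSerializer.class);
```

This works, but it has to be repeated for every message class in every job, which is exactly what first-class support would remove.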


@nitinpandey-154 commented:
@Myasuka Any idea if and when this might be merged?

@Myasuka (Member, Author) commented May 14, 2020

@nitinpandey-154 @elliotvilhelm Thanks for your interest.
@tzulitai, do you think it's time to resume work on this ticket after Flink 1.11?

@morvenhuang commented:
When would this be merged?

@shuttie (Contributor) commented Jun 1, 2021

For anyone interested in this issue, I've implemented first-class Protobuf support for Apache Flink as a separate library: https://github.com/findify/flink-protobuf

It works with both protobuf-java and ScalaPB-generated classes.

As a usage example, given the following message format:

message Foo { 
  required int32 value = 1; 
}

You can build a TypeInformation for scalapb-generated classes like this:

import io.findify.flinkpb.FlinkProtobuf 

implicit val ti = FlinkProtobuf.generateScala(Foo) 
val result = env.fromCollection(List(Foo(1), Foo(2), Foo(3))) 

For Java it's going to look a bit different (the FlinkProtobuf import below assumes the same package as in the Scala example above):

import io.findify.flinkpb.FlinkProtobuf;
import io.findify.flinkprotobuf.java.Tests;
import java.util.List;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation<Tests.Foo> ti = FlinkProtobuf.generateJava(Tests.Foo.class, Tests.Foo.getDefaultInstance());
env.fromCollection(List.of(Tests.Foo.newBuilder().setValue(1).build()), ti).executeAndCollect(100);

The main motivation for this library was our own use case when:

  • Most of our Protobuf messages are encoded as a oneof, so in Scala they become part of a sealed trait hierarchy. Flink falls back to Kryo in such cases, resulting in severe performance degradation.
  • Kryo-encoded messages can take roughly 10x more bytes for abstract classes, so it became too easy to saturate a 1 Gbit link between nodes.
  • There is no schema-evolution support for Scala case classes.
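A minimal sketch of the oneof shape described above (hypothetical message names, proto2 syntax to match the earlier Foo example); ScalaPB compiles each oneof into a sealed trait with one case class per arm, which is exactly where Flink used to fall back to Kryo:

```protobuf
syntax = "proto2";

message Event {
  // ScalaPB turns this into a sealed trait Payload with Click and View cases.
  oneof payload {
    Click click = 1;
    View view = 2;
  }
}

message Click { optional string target = 1; }
message View  { optional string page = 2; }
```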

With this library we can simply rely on Protobuf to handle all of these problematic cases.

@itaykat commented May 29, 2022

Any updates regarding this one?

@eskabetxe (Member) commented:

@tzulitai @MartijnVisser this would be great for Flink 2.0

9 participants