Skip to content

[FLINK-38205][format][pb] Discard unknown fields by default #26881

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/** Keeps protobuf constants separately. */
public class PbConstant {
public static final String PB_METHOD_GET_DESCRIPTOR = "getDescriptor";
public static final String PB_METHOD_PARSE_FROM = "parseFrom";
public static final String PB_METHOD_PARSER = "parser";
public static final String GENERATED_DECODE_METHOD = "decode";
public static final String GENERATED_ENCODE_METHOD = "encode";
public static final String PB_MAP_KEY_NAME = "key";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.FileDescriptor.Syntax;
import com.google.protobuf.DiscardUnknownFieldsParser;
import com.google.protobuf.Parser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -53,8 +55,8 @@
*/
public class ProtoToRowConverter {
private static final Logger LOG = LoggerFactory.getLogger(ProtoToRowConverter.class);
private final Method parseFromMethod;
private final Method decodeMethod;
private final Parser<?> protoParser;
private boolean isCodeSplit = false;

public ProtoToRowConverter(RowType rowType, PbFormatConfig formatConfig)
Expand Down Expand Up @@ -124,14 +126,16 @@ public ProtoToRowConverter(RowType rowType, PbFormatConfig formatConfig)
codegenAppender.code());
decodeMethod =
generatedClass.getMethod(PbConstant.GENERATED_DECODE_METHOD, messageClass);
parseFromMethod = messageClass.getMethod(PbConstant.PB_METHOD_PARSE_FROM, byte[].class);
Method parserMethod = messageClass.getMethod(PbConstant.PB_METHOD_PARSER);
Parser originalProtoParser = (Parser) parserMethod.invoke(null);
protoParser = DiscardUnknownFieldsParser.wrap(originalProtoParser);
} catch (Exception ex) {
throw new PbCodegenException(ex);
}
}

public RowData convertProtoBinaryToRow(byte[] data) throws Exception {
Object messageObj = parseFromMethod.invoke(null, data);
Object messageObj = protoParser.parseFrom(data);
return (RowData) decodeMethod.invoke(null, messageObj);
}

Expand Down