Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36999][cdc-source-connectors] When the source type is oracle and field type is nember and the value is null, a null pointer exception occurs during deserialization. #3833

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

linjianchang
Copy link

@linjianchang linjianchang commented Jan 3, 2025

When the source type is oracle and field type is nember and the value is null, a null pointer exception occurs during deserialization.

…cimal and the value is null, a null pointer exception occurs during deserialization.
@linjianchang
Copy link
Author

source oracle table structure:
create table PORTAL.SYSTEM_USER
(
sys_user_id NUMBER not null,
staff_id NUMBER not null,
sys_user_code VARCHAR2(250),
password VARCHAR2(250),
pwd_err_cnt NUMBER not null,
pwd_sms_tel NUMBER
)

When the pwd_sms_tel field value of the nember type is null, a null pointer exception will be reported.
Exception:
java.lang.NullPointerException at io.debezium.data.VariableScaleDecimal.toLogical(VariableScaleDecimal.java:103) at com.ververica.cdc.debezium.event.DebeziumSchemaDataTypeInference.inferStruct(DebeziumSchemaDataTypeInference.java:190) at com.ververica.cdc.connectors.oracle.source.OracleSchemaDataTypeInference.inferStruct(OracleSchemaDataTypeInference.java:40) at com.ververica.cdc.debezium.event.DebeziumSchemaDataTypeInference.infer(DebeziumSchemaDataTypeInference.java:81) at com.ververica.cdc.debezium.event.DebeziumSchemaDataTypeInference.infer(DebeziumSchemaDataTypeInference.java:56) at com.ververica.cdc.debezium.event.DebeziumSchemaDataTypeInference.lambda$inferStruct$0(DebeziumSchemaDataTypeInference.java:199) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505) at com.ververica.cdc.debezium.event.DebeziumSchemaDataTypeInference.inferStruct(DebeziumSchemaDataTypeInference.java:200) at com.ververica.cdc.connectors.oracle.source.OracleSchemaDataTypeInference.inferStruct(OracleSchemaDataTypeInference.java:40) at com.ververica.cdc.debezium.event.DebeziumSchemaDataTypeInference.infer(DebeziumSchemaDataTypeInference.java:81) at com.ververica.cdc.debezium.event.DebeziumSchemaDataTypeInference.infer(DebeziumSchemaDataTypeInference.java:56) at com.ververica.cdc.debezium.event.DebeziumEventDeserializationSchema.extractDataRecord(DebeziumEventDeserializationSchema.java:157) at com.ververica.cdc.debezium.event.DebeziumEventDeserializationSchema.extractAfterDataRecord(DebeziumEventDeserializationSchema.java:153) at com.ververica.cdc.debezium.event.DebeziumEventDeserializationSchema.deserializeDataChangeRecord(DebeziumEventDeserializationSchema.java:119) at com.ververica.cdc.debezium.event.SourceRecordEventDeserializer.deserialize(SourceRecordEventDeserializer.java:47) at com.ververica.cdc.debezium.event.DebeziumEventDeserializationSchema.deserialize(DebeziumEventDeserializationSchema.java:106) at com.ververica.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:172) at com.ververica.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:134) at com.ververica.cdc.connectors.oracle.source.reader.OraclePipelineRecordEmitter.processElement(OraclePipelineRecordEmitter.java:90) at com.ververica.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:98) at com.ververica.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:59) at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143) at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:389) at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:545) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:836) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:785) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) at java.lang.Thread.run(Thread.java:750)

@linjianchang linjianchang changed the title [FLINK-36999][cdc-source-connectors] When the source field type is de… [FLINK-36999][cdc-source-connectors] When the source type is oracle and field type is nember and the value is null, a null pointer exception occurs during deserialization. Jan 6, 2025
@leonardBang leonardBang requested a review from GOODBOY008 January 8, 2025 12:04
Copy link
Member

@GOODBOY008 GOODBOY008 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@linjianchang Thanks for your contributions. I left some comments.

@@ -188,6 +188,10 @@ protected DataType inferBytes(Object value, Schema schema) {
protected DataType inferStruct(Object value, Schema schema) {
Struct struct = (Struct) value;
if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
if (struct == null) {
// set the default value
return DataTypes.DECIMAL(10, 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return DataTypes.DECIMAL(10, 0);
return DataTypes.DECIMAL(DecimalType.DEFAULT_PRECISION, DecimalType.DEFAULT_SCALE);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@linjianchang Can you add unit test for this scenario?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants