-
Notifications
You must be signed in to change notification settings - Fork 966
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Flink]supported schemaEvolution when restarting the paimon cdc job #3311
Conversation
… the schemaCompatible method was written in reverse position
I added some test cases, can you help review this pr? @JingsongLi |
Test Triggerred. |
The pr will add the following jobs: |
case "string": | ||
int newLength = afterData.get(field.field()).asText().length(); | ||
if (paimonField == null) { | ||
return DataTypes.VARCHAR(afterData.get(field.field()).asText().length()); | ||
return DataTypes.VARCHAR(newLength); | ||
} else if (paimonField.type() instanceof VarCharType) { | ||
int oldLength = ((VarCharType) paimonField.type()).getLength(); | ||
int newLength = afterData.get(field.field()).asText().length(); | ||
if (oldLength < newLength) { | ||
return DataTypes.VARCHAR(newLength); | ||
} else { | ||
return DataTypes.VARCHAR(oldLength); | ||
} | ||
return DataTypes.VARCHAR(Math.max(oldLength, newLength)); | ||
} else if (paimonField.type() instanceof CharType) { | ||
int oldLength = ((CharType) paimonField.type()).getLength(); | ||
int newLength = afterData.get(field.field()).asText().length(); | ||
if (oldLength < newLength) { | ||
return DataTypes.CHAR(newLength); | ||
} else { | ||
return DataTypes.CHAR(oldLength); | ||
} | ||
return DataTypes.CHAR(Math.max(oldLength, newLength)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in case "string", We cannot convert the varchar(n)/char(n) type into the string type, which will cause schemaCompatible to fail if the field changes when restarting the paimon-cdc job, because the string length of paimon is integer.max is always greater than n.
At the same time, when pg table performs column type alter(example: n changes from 10 to 20), because pg-debezium does not have ddl event, we can only get the length value n from the record
LOG.info( | ||
"New fields '{}' found in source table, will be synchronized to Paimon table.", | ||
field.name()); | ||
return true; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When idx is less than 0, it means that a new field has been added. Should schemaEvolution be performed? what you think?
@yuzelin @JingsongLi @zhuangchong
@JingsongLi @yuzelin @zhongyujiang I updated the code and test cases, can you help review them? |
Found some problems, I will close the PR and create a new PR for better review of the code |
move to #3362 |
Purpose
Restarting the flink-paimon-mysql-cdc job schemaCompatible failed when the data type length was modified between the stop and restart of the flink-paimon-mysql-cdc job
Tests
API and Format
Documentation