Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Flink]supported schemaEvolution when restarting the paimon cdc job #3311

Closed
wants to merge 13 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -225,24 +225,15 @@ private DataType extractFieldType(
case "boolean":
return DataTypes.BOOLEAN();
case "string":
int newLength = afterData.get(field.field()).asText().length();
if (paimonField == null) {
return DataTypes.VARCHAR(afterData.get(field.field()).asText().length());
return DataTypes.VARCHAR(newLength);
} else if (paimonField.type() instanceof VarCharType) {
int oldLength = ((VarCharType) paimonField.type()).getLength();
int newLength = afterData.get(field.field()).asText().length();
if (oldLength < newLength) {
return DataTypes.VARCHAR(newLength);
} else {
return DataTypes.VARCHAR(oldLength);
}
return DataTypes.VARCHAR(Math.max(oldLength, newLength));
} else if (paimonField.type() instanceof CharType) {
int oldLength = ((CharType) paimonField.type()).getLength();
int newLength = afterData.get(field.field()).asText().length();
if (oldLength < newLength) {
return DataTypes.CHAR(newLength);
} else {
return DataTypes.CHAR(oldLength);
}
return DataTypes.CHAR(Math.max(oldLength, newLength));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in case "string", We cannot convert the varchar(n)/char(n) type into the string type, which will cause schemaCompatible to fail if the field changes when restarting the paimon-cdc job, because the string length of paimon is integer.max is always greater than n.
At the same time, when pg table performs column type alter(example: n changes from 10 to 20), because pg-debezium does not have ddl event, we can only get the length value n from the record

}
return DataTypes.STRING();
case "bytes":
Expand Down