[Flink] Support schema evolution when restarting the Paimon CDC job #3311

Closed
wants to merge 13 commits
@@ -83,11 +83,13 @@ public static boolean schemaCompatible(
for (DataField field : sourceTableFields) {
int idx = paimonSchema.fieldNames().indexOf(field.name());
if (idx < 0) {
LOG.info("Cannot find field '{}' in Paimon table.", field.name());
return false;
LOG.info(
"New fields '{}' found in source table, will be synchronized to Paimon table.",
field.name());
return true;
}
Contributor Author:

When idx is less than 0, it means that a new field has been added in the source table. Should schema evolution be performed in that case? What do you think?
@yuzelin @JingsongLi @zhuangchong
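
For context, a minimal, hypothetical sketch (not the code in this PR) of the variant being discussed: an unknown source field is logged and skipped so schema evolution can add it later, while fields that already exist in the Paimon table are still checked for convertibility. The signature and the `LOG` / `canConvert` helpers are assumed to match the surrounding diff.

```java
// Hypothetical variant of schemaCompatible: a source field missing from the Paimon
// schema is treated as a new column to be added by schema evolution; existing
// fields must still be convertible from the Paimon type to the source type.
public static boolean schemaCompatible(
        TableSchema paimonSchema, List<DataField> sourceTableFields) {
    for (DataField field : sourceTableFields) {
        int idx = paimonSchema.fieldNames().indexOf(field.name());
        if (idx < 0) {
            // New field: do not reject the table, let schema evolution handle it.
            LOG.info(
                    "New field '{}' found in source table, will be synchronized to Paimon table.",
                    field.name());
            continue;
        }
        DataType paimonType = paimonSchema.fields().get(idx).type();
        if (UpdatedDataFieldsProcessFunction.canConvert(paimonType, field.type())
                != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) {
            return false;
        }
    }
    return true;
}
```

Compared with returning true as soon as a new field is seen, continuing the loop would still reject tables whose existing columns cannot be converted.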

DataType type = paimonSchema.fields().get(idx).type();
if (UpdatedDataFieldsProcessFunction.canConvert(field.type(), type)
if (UpdatedDataFieldsProcessFunction.canConvert(type, field.type())
!= UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) {
LOG.info(
"Cannot convert field '{}' from source table type '{}' to Paimon type '{}'.",
@@ -27,6 +27,7 @@
import org.apache.paimon.flink.action.cdc.postgres.PostgresRecordParser;
import org.apache.paimon.flink.action.cdc.pulsar.PulsarActionUtils;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.schema.TableSchema;

import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
@@ -197,6 +198,16 @@ public FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> provideRecordPar
List<ComputedColumn> computedColumns,
TypeMapping typeMapping,
CdcMetadataConverter[] metadataConverters) {
return this.provideRecordParser(
caseSensitive, computedColumns, typeMapping, metadataConverters, null);
}

public FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> provideRecordParser(
boolean caseSensitive,
List<ComputedColumn> computedColumns,
TypeMapping typeMapping,
CdcMetadataConverter[] metadataConverters,
TableSchema paimonSchema) {
switch (sourceType) {
case MYSQL:
return new MySqlRecordParser(
@@ -211,7 +222,8 @@ public FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> provideRecordPar
caseSensitive,
computedColumns,
typeMapping,
metadataConverters);
metadataConverters,
paimonSchema);
case KAFKA:
case PULSAR:
DataFormat dataFormat = provideDataFormat();
@@ -123,13 +123,13 @@ protected void beforeBuildingSourceSink() throws Exception {
// Check if table exists before trying to get or create it
if (catalog.tableExists(identifier)) {
fileStoreTable = (FileStoreTable) catalog.getTable(identifier);
fileStoreTable = alterTableOptions(identifier, fileStoreTable);
try {
Schema retrievedSchema = retrieveSchema();
computedColumns =
buildComputedColumns(computedColumnArgs, retrievedSchema.fields());
Schema paimonSchema = buildPaimonSchema(retrievedSchema);
assertSchemaCompatible(fileStoreTable.schema(), paimonSchema.fields());
alterTableSchema(identifier, fileStoreTable, paimonSchema);
} catch (SchemaRetrievalException e) {
LOG.info(
"Failed to retrieve schema from record data but there exists specified Paimon table. "
@@ -157,7 +157,11 @@ protected void beforeBuildingSourceSink() throws Exception {
@Override
protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> recordParse() {
return syncJobHandler.provideRecordParser(
caseSensitive, computedColumns, typeMapping, metadataConverters);
caseSensitive,
computedColumns,
typeMapping,
metadataConverters,
fileStoreTable.schema());
}

@Override
@@ -27,8 +27,11 @@
import org.apache.paimon.flink.action.cdc.watermark.CdcWatermarkStrategy;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -79,7 +82,6 @@ public SynchronizationActionBase(
this.cdcSourceConfig = Configuration.fromMap(cdcSourceConfig);
this.syncJobHandler = syncJobHandler;
this.caseSensitive = catalog.caseSensitive();

this.syncJobHandler.registerJdbcDriver();
}

@@ -177,7 +179,8 @@ protected abstract void buildSink(
DataStream<RichCdcMultiplexRecord> input,
EventParser.Factory<RichCdcMultiplexRecord> parserFactory);

protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable table) {
protected FileStoreTable alterTableSchema(
Identifier identifier, FileStoreTable table, Schema paimonSchema) {
// doesn't support altering bucket here
Map<String, String> dynamicOptions = new HashMap<>(tableConfig);
dynamicOptions.remove(CoreOptions.BUCKET.key());
@@ -194,13 +197,18 @@ protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable
oldOptions.get(entry.getKey()), entry.getValue()));

// alter the table dynamic options
List<SchemaChange> optionChanges =
List<SchemaChange> changes =
dynamicOptions.entrySet().stream()
.map(entry -> SchemaChange.setOption(entry.getKey(), entry.getValue()))
.collect(Collectors.toList());

List<SchemaChange> columnChanges =
UpdatedDataFieldsProcessFunction.extractSchemaChanges(
new SchemaManager(table.fileIO(), table.location()), paimonSchema.fields());

changes.addAll(columnChanges);
try {
catalog.alterTable(identifier, optionChanges, false);
catalog.alterTable(identifier, changes, false);
} catch (Catalog.TableNotExistException
| Catalog.ColumnAlreadyExistException
| Catalog.ColumnNotExistException e) {
@@ -159,7 +159,7 @@ protected void beforeBuildingSourceSink() throws Exception {
Supplier<String> errMsg =
incompatibleMessage(table.schema(), tableInfo, identifier);
if (shouldMonitorTable(table.schema(), fromMySql, errMsg)) {
table = alterTableOptions(identifier, table);
table = alterTableSchema(identifier, table, fromMySql);
tables.add(table);
monitoredTables.addAll(tableInfo.identifiers());
} else {
@@ -25,11 +25,14 @@
import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEvent;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.CharType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
@@ -70,6 +73,7 @@
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
@@ -101,26 +105,30 @@ public class PostgresRecordParser
private String currentTable;
private String databaseName;
private final CdcMetadataConverter[] metadataConverters;
private TableSchema paimonSchema;

public PostgresRecordParser(
Configuration postgresConfig,
boolean caseSensitive,
TypeMapping typeMapping,
CdcMetadataConverter[] metadataConverters) {
CdcMetadataConverter[] metadataConverters,
TableSchema schema) {
this(
postgresConfig,
caseSensitive,
Collections.emptyList(),
typeMapping,
metadataConverters);
metadataConverters,
schema);
}

public PostgresRecordParser(
Configuration postgresConfig,
boolean caseSensitive,
List<ComputedColumn> computedColumns,
TypeMapping typeMapping,
CdcMetadataConverter[] metadataConverters) {
CdcMetadataConverter[] metadataConverters,
TableSchema paimonSchema) {
this.caseSensitive = caseSensitive;
this.computedColumns = computedColumns;
this.typeMapping = typeMapping;
@@ -133,6 +141,7 @@ public PostgresRecordParser(
stringifyServerTimeZone == null
? ZoneId.systemDefault()
: ZoneId.of(stringifyServerTimeZone);
this.paimonSchema = paimonSchema;
}

@Override
@@ -146,7 +155,7 @@ public void flatMap(CdcSourceRecord rawEvent, Collector<RichCdcMultiplexRecord>
extractRecords().forEach(out::collect);
}

private List<DataField> extractFields(DebeziumEvent.Field schema) {
private List<DataField> extractFields(DebeziumEvent.Field schema, JsonNode afterData) {
Map<String, DebeziumEvent.Field> afterFields = schema.afterFields();
Preconditions.checkArgument(
!afterFields.isEmpty(),
@@ -157,16 +166,22 @@ private List<DataField> extractFields(DebeziumEvent.Field schema) {
RowType.Builder rowType = RowType.builder();
Set<String> existedFields = new HashSet<>();
Function<String, String> columnDuplicateErrMsg = columnDuplicateErrMsg(currentTable);

Map<String, DataField> paimonFields =
paimonSchema.fields().stream()
.collect(Collectors.toMap(DataField::name, Function.identity()));

afterFields.forEach(
(key, value) -> {
(key, afterField) -> {
String columnName =
columnCaseConvertAndDuplicateCheck(
key, existedFields, caseSensitive, columnDuplicateErrMsg);

DataType dataType = extractFieldType(value);
DataType dataType =
extractFieldType(afterField, paimonFields.get(key), afterData);
dataType =
dataType.copy(
typeMapping.containsMode(TO_NULLABLE) || value.optional());
typeMapping.containsMode(TO_NULLABLE) || afterField.optional());

rowType.field(columnName, dataType);
});
@@ -177,7 +192,8 @@ private List<DataField> extractFields(DebeziumEvent.Field schema) {
* Extract fields from json records, see <a
* href="https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-data-types">postgresql-data-types</a>.
*/
private DataType extractFieldType(DebeziumEvent.Field field) {
private DataType extractFieldType(
DebeziumEvent.Field field, DataField paimonField, JsonNode afterData) {
switch (field.type()) {
case "array":
return DataTypes.ARRAY(DataTypes.STRING());
@@ -209,6 +225,16 @@ private DataType extractFieldType(DebeziumEvent.Field field) {
case "boolean":
return DataTypes.BOOLEAN();
case "string":
int newLength = afterData.get(field.field()).asText().length();
if (paimonField == null) {
return DataTypes.VARCHAR(newLength);
} else if (paimonField.type() instanceof VarCharType) {
int oldLength = ((VarCharType) paimonField.type()).getLength();
return DataTypes.VARCHAR(Math.max(oldLength, newLength));
} else if (paimonField.type() instanceof CharType) {
int oldLength = ((CharType) paimonField.type()).getLength();
return DataTypes.CHAR(Math.max(oldLength, newLength));
}
return DataTypes.STRING();
case "bytes":
if (decimalLogicalName().equals(field.name())) {
@@ -248,7 +274,7 @@ private List<RichCdcMultiplexRecord> extractRecords() {
Map<String, String> after = extractRow(root.payload().after());
if (!after.isEmpty()) {
after = mapKeyCaseConvert(after, caseSensitive, recordKeyDuplicateErrMsg(after));
List<DataField> fields = extractFields(root.schema());
List<DataField> fields = extractFields(root.schema(), root.payload().after());
records.add(
new RichCdcMultiplexRecord(
databaseName,
@@ -177,7 +177,7 @@ public static ConvertAction canConvert(DataType oldType, DataType newType) {
return ConvertAction.EXCEPTION;
}

protected List<SchemaChange> extractSchemaChanges(
public static List<SchemaChange> extractSchemaChanges(
SchemaManager schemaManager, List<DataField> updatedDataFields) {
RowType oldRowType = schemaManager.latest().get().logicalRowType();
Map<String, DataField> oldFields = new HashMap<>();
@@ -1364,4 +1364,45 @@ public void testColumnCommentChangeInExistingTable() throws Exception {
assertThat(actual.get("c1").description()).isEqualTo("c1 comment");
assertThat(actual.get("c2").description()).isEqualTo("c2 comment");
}

@Test
@Timeout(60)
public void testColumnAlterInExistingTableWhenStartJob() throws Exception {
Map<String, String> options = new HashMap<>();
options.put("bucket", "1");
options.put("sink.parallelism", "1");

RowType rowType =
RowType.builder()
.field("pk", DataTypes.INT().notNull())
.field("a", DataTypes.BIGINT())
.field("b", DataTypes.VARCHAR(20))
.build();

createFileStoreTable(
rowType, Collections.emptyList(), Collections.singletonList("pk"), options);

Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", DATABASE_NAME);
mySqlConfig.put("table-name", "test_exist_column_alter");

MySqlSyncTableAction action =
syncTableActionBuilder(mySqlConfig)
.withPrimaryKeys("pk")
.withTableConfig(getBasicTableConfig())
.build();

runActionWithDefaultEnv(action);

FileStoreTable table = getFileStoreTable();

Map<String, DataField> actual =
table.schema().fields().stream()
.collect(Collectors.toMap(DataField::name, Function.identity()));

assertThat(actual.get("pk").type()).isEqualTo(DataTypes.INT().notNull());
assertThat(actual.get("a").type()).isEqualTo(DataTypes.BIGINT());
assertThat(actual.get("b").type()).isEqualTo(DataTypes.VARCHAR(30));
assertThat(actual.get("c").type()).isEqualTo(DataTypes.INT());
}
}