[Flink] Support schema evolution when restarting the Paimon CDC job #3311

Closed · wants to merge 13 commits

@@ -83,8 +83,10 @@ public static boolean schemaCompatible(
 for (DataField field : sourceTableFields) {
     int idx = paimonSchema.fieldNames().indexOf(field.name());
     if (idx < 0) {
-        LOG.info("Cannot find field '{}' in Paimon table.", field.name());
-        return false;
+        LOG.info(
+                "New fields '{}' found in source table, will be synchronized to Paimon table.",
+                field.name());
+        return true;
     }
Contributor Author:

When idx is less than 0, it means that a new field has been added. Should schema evolution be performed in this case? What do you think? (See the sketch after this diff.)
@yuzelin @JingsongLi @zhuangchong

     DataType type = paimonSchema.fields().get(idx).type();
     if (UpdatedDataFieldsProcessFunction.canConvert(type, field.type())
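
For context on the comment above, here is a minimal sketch of the check after this change. It is illustrative, not the PR's exact code: the class name SchemaCompatibleSketch is invented, and the per-field type checks that the real method performs with UpdatedDataFieldsProcessFunction.canConvert are elided.

import java.util.List;

import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.DataField;

public class SchemaCompatibleSketch {

    // Returns true when the source schema can be handled: fields known to the
    // Paimon table fall through to per-field type checks (elided here), while
    // fields missing from the Paimon table are treated as new columns.
    public static boolean schemaCompatible(
            TableSchema paimonSchema, List<DataField> sourceTableFields) {
        for (DataField field : sourceTableFields) {
            int idx = paimonSchema.fieldNames().indexOf(field.name());
            if (idx < 0) {
                // Before this PR: return false, i.e. the schemas were incompatible.
                // After this PR: return true, so the restarted job evolves the
                // Paimon schema instead of failing.
                return true;
            }
            // ... canConvert checks on paimonSchema.fields().get(idx).type() ...
        }
        return true;
    }
}

On restart, alterTableSchema (changed below in SynchronizationActionBase) then appends the column changes produced by UpdatedDataFieldsProcessFunction.extractSchemaChanges to the dynamic-option changes and applies them in a single catalog.alterTable call.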

@@ -27,6 +27,7 @@
 import org.apache.paimon.flink.action.cdc.postgres.PostgresRecordParser;
 import org.apache.paimon.flink.action.cdc.pulsar.PulsarActionUtils;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.schema.TableSchema;

 import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
 import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;

@@ -197,6 +198,16 @@ public FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> provideRecordParser(
         List<ComputedColumn> computedColumns,
         TypeMapping typeMapping,
         CdcMetadataConverter[] metadataConverters) {
+    return this.provideRecordParser(
+            caseSensitive, computedColumns, typeMapping, metadataConverters, null);
+}
+
+public FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> provideRecordParser(
+        boolean caseSensitive,
+        List<ComputedColumn> computedColumns,
+        TypeMapping typeMapping,
+        CdcMetadataConverter[] metadataConverters,
+        TableSchema paimonSchema) {
     switch (sourceType) {
         case MYSQL:
             return new MySqlRecordParser(

@@ -211,7 +222,8 @@ public FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> provideRecordParser(
                     caseSensitive,
                     computedColumns,
                     typeMapping,
-                    metadataConverters);
+                    metadataConverters,
+                    paimonSchema);
         case KAFKA:
         case PULSAR:
             DataFormat dataFormat = provideDataFormat();

@@ -123,13 +123,13 @@ protected void beforeBuildingSourceSink() throws Exception {
         // Check if table exists before trying to get or create it
         if (catalog.tableExists(identifier)) {
             fileStoreTable = (FileStoreTable) catalog.getTable(identifier);
-            fileStoreTable = alterTableOptions(identifier, fileStoreTable);
             try {
                 Schema retrievedSchema = retrieveSchema();
                 computedColumns =
                         buildComputedColumns(computedColumnArgs, retrievedSchema.fields());
                 Schema paimonSchema = buildPaimonSchema(retrievedSchema);
                 assertSchemaCompatible(fileStoreTable.schema(), paimonSchema.fields());
+                alterTableSchema(identifier, fileStoreTable, paimonSchema);
             } catch (SchemaRetrievalException e) {
                 LOG.info(
                         "Failed to retrieve schema from record data but there exists specified Paimon table. "

@@ -157,7 +157,11 @@ protected void beforeBuildingSourceSink() throws Exception {
     @Override
     protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> recordParse() {
         return syncJobHandler.provideRecordParser(
-                caseSensitive, computedColumns, typeMapping, metadataConverters);
+                caseSensitive,
+                computedColumns,
+                typeMapping,
+                metadataConverters,
+                fileStoreTable.schema());
     }

     @Override

@@ -27,8 +27,11 @@
 import org.apache.paimon.flink.action.cdc.watermark.CdcWatermarkStrategy;
 import org.apache.paimon.flink.sink.cdc.EventParser;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;

 import org.apache.flink.api.common.eventtime.WatermarkStrategy;

@@ -79,7 +82,6 @@ public SynchronizationActionBase(
         this.cdcSourceConfig = Configuration.fromMap(cdcSourceConfig);
         this.syncJobHandler = syncJobHandler;
         this.caseSensitive = catalog.caseSensitive();
-
         this.syncJobHandler.registerJdbcDriver();
     }


@@ -177,7 +179,8 @@ protected abstract void buildSink(
             DataStream<RichCdcMultiplexRecord> input,
             EventParser.Factory<RichCdcMultiplexRecord> parserFactory);

-    protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable table) {
+    protected FileStoreTable alterTableSchema(
+            Identifier identifier, FileStoreTable table, Schema paimonSchema) {
         // doesn't support altering bucket here
         Map<String, String> dynamicOptions = new HashMap<>(tableConfig);
         dynamicOptions.remove(CoreOptions.BUCKET.key());

@@ -194,13 +197,18 @@ protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable table) {
                             oldOptions.get(entry.getKey()), entry.getValue()));

         // alter the table dynamic options
-        List<SchemaChange> optionChanges =
+        List<SchemaChange> changes =
                 dynamicOptions.entrySet().stream()
                         .map(entry -> SchemaChange.setOption(entry.getKey(), entry.getValue()))
                         .collect(Collectors.toList());

+        List<SchemaChange> columnChanges =
+                UpdatedDataFieldsProcessFunction.extractSchemaChanges(
+                        new SchemaManager(table.fileIO(), table.location()), paimonSchema.fields());
+
+        changes.addAll(columnChanges);
         try {
-            catalog.alterTable(identifier, optionChanges, false);
+            catalog.alterTable(identifier, changes, false);
         } catch (Catalog.TableNotExistException
                 | Catalog.ColumnAlreadyExistException
                 | Catalog.ColumnNotExistException e) {

@@ -159,7 +159,7 @@ protected void beforeBuildingSourceSink() throws Exception {
                 Supplier<String> errMsg =
                         incompatibleMessage(table.schema(), tableInfo, identifier);
                 if (shouldMonitorTable(table.schema(), fromMySql, errMsg)) {
-                    table = alterTableOptions(identifier, table);
+                    table = alterTableSchema(identifier, table, fromMySql);
                     tables.add(table);
                     monitoredTables.addAll(tableInfo.identifiers());
                 } else {

@@ -177,7 +177,7 @@ public static ConvertAction canConvert(DataType oldType, DataType newType) {
         return ConvertAction.EXCEPTION;
     }

-    protected List<SchemaChange> extractSchemaChanges(
+    public static List<SchemaChange> extractSchemaChanges(
             SchemaManager schemaManager, List<DataField> updatedDataFields) {
         RowType oldRowType = schemaManager.latest().get().logicalRowType();
         Map<String, DataField> oldFields = new HashMap<>();

@@ -33,7 +33,6 @@
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;

@@ -1368,37 +1367,42 @@ public void testColumnCommentChangeInExistingTable() throws Exception {

     @Test
     @Timeout(60)
-    public void testColumnTypeLengthChangeInExistingTable() throws Exception {
+    public void testColumnAlterInExistingTableWhenStartJob() throws Exception {
         Map<String, String> options = new HashMap<>();
         options.put("bucket", "1");
         options.put("sink.parallelism", "1");

         RowType rowType =
                 RowType.builder()
-                        .field("pk", DataTypes.INT().notNull(), "pk comment")
-                        .field("c1", DataTypes.DATE(), "c1 comment")
-                        .field("c2", DataTypes.VARCHAR(10).notNull(), "c2 comment")
+                        .field("pk", DataTypes.INT().notNull())
+                        .field("a", DataTypes.BIGINT())
+                        .field("b", DataTypes.VARCHAR(20))
                         .build();

         createFileStoreTable(
                 rowType, Collections.emptyList(), Collections.singletonList("pk"), options);

-        // Alter column type length
-        try (Statement statement = getStatement()) {
-            statement.executeUpdate("USE " + DATABASE_NAME);
-            statement.executeUpdate(
-                    "ALTER TABLE test_exist_column_type_length_change MODIFY COLUMN c2 varchar(20)");
-        }
-
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
         mySqlConfig.put("database-name", DATABASE_NAME);
-        mySqlConfig.put("table-name", "test_exist_column_type_length_change");
+        mySqlConfig.put("table-name", "test_exist_column_alter");

         MySqlSyncTableAction action =
                 syncTableActionBuilder(mySqlConfig)
                         .withPrimaryKeys("pk")
                         .withTableConfig(getBasicTableConfig())
                         .build();
-        Assertions.assertDoesNotThrow(() -> runActionWithDefaultEnv(action));
+
+        runActionWithDefaultEnv(action);
+
+        FileStoreTable table = getFileStoreTable();
+
+        Map<String, DataField> actual =
+                table.schema().fields().stream()
+                        .collect(Collectors.toMap(DataField::name, Function.identity()));
+
+        assertThat(actual.get("pk").type()).isEqualTo(DataTypes.INT().notNull());
+        assertThat(actual.get("a").type()).isEqualTo(DataTypes.BIGINT());
+        assertThat(actual.get("b").type()).isEqualTo(DataTypes.VARCHAR(30));
+        assertThat(actual.get("c").type()).isEqualTo(DataTypes.INT());
     }
 }

@@ -100,7 +100,9 @@ private void testSchemaEvolutionImpl(Statement statement) throws Exception {
         RowType rowType =
                 RowType.of(
                         new DataType[] {
-                            DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.STRING()
+                            DataTypes.INT().notNull(),
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(10)
                         },
                         new String[] {"pt", "_id", "v1"});
         List<String> primaryKeys = Arrays.asList("pt", "_id");

@@ -118,7 +120,7 @@ private void testSchemaEvolutionImpl(Statement statement) throws Exception {
                         new DataType[] {
                             DataTypes.INT().notNull(),
                             DataTypes.INT().notNull(),
-                            DataTypes.STRING(),
+                            DataTypes.VARCHAR(10),
                             DataTypes.INT()
                         },
                         new String[] {"pt", "_id", "v1", "v2"});

@@ -145,7 +147,7 @@ private void testSchemaEvolutionImpl(Statement statement) throws Exception {
                         new DataType[] {
                             DataTypes.INT().notNull(),
                             DataTypes.INT().notNull(),
-                            DataTypes.STRING(),
+                            DataTypes.VARCHAR(10),
                             DataTypes.BIGINT()
                         },
                         new String[] {"pt", "_id", "v1", "v2"});

@@ -170,14 +172,14 @@ private void testSchemaEvolutionImpl(Statement statement) throws Exception {
statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v4 BYTEA");
statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v5 FLOAT");
statement.executeUpdate("ALTER TABLE schema_evolution_2 ALTER COLUMN v1 TYPE VARCHAR(20)");
statement.executeUpdate(
"UPDATE schema_evolution_2 SET v1 = 'very long string' WHERE _id = 8");
String v1 = "very long string";
statement.executeUpdate("UPDATE schema_evolution_2 SET v1 = '" + v1 + "' WHERE _id = 8");
rowType =
RowType.of(
new DataType[] {
DataTypes.INT().notNull(),
DataTypes.INT().notNull(),
DataTypes.STRING(),
DataTypes.VARCHAR(Math.max(v1.length(), 10)),
DataTypes.BIGINT(),
DataTypes.DECIMAL(8, 3),
DataTypes.BYTES(),

@@ -209,7 +211,7 @@ private void testSchemaEvolutionImpl(Statement statement) throws Exception {
                         new DataType[] {
                             DataTypes.INT().notNull(),
                             DataTypes.INT().notNull(),
-                            DataTypes.STRING(),
+                            DataTypes.VARCHAR(Math.max(v1.length(), 10)),
                             DataTypes.BIGINT(),
                             DataTypes.DECIMAL(8, 3),
                             DataTypes.BYTES(),

@@ -263,9 +265,9 @@ private void testSchemaEvolutionMultipleImpl(Statement statement) throws Exception {
                 RowType.of(
                         new DataType[] {
                             DataTypes.INT().notNull(),
-                            DataTypes.STRING(),
+                            DataTypes.VARCHAR(10),
                             DataTypes.INT(),
-                            DataTypes.STRING()
+                            DataTypes.VARCHAR(10)
                         },
                         new String[] {"_id", "v1", "v2", "v3"});
         List<String> primaryKeys = Collections.singletonList("_id");

@@ -280,20 +282,30 @@ private void testSchemaEvolutionMultipleImpl(Statement statement) throws Exception {
+ "ADD COLUMN v6 DECIMAL(5, 3),"
+ "ADD COLUMN \"$% ^,& *(\" VARCHAR(10),"
+ "ALTER COLUMN v2 TYPE BIGINT");
String v1 = "long_string_two";
String v3 = "string_2";
String v7 = "test_2";

statement.executeUpdate(
"INSERT INTO schema_evolution_multiple VALUES "
+ "(2, 'long_string_two', 2000000000000, 'string_2', 20, 20.5, 20.002, 'test_2')");
+ "(2, '"
+ v1
+ "', 2000000000000, '"
+ v3
+ "', 20, 20.5, 20.002, '"
+ v7
+ "')");
rowType =
RowType.of(
new DataType[] {
DataTypes.INT().notNull(),
DataTypes.STRING(),
DataTypes.VARCHAR(v1.length()),
DataTypes.BIGINT(),
DataTypes.STRING(),
DataTypes.VARCHAR(10),
DataTypes.INT(),
DataTypes.DOUBLE(),
DataTypes.DECIMAL(5, 3),
DataTypes.STRING()
DataTypes.VARCHAR(v7.length())
},
new String[] {"_id", "v1", "v2", "v3", "v4", "v5", "v6", "$% ^,& *("});
expected =

@@ -362,8 +374,8 @@ private void testAllTypesImpl(Statement statement) throws Exception {
             DataTypes.TIMESTAMP(6), // _timestamp0
             DataTypes.TIME(6), // _time
             DataTypes.TIME(6), // _time0
-            DataTypes.STRING(), // _char
-            DataTypes.STRING(), // _varchar
+            DataTypes.CHAR(10), // _char
+            DataTypes.VARCHAR(20), // _varchar
             DataTypes.STRING(), // _text
             DataTypes.BYTES(), // _bin
             DataTypes.STRING(), // _json

@@ -659,7 +671,7 @@ public void testSyncShards() throws Exception {
                 RowType.of(
                         new DataType[] {
                             DataTypes.INT().notNull(),
-                            DataTypes.STRING(),
+                            DataTypes.VARCHAR(10),
                             DataTypes.STRING().notNull()
                         },
                         new String[] {"pk", "_date", "pt"});

@@ -752,7 +764,7 @@ public void testMetadataColumns() throws Exception {
                 RowType.of(
                         new DataType[] {
                             DataTypes.INT().notNull(),
-                            DataTypes.STRING(),
+                            DataTypes.VARCHAR(10),
                             DataTypes.STRING().notNull(),
                             DataTypes.STRING().notNull(),
                             DataTypes.STRING().notNull()

@@ -323,13 +323,15 @@ CREATE TABLE test_exist_column_comment_change (
     PRIMARY KEY (pk)
 );

-CREATE TABLE test_exist_column_type_length_change (
-    pk INT,
-    c1 DATE,
-    c2 VARCHAR(10) not null,
-    PRIMARY KEY (pk)
+CREATE TABLE test_exist_column_alter (
+    pk INT,
+    a BIGINT,
+    b VARCHAR(30),
+    c INT,
+    PRIMARY KEY (pk)
 );

 -- ################################################################################
 -- testSyncShard
 -- ################################################################################