Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cdc] fix paimon cdc job restart failed when schema changed #3362

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,23 @@ public static boolean schemaCompatible(
for (DataField field : sourceTableFields) {
int idx = paimonSchema.fieldNames().indexOf(field.name());
if (idx < 0) {
LOG.info("Cannot find field '{}' in Paimon table.", field.name());
return false;
}
DataType type = paimonSchema.fields().get(idx).type();
if (UpdatedDataFieldsProcessFunction.canConvert(field.type(), type)
!= UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) {
LOG.info(
"Cannot convert field '{}' from source table type '{}' to Paimon type '{}'.",
field.name(),
field.type(),
type);
return false;
if (!field.type().isNullable()) {
LOG.info(
"Add column '{}' cannot specify NOT NULL in the Paimon table.",
field.name());
return false;
}
} else {
DataType type = paimonSchema.fields().get(idx).type();
if (UpdatedDataFieldsProcessFunction.canConvert(type, field.type())
!= UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) {
LOG.info(
"Cannot convert field '{}' from source table type '{}' to Paimon type '{}'.",
field.name(),
field.type(),
type);
return false;
}
}
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,13 @@ protected void beforeBuildingSourceSink() throws Exception {
// Check if table exists before trying to get or create it
if (catalog.tableExists(identifier)) {
fileStoreTable = (FileStoreTable) catalog.getTable(identifier);
fileStoreTable = alterTableOptions(identifier, fileStoreTable);
try {
Schema retrievedSchema = retrieveSchema();
computedColumns =
buildComputedColumns(computedColumnArgs, retrievedSchema.fields());
Schema paimonSchema = buildPaimonSchema(retrievedSchema);
assertSchemaCompatible(fileStoreTable.schema(), paimonSchema.fields());
fileStoreTable = alterTableSchema(identifier, fileStoreTable, paimonSchema);
} catch (SchemaRetrievalException e) {
LOG.info(
"Failed to retrieve schema from record data but there exists specified Paimon table. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@
import org.apache.paimon.flink.action.cdc.watermark.CdcWatermarkStrategy;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
Expand All @@ -41,6 +44,7 @@
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -182,7 +186,8 @@ protected abstract void buildSink(
DataStream<RichCdcMultiplexRecord> input,
EventParser.Factory<RichCdcMultiplexRecord> parserFactory);

protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable table) {
protected FileStoreTable alterTableSchema(
Identifier identifier, FileStoreTable table, Schema paimonSchema) {
// doesn't support altering bucket here
Map<String, String> dynamicOptions = new HashMap<>(tableConfig);
dynamicOptions.remove(CoreOptions.BUCKET.key());
Expand All @@ -197,19 +202,30 @@ protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable
immutableOptionKeys.contains(entry.getKey())
|| Objects.equals(
oldOptions.get(entry.getKey()), entry.getValue()));

if (dynamicOptions.isEmpty()) {
return table;
}
List<SchemaChange> tableSchemaChanges = new ArrayList<>();

// alter the table dynamic options
List<SchemaChange> optionChanges =
dynamicOptions.entrySet().stream()
.map(entry -> SchemaChange.setOption(entry.getKey(), entry.getValue()))
.collect(Collectors.toList());

// alter the table schema
List<SchemaChange> columnChanges =
UpdatedDataFieldsProcessFunction.extractSchemaChanges(
new SchemaManager(table.fileIO(), table.location()),
paimonSchema.fields(),
allowUpperCase);

tableSchemaChanges.addAll(optionChanges);
tableSchemaChanges.addAll(columnChanges);

if (tableSchemaChanges.isEmpty()) {
return table;
}

try {
catalog.alterTable(identifier, optionChanges, false);
catalog.alterTable(identifier, tableSchemaChanges, false);
} catch (Catalog.TableNotExistException
| Catalog.ColumnAlreadyExistException
| Catalog.ColumnNotExistException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ protected void beforeBuildingSourceSink() throws Exception {
Supplier<String> errMsg =
incompatibleMessage(table.schema(), tableInfo, identifier);
if (shouldMonitorTable(table.schema(), fromMySql, errMsg)) {
table = alterTableOptions(identifier, table);
table = alterTableSchema(identifier, table, fromMySql);
tables.add(table);
monitoredTables.addAll(tableInfo.identifiers());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,18 +136,14 @@ public static DataType toDataType(
return DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18));
case PG_CHAR:
case PG_CHARACTER:
return DataTypes.CHAR(precision);
case PG_CHAR_ARRAY:
case PG_CHARACTER_ARRAY:
return DataTypes.ARRAY(DataTypes.CHAR(precision));
case PG_CHARACTER_VARYING:
return DataTypes.VARCHAR(precision);
case PG_CHARACTER_VARYING_ARRAY:
return DataTypes.ARRAY(DataTypes.VARCHAR(precision));
case PG_TEXT:
case PG_JSON:
case PG_ENUM:
return DataTypes.STRING();
case PG_CHAR_ARRAY:
case PG_CHARACTER_ARRAY:
case PG_CHARACTER_VARYING_ARRAY:
case PG_TEXT_ARRAY:
return DataTypes.ARRAY(DataTypes.STRING());
case PG_TIMESTAMP:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,15 @@ public static ConvertAction canConvert(DataType oldType, DataType newType) {
return ConvertAction.EXCEPTION;
}

protected List<SchemaChange> extractSchemaChanges(
public List<SchemaChange> extractSchemaChanges(
SchemaManager schemaManager, List<DataField> updatedDataFields) {
return extractSchemaChanges(schemaManager, updatedDataFields, allowUpperCase);
}

public static List<SchemaChange> extractSchemaChanges(
SchemaManager schemaManager,
List<DataField> updatedDataFields,
boolean allowUpperCase) {
RowType oldRowType = schemaManager.latest().get().logicalRowType();
Map<String, DataField> oldFields = new HashMap<>();
for (DataField oldField : oldRowType.getFields()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1420,8 +1420,6 @@ public void testColumnCommentChangeInExistingTable() throws Exception {
mySqlConfig.put("database-name", DATABASE_NAME);
mySqlConfig.put("table-name", "test_exist_column_comment_change");

// Flink cdc 2.3 does not support collecting field comments, and existing paimon table field
// comments will not be changed.
MySqlSyncTableAction action =
syncTableActionBuilder(mySqlConfig)
.withPrimaryKeys("pk")
Expand All @@ -1433,13 +1431,96 @@ public void testColumnCommentChangeInExistingTable() throws Exception {
Map<String, DataField> actual =
table.schema().fields().stream()
.collect(Collectors.toMap(DataField::name, Function.identity()));
assertThat(actual.get("pk").description()).isEqualTo("pk comment");
assertThat(actual.get("c1").description()).isEqualTo("c1 comment");
assertThat(actual.get("pk").description()).isEqualTo("pk new_comment");
assertThat(actual.get("c1").description()).isEqualTo("c1 new_comment");
assertThat(actual.get("c2").description()).isEqualTo("c2 comment");
}

// Verifies that when a Paimon table already exists with a lagging schema
// (narrower VARCHAR for "b", column "c" missing), starting the MySQL sync job
// evolves the table schema to match the source before ingestion begins.
@Test
@Timeout(60)
public void testColumnAlterInExistingTableBeforeStartJob() throws Exception {
    Map<String, String> tableOptions = new HashMap<>();
    tableOptions.put("bucket", "1");
    tableOptions.put("sink.parallelism", "1");

    // Pre-create the Paimon table with an older schema than the MySQL source.
    RowType existingRowType =
            RowType.builder()
                    .field("pk", DataTypes.INT().notNull())
                    .field("a", DataTypes.BIGINT())
                    .field("b", DataTypes.VARCHAR(20))
                    .build();
    createFileStoreTable(
            existingRowType,
            Collections.emptyList(),
            Collections.singletonList("pk"),
            Collections.emptyList(),
            tableOptions);

    Map<String, String> mySqlConfig = getBasicMySqlConfig();
    mySqlConfig.put("database-name", DATABASE_NAME);
    mySqlConfig.put("table-name", "test_exist_column_alter");

    MySqlSyncTableAction action =
            syncTableActionBuilder(mySqlConfig)
                    .withPrimaryKeys("pk")
                    .withTableConfig(getBasicTableConfig())
                    .build();
    runActionWithDefaultEnv(action);

    // The job startup should have widened "b" to VARCHAR(30) and added "c".
    Map<String, DataField> fieldsByName =
            getFileStoreTable().schema().fields().stream()
                    .collect(Collectors.toMap(DataField::name, Function.identity()));

    assertThat(fieldsByName.get("pk").type()).isEqualTo(DataTypes.INT().notNull());
    assertThat(fieldsByName.get("a").type()).isEqualTo(DataTypes.BIGINT());
    assertThat(fieldsByName.get("b").type()).isEqualTo(DataTypes.VARCHAR(30));
    assertThat(fieldsByName.get("c").type()).isEqualTo(DataTypes.INT());
}

// Verifies the incompatible-schema path: the MySQL source adds a NOT NULL
// column "c" that the pre-existing Paimon table does not have. Since a newly
// added column may not be declared NOT NULL on the Paimon side, the action
// must fail with an IllegalArgumentException describing both schemas.
@Test
@Timeout(60)
public void testAssertSchemaCompatibleWithAddColumnISNOTNULL() throws Exception {
    Map<String, String> tableOptions = new HashMap<>();
    tableOptions.put("bucket", "1");
    tableOptions.put("sink.parallelism", "1");

    // Existing Paimon table: no column "c", "b" narrower than the source.
    RowType existingRowType =
            RowType.builder()
                    .field("pk", DataTypes.INT().notNull())
                    .field("a", DataTypes.BIGINT())
                    .field("b", DataTypes.VARCHAR(20))
                    .build();
    createFileStoreTable(
            existingRowType,
            Collections.emptyList(),
            Collections.singletonList("pk"),
            Collections.emptyList(),
            tableOptions);

    Map<String, String> mySqlConfig = getBasicMySqlConfig();
    mySqlConfig.put("database-name", DATABASE_NAME);
    mySqlConfig.put("table-name", "assert_schema_compatible");

    MySqlSyncTableAction action =
            syncTableActionBuilder(mySqlConfig)
                    .withPrimaryKeys("pk")
                    .withTableConfig(getBasicTableConfig())
                    .build();

    // Running the action must surface the schema-compatibility failure.
    assertThatThrownBy(action::run)
            .satisfies(
                    anyCauseMatches(
                            IllegalArgumentException.class,
                            "Paimon schema and source table schema are not compatible.\n"
                                    + "Paimon fields are: [`pk` INT NOT NULL, `a` BIGINT, `b` VARCHAR(20)].\n"
                                    + "Source table fields are: [`pk` INT NOT NULL '', `a` BIGINT '', `b` VARCHAR(30) '', `c` INT NOT NULL 'Add column cannot specify NOT NULL in the Paimon table']"));
}

public void testWriteOnlyAndSchemaEvolution() throws Exception {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", "write_only_and_schema_evolution");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
Expand All @@ -38,6 +39,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -77,7 +80,7 @@ public void testSchemaEvolution() throws Exception {
checkTableSchema(
"[{\"id\":0,\"name\":\"pt\",\"type\":\"INT NOT NULL\"},"
+ "{\"id\":1,\"name\":\"_id\",\"type\":\"INT NOT NULL\"},"
+ "{\"id\":2,\"name\":\"v1\",\"type\":\"VARCHAR(10)\"}]");
+ "{\"id\":2,\"name\":\"v1\",\"type\":\"STRING\"}]");

try (Statement statement = getStatement(DATABASE_NAME)) {
testSchemaEvolutionImpl(statement);
Expand Down Expand Up @@ -245,9 +248,9 @@ public void testMultipleSchemaEvolutions() throws Exception {

checkTableSchema(
"[{\"id\":0,\"name\":\"_id\",\"type\":\"INT NOT NULL\"},"
+ "{\"id\":1,\"name\":\"v1\",\"type\":\"VARCHAR(10)\"},"
+ "{\"id\":1,\"name\":\"v1\",\"type\":\"STRING\"},"
+ "{\"id\":2,\"name\":\"v2\",\"type\":\"INT\"},"
+ "{\"id\":3,\"name\":\"v3\",\"type\":\"VARCHAR(10)\"}]");
+ "{\"id\":3,\"name\":\"v3\",\"type\":\"STRING\"}]");

try (Statement statement = getStatement(DATABASE_NAME)) {
testSchemaEvolutionMultipleImpl(statement);
Expand Down Expand Up @@ -786,6 +789,50 @@ public void testCatalogAndTableConfig() {
.containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
}

// Verifies that starting the Postgres sync job against a pre-existing Paimon
// table evolves the table schema to include the source's extra column "c".
@Test
@Timeout(60)
public void testColumnAlterInExistingTableWhenStartJob() throws Exception {
    String sourceTableName = "test_exist_column_alter";

    Map<String, String> tableOptions = new HashMap<>();
    tableOptions.put("bucket", "1");
    tableOptions.put("sink.parallelism", "1");

    // Pre-create the Paimon table without column "c".
    RowType existingRowType =
            RowType.builder()
                    .field("pk", DataTypes.INT().notNull())
                    .field("a", DataTypes.BIGINT())
                    .field("b", DataTypes.STRING())
                    .build();
    createFileStoreTable(
            existingRowType,
            Collections.emptyList(),
            Collections.singletonList("pk"),
            Collections.emptyList(),
            tableOptions);

    Map<String, String> postgresConfig = getBasicPostgresConfig();
    postgresConfig.put(PostgresSourceOptions.DATABASE_NAME.key(), DATABASE_NAME);
    postgresConfig.put(PostgresSourceOptions.SCHEMA_NAME.key(), SCHEMA_NAME);
    postgresConfig.put(PostgresSourceOptions.TABLE_NAME.key(), sourceTableName);

    PostgresSyncTableAction action =
            syncTableActionBuilder(postgresConfig).withPrimaryKeys("pk").build();
    runActionWithDefaultEnv(action);

    // The job startup should have added column "c" while leaving the rest intact.
    Map<String, DataField> fieldsByName =
            getFileStoreTable().schema().fields().stream()
                    .collect(Collectors.toMap(DataField::name, Function.identity()));

    assertThat(fieldsByName.get("pk").type()).isEqualTo(DataTypes.INT().notNull());
    assertThat(fieldsByName.get("a").type()).isEqualTo(DataTypes.BIGINT());
    assertThat(fieldsByName.get("b").type()).isEqualTo(DataTypes.STRING());
    assertThat(fieldsByName.get("c").type()).isEqualTo(DataTypes.INT());
}

// Convenience overload: resolves the Paimon table under this test's default
// table name via the parameterized getFileStoreTable(String) helper.
private FileStoreTable getFileStoreTable() throws Exception {
return getFileStoreTable(tableName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,12 +331,28 @@ CREATE TABLE test_exist_options_change (
);

CREATE TABLE test_exist_column_comment_change (
pk INT,
c1 DATE,
c2 VARCHAR(10) not null comment 'c2 comment',
pk INT comment 'pk new_comment',
c1 DATE comment 'c1 new_comment',
c2 VARCHAR(10) NOT NULL comment 'c2 comment',
PRIMARY KEY (pk)
);

-- Source table for the testColumnAlterInExistingTable* cases: the pre-created
-- Paimon table omits column "c" and uses a narrower type for "b", so the sync
-- job must evolve the Paimon schema on startup.
CREATE TABLE test_exist_column_alter (
pk INT,
a BIGINT,
b VARCHAR(30),
c INT,
PRIMARY KEY (pk)
);

-- Source table for testAssertSchemaCompatibleWithAddColumnISNOTNULL: column
-- "c" is NOT NULL, which a newly added Paimon column may not be, so the
-- compatibility check must reject this schema and fail the job.
CREATE TABLE assert_schema_compatible (
pk INT,
a BIGINT,
b VARCHAR(30),
c INT NOT NULL comment 'Add column cannot specify NOT NULL in the Paimon table',
PRIMARY KEY (pk)
);

-- ################################################################################
-- testSyncShard
-- ################################################################################
Expand Down
Loading
Loading