[hotfix][cdc-connector][mysql] Expose row_kind metadata column for mysql cdc source.

We use the name 'row_kind' instead of 'op' or 'op_type' because the operation type is a database concept, and a database only has an 'update'
operation; it has no 'update_before' or 'update_after' types.
leonardBang committed Dec 26, 2023
1 parent 8eba910 commit 6cb951c
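
The naming rationale in the commit message can be illustrated with a small sketch. This is not the connector's code: `RowKindSketch`, `Kind`, and `expandUpdate` are hypothetical names, and only the short strings ('+I', '-U', '+U', '-D') come from this commit. It shows how a single database-level update appears as two changelog rows, which is why a row-level name fits better than an operation-level one.

```java
import java.util.List;

public class RowKindSketch {
    /** Hypothetical stand-in for the changelog row kinds named in this commit. */
    enum Kind {
        INSERT("+I"), UPDATE_BEFORE("-U"), UPDATE_AFTER("+U"), DELETE("-D");

        private final String shortString;

        Kind(String shortString) {
            this.shortString = shortString;
        }

        String shortString() {
            return shortString;
        }
    }

    /** One database-level UPDATE yields two changelog rows: the old image, then the new one. */
    static List<String> expandUpdate(String before, String after) {
        return List.of(
                Kind.UPDATE_BEFORE.shortString() + " " + before,
                Kind.UPDATE_AFTER.shortString() + " " + after);
    }

    public static void main(String[] args) {
        // Prints one "-U" line followed by one "+U" line for a single update.
        expandUpdate("(1, 'pending')", "(1, 'shipped')").forEach(System.out::println);
    }
}
```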
Showing 7 changed files with 114 additions and 103 deletions.
86 changes: 46 additions & 40 deletions docs/content/connectors/mysql-cdc(ZH).md
@@ -370,61 +370,67 @@ Flink SQL> SELECT * FROM orders;
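<td>The time at which the change was made in the database. <br>If the record is read from a snapshot of the table instead of the binlog, the value is always 0.</td>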
</tr>
<tr>
<td>op</td>
<td>row_kind</td>
<td>STRING NOT NULL</td>
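<td>The operation type of the current record. <br>'+I' means INSERT data, '-D' means DELETE data, '-U' means UPDATE_BEFORE data, and '+U' means UPDATE_AFTER data.</td>
<td>The changelog type of the current record. Note: once the source operator emits the row_kind field for every record, downstream SQL operators may fail to match rows when processing retractions because this field differs,
so it is recommended to reference this metadata column only in simple synchronization jobs.<br>'+I' means INSERT data, '-D' means DELETE data, '-U' means UPDATE_BEFORE data, and '+U' means UPDATE_AFTER data.
</td>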
</tr>
</tbody>
</table>

The following CREATE TABLE example shows how to use the metadata columns:

```sql
CREATE TABLE products (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
operation STRING METADATA FROM 'op' VIRTUAL,
order_id INT,
order_date TIMESTAMP(0),
CREATE TABLE products
(
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
operation STRING METADATA FROM 'row_kind' VIRTUAL,
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders'
);
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders'
);
```

The following CREATE TABLE example shows how to use regular expressions to match multiple databases and tables:

```sql
CREATE TABLE products (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
operation STRING METADATA FROM 'op' VIRTUAL,
order_id INT,
order_date TIMESTAMP(0),
CREATE TABLE products
(
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
operation STRING METADATA FROM 'row_kind' VIRTUAL,
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})',
'table-name' = '(t[5-8]|tt)'
);
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})',
'table-name' = '(t[5-8]|tt)'
);
```
<table class="colwidths-auto docutils">
<thead>
86 changes: 46 additions & 40 deletions docs/content/connectors/mysql-cdc.md
@@ -378,61 +378,67 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a
<td>It indicates the time that the change was made in the database. <br>If the record is read from a snapshot of the table instead of the binlog, the value is always 0.</td>
</tr>
<tr>
<td>op</td>
<td>row_kind</td>
<td>STRING NOT NULL</td>
<td>It indicates the operation type of the row. <br>'+I' means INSERT message, '-D' means DELETE message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message.</td>
<td>It indicates the row kind of the changelog. Note: if the source operator outputs the 'row_kind' column for each record, downstream SQL operators may fail to match rows when processing retractions because of this newly added column.
It is recommended to use this metadata column only in simple synchronization jobs.
<br>'+I' means INSERT message, '-D' means DELETE message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message.</td>
</tr>
</tbody>
</table>
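
The retraction caveat in the `row_kind` description can be pictured with a deliberately simplified model. The sketch below is not Flink's operator logic: `RetractionSketch` and `retract` are hypothetical, and the assumption is only that a downstream operator removes a stored row when an equal row is retracted. Once the row kind is carried as a regular column, the stored row ('+I') and the retracted row ('-U') no longer compare equal.

```java
import java.util.ArrayList;
import java.util.List;

public class RetractionSketch {
    /** Hypothetical downstream state: a row is retracted only if an equal row was stored. */
    static boolean retract(List<List<Object>> state, List<Object> row) {
        return state.remove(row); // List.remove(Object) relies on equals()
    }

    public static void main(String[] args) {
        // Without the metadata column, the retracted row equals the stored row.
        List<List<Object>> plain = new ArrayList<>();
        plain.add(List.of(1, "pending"));
        System.out.println(retract(plain, List.of(1, "pending"))); // true

        // With row_kind projected, the row was stored as "+I" but is retracted as "-U".
        List<List<Object>> withKind = new ArrayList<>();
        withKind.add(List.of("+I", 1, "pending"));
        System.out.println(retract(withKind, List.of("-U", 1, "pending"))); // false
    }
}
```

In this model the second state still holds the stale '+I' row after the failed retraction, which is the kind of mismatch the note warns about for anything beyond simple synchronization jobs.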

The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:

```sql
CREATE TABLE products (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
operation STRING METADATA FROM 'op' VIRTUAL,
order_id INT,
order_date TIMESTAMP(0),
CREATE TABLE products
(
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
operation STRING METADATA FROM 'row_kind' VIRTUAL,
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders'
);
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders'
);
```

The extended CREATE TABLE example demonstrates using a regex to match multiple tables:

```sql
CREATE TABLE products (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
operation STRING METADATA FROM 'op' VIRTUAL,
order_id INT,
order_date TIMESTAMP(0),
CREATE TABLE products
(
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
operation STRING METADATA FROM 'row_kind' VIRTUAL,
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})',
'table-name' = '(t[5-8]|tt)'
);
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})',
'table-name' = '(t[5-8]|tt)'
);
```
<table class="colwidths-auto docutils">
<thead>
@@ -46,10 +46,8 @@ public void collect(RowData physicalRow) {
for (int i = 0; i < metadataConverters.length; i++) {
MetadataConverter metadataConverter = metadataConverters[i];
Object meta;
if (metadataConverter instanceof MetadataWithRowDataConverter) {
meta =
((MetadataWithRowDataConverter) metadataConverter)
.read(inputRecord, physicalRow);
if (metadataConverter instanceof RowDataMetadataConverter) {
meta = ((RowDataMetadataConverter) metadataConverter).read(physicalRow);
} else {
meta = metadataConverter.read(inputRecord);
}
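
The dispatch in the hunk above can be sketched in isolation. Everything here is a stand-in under stated assumptions: `RecordStub` and `RowStub` replace `SourceRecord` and `RowData`, and `readMeta` and `rowKindConverter` are hypothetical helpers; only the two interface names and the `instanceof` check mirror the commit.

```java
public class DispatchSketch {
    /** Hypothetical stand-in for SourceRecord. */
    static final class RecordStub {
        final String database;

        RecordStub(String database) {
            this.database = database;
        }
    }

    /** Hypothetical stand-in for RowData. */
    static final class RowStub {
        final String rowKind;

        RowStub(String rowKind) {
            this.rowKind = rowKind;
        }
    }

    interface MetadataConverter {
        Object read(RecordStub rec);
    }

    interface RowDataMetadataConverter extends MetadataConverter {
        Object read(RowStub row);
    }

    /** Uses the row-based overload when the converter supports it, else the record-based one. */
    static Object readMeta(MetadataConverter converter, RecordStub rec, RowStub row) {
        if (converter instanceof RowDataMetadataConverter) {
            return ((RowDataMetadataConverter) converter).read(row);
        }
        return converter.read(rec);
    }

    /** A row-based converter whose record-based overload is intentionally unsupported. */
    static MetadataConverter rowKindConverter() {
        return new RowDataMetadataConverter() {
            @Override
            public Object read(RowStub row) {
                return row.rowKind;
            }

            @Override
            public Object read(RecordStub rec) {
                throw new UnsupportedOperationException("Please call read(RowStub) instead.");
            }
        };
    }

    public static void main(String[] args) {
        MetadataConverter databaseName = rec -> rec.database;
        RecordStub rec = new RecordStub("mydb");
        RowStub row = new RowStub("+U");
        System.out.println(readMeta(databaseName, rec, row));       // mydb
        System.out.println(readMeta(rowKindConverter(), rec, row)); // +U
    }
}
```

Checking the more specific interface first means the record-based `read` of a row-based converter is never reached through this path, so its exception only fires on misuse.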
@@ -25,5 +25,6 @@
@FunctionalInterface
@Internal
public interface MetadataConverter extends Serializable {

Object read(SourceRecord record);
}
@@ -18,17 +18,11 @@

import org.apache.flink.table.data.RowData;

import org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.common.annotation.Internal;

/**
* A converter converts {@link SourceRecord} metadata and {@link RowData} into Flink internal data
* structures.
*/
public interface MetadataWithRowDataConverter extends MetadataConverter {
Object read(SourceRecord record, RowData rowData);
/** A converter that converts {@link RowData} metadata into Flink internal data structures. */
@Internal
public interface RowDataMetadataConverter extends MetadataConverter {

default Object read(SourceRecord record) {
throw new UnsupportedOperationException(
"This method should never be called, please call the read(SourceRecord, RowData) method instead.");
}
Object read(RowData rowData);
}
@@ -23,7 +23,7 @@
import org.apache.flink.table.types.DataType;

import com.ververica.cdc.debezium.table.MetadataConverter;
import com.ververica.cdc.debezium.table.MetadataWithRowDataConverter;
import com.ververica.cdc.debezium.table.RowDataMetadataConverter;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
import org.apache.kafka.connect.data.Struct;
@@ -83,19 +83,25 @@ public Object read(SourceRecord record) {
}),

/**
* It indicates the operation type of the row. '+I' means INSERT message, '-D' means DELETE
* It indicates the row kind of the changelog. '+I' means INSERT message, '-D' means DELETE
* message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message
*/
OP_TYPE(
"op",
ROW_KIND(
"row_kind",
DataTypes.STRING().notNull(),
new MetadataWithRowDataConverter() {
new RowDataMetadataConverter() {
private static final long serialVersionUID = 1L;

@Override
public Object read(SourceRecord record, RowData rowData) {
public Object read(RowData rowData) {
return StringData.fromString(rowData.getRowKind().shortString());
}

@Override
public Object read(SourceRecord record) {
throw new UnsupportedOperationException(
"Please call read(RowData rowData) method instead.");
}
});

private final String key;
@@ -933,7 +933,7 @@ public void testMetadataColumns() throws Exception {
"CREATE TABLE mysql_users ("
+ " db_name STRING METADATA FROM 'database_name' VIRTUAL,"
+ " table_name STRING METADATA VIRTUAL,"
+ " op STRING METADATA FROM 'op' VIRTUAL,"
+ " row_kind STRING METADATA FROM 'row_kind' VIRTUAL,"
+ " `id` DECIMAL(20, 0) NOT NULL,"
+ " name STRING,"
+ " address STRING,"
@@ -968,7 +968,7 @@ public void testMetadataColumns() throws Exception {
"CREATE TABLE sink ("
+ " database_name STRING,"
+ " table_name STRING,"
+ " op STRING,"
+ " row_kind STRING,"
+ " `id` DECIMAL(20, 0) NOT NULL,"
+ " name STRING,"
+ " address STRING,"